soumilshah1995 opened a new issue, #11391:
URL: https://github.com/apache/hudi/issues/11391

   Description:
   I'm encountering an issue while attempting to use DynamoDB Based Lock with 
an Apache Hudi PySpark job running locally. The goal is to have the job access 
DynamoDB in a specified region for locking purposes.
   
   Configuration Used:
   ```
   'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
   'hoodie.cleaner.policy.failed.writes': 'LAZY',
   'hoodie.write.lock.provider': 
'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
   'hoodie.write.lock.dynamodb.table': 'hudi-lock-table',  # DynamoDB table 
name for locking
   'hoodie.write.lock.dynamodb.region': curr_region,  # DynamoDB region
   'hoodie.write.lock.dynamodb.endpoint_url': 
f'dynamodb.{curr_region}.amazonaws.com',
   'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',
   
   ```
   
   Error Logs
   ```
   org.apache.hudi.exception.HoodieException: Unable to load class
       at 
org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:58)
       ...
   Caused by: java.lang.ClassNotFoundException: 
org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
       ...
   
   ```
   
   
   # Code 
   ```
   try:
       import os
       import sys
       import uuid
       import pyspark
       import datetime
       from pyspark.sql import SparkSession
       from pyspark import SparkConf, SparkContext
       from faker import Faker
       import datetime
       from datetime import datetime
       import random
       import pandas as pd
       from pyspark.sql.types import StructType, StructField, StringType, 
DateType, FloatType
       from pyspark.sql.functions import col
   
       from datetime import datetim
   
       print("Imports loaded ")
   
   except Exception as e:
       print("error", e)
   
   HUDI_VERSION = '0.14.0'
   SPARK_VERSION = '3.4'
   
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   SUBMIT_ARGS = f"--packages 
org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} 
pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   
   spark = SparkSession.builder \
       .config('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.extensions', 
'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   schema = StructType([
       StructField("orderID", StringType(), True),
       StructField("productSKU", StringType(), True),
       StructField("customerID", StringType(), True),
       StructField("orderDate", StringType(), True),
       StructField("orderAmount", FloatType(), True),
       StructField("state", StringType(), True)
   ])
   
   # Create the data array with the additional state value
   data = [
       ("order1", "prod001", "cust001", "2024-01-15", 150.00, "CA"),
       ("order002", "prod002", "cust002", "2024-01-16", 200.00, "NY"),
       ("order003", "prod003", "cust003", "2024-01-17", 300.00, "TX"),
       ("order004", "prod004", "cust004", "2024-01-18", 250.00, "FL"),
       ("order005", "prod005", "cust005", "2024-01-19", 100.00, "WA"),
       ("order006", "prod006", "cust006", "2024-01-20", 350.00, "CA"),
       ("order007", "prod007", "cust007", "2024-01-21", 400.00, "NY")
   ]
   
   # Create the DataFrame
   df = spark.createDataFrame(data, schema)
   
   # Show the DataFrame with the new "state" column
   df.show()
   
   
   def write_to_hudi(spark_df,
                     table_name,
                     db_name,
                     method='upsert',
                     table_type='COPY_ON_WRITE',
                     recordkey='',
                     precombine='',
                     partition_fields='',
                     index_type='BLOOM',
                     curr_region='us-east-1'
                     ):
       path = 
f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
   
       hudi_options = {
           'hoodie.table.name': table_name,
           'hoodie.datasource.write.table.type': table_type,
           'hoodie.datasource.write.table.name': table_name,
           'hoodie.datasource.write.operation': method,
           'hoodie.datasource.write.recordkey.field': recordkey,
           'hoodie.datasource.write.precombine.field': precombine,
           "hoodie.datasource.write.partitionpath.field": partition_fields,
   
           'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
           'hoodie.cleaner.policy.failed.writes': 'LAZY',
           'hoodie.write.lock.provider': 
'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
           'hoodie.write.lock.dynamodb.table': 'hudi-lock-table',  # DynamoDB 
table name for locking
           'hoodie.write.lock.dynamodb.region': curr_region,  # DynamoDB region
           'hoodie.write.lock.dynamodb.endpoint_url': 
f'dynamodb.{curr_region}.amazonaws.com',
           'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',
   
       }
       print(hudi_options)
   
       print("\n")
       print(path)
       print("\n")
   
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
   
   
   write_to_hudi(
       spark_df=df,
       db_name="default",
       table_name="orders",
       recordkey="orderID",
       precombine="orderDate",
       partition_fields="state",
       index_type="RECORD_INDEX"
   )
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to