soumilshah1995 opened a new issue, #7286:
URL: https://github.com/apache/hudi/issues/7286

   **_Tips before filing an issue_**
   ### HUDI Dynamodb Concurrency Control
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   * Yes i have been through all Blog post and articles 
   * https://hudi.apache.org/docs/next/concurrency_control/
   * 
https://hudi.apache.org/docs/configurations/#DynamoDB-based-Locks-Configurations
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   - Yes
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   * Code works great i am trying to integrate with DynamoDB for concurrency 
control but HUDI throws error here is code attached 
   ```
   try:
       import os
       import sys
       import uuid
   
       import pyspark
       from pyspark import SparkConf, SparkContext
       from pyspark.sql import SparkSession
       from pyspark.sql.functions import col, asc, desc
       from awsglue.utils import getResolvedOptions
       from awsglue.dynamicframe import DynamicFrame
       from awsglue.context import GlueContext
   
       from faker import Faker
   
       print("All modules are loaded .....")
   
   except Exception as e:
       print("Some modules are missing {} ".format(e))
   
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   args = getResolvedOptions(sys.argv, ['base_s3_path', 'table_name'])
   
   base_s3_path = args['base_s3_path']
   table_name = args['table_name']
   
   final_base_path = "{base_s3_path}/tmp/{table_name}".format(
       base_s3_path=base_s3_path, table_name=table_name
   )
   
   target_s3_path = "{base_s3_path}/tmp/hudi_{table_name}_target".format(
       base_s3_path=base_s3_path,
       table_name=table_name
   )
   database_name1 = "mydb"
   curr_region = 'us-east-1'
   
   global faker
   faker = Faker()
   
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   x,
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 
'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 
'RJ')),
                   faker.random_int(min=10000, max=150000),
                   faker.random_int(min=18, max=60),
                   faker.random_int(min=0, max=100000),
                   faker.unix_time()
               ) for x in range(10)
           ]
   
   
   def create_spark_session():
       spark = SparkSession \
           .builder \
           .config('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer') \
           .getOrCreate()
       return spark
   
   
   spark = create_spark_session()
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': 'emp_id',
       'hoodie.datasource.write.partitionpath.field': 'state',
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.datasource.write.precombine.field': 'ts',
   
       'hoodie.datasource.hive_sync.enable': 'true',
       "hoodie.datasource.hive_sync.mode":"hms",
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': database_name1,
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.hive_sync.partition_fields': 'state',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
   
       'hoodie.write.concurrency.mode' : 'optimistic_concurrency_control'
       ,'hoodie.cleaner.policy.failed.writes' : 'LAZY'
       ,'hoodie.write.lock.provider' : 
'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider'
       ,'hoodie.write.lock.dynamodb.table' : 'hudi-blog-lock-table'
       ,'hoodie.write.lock.dynamodb.partition_key' : 'tablename'
       ,'hoodie.write.lock.dynamodb.region' : '{0}'.format(curr_region)
       ,"hoodie.aws.access.key":"XXXXXXXXXXXXXXXX"
       ,"hoodie.aws.secret.key":"XXXXXXXXXXXXXXXXX"
   
   
   
   }
   
   
   
   
   # ====================================================
   """Create Spark Data Frame """
   # ====================================================
   data = DataGenerator.get_data()
   columns = ["emp_id", "employee_name", "department", "state", "salary", 
"age", "bonus", "ts"]
   df = spark.createDataFrame(data=data, schema=columns)
   
df.write.format("hudi").options(**hudi_options).mode("overwrite").save(final_base_path)
   
   
   
   
   # ====================================================
   """APPEND """
   # ====================================================
   
   # impleDataUpd = [
   #     (3, "xxx", "Sales", "RJ", 81000, 30, 23000, 827307999),
   #     (7, "x change", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
   # ]
   #
   # columns = ["emp_id", "employee_name", "department", "state", "salary", 
"age", "bonus", "ts"]
   # usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   # 
usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
   
   
   # # ====================================================
   
   # final_read_df = spark.read.format("hudi").load(final_base_path)
   # final_read_df.createOrReplaceTempView("hudi_users_view")
   # glueContext.purge_s3_path(target_s3_path,{"retentionPeriod": 0, 
"excludeStorageClasses": ["STANDARD_IA"]} )
   #
   #
   # spark.sql(f"CREATE DATABASE IF NOT EXISTS hudi_demo")
   # spark.sql(f"DROP TABLE IF EXISTS hudi_demo.hudi_users")
   # spark.sql(f"CREATE TABLE IF NOT EXISTS hudi_demo.hudi_users USING PARQUET 
LOCATION '{target_s3_path}' as (SELECT * from hudi_users_view)")
   
   ``` 
   
![image](https://user-images.githubusercontent.com/39345855/203575607-b7d9f884-66ae-4df3-a361-d62a44d1070b.png)
   
   
![image](https://user-images.githubusercontent.com/39345855/203575719-346c9fc0-d85c-40b3-a903-02566f899068.png)
   
   
   ### Please not error arises when adding dynamodb other wise all settings 
works fine as expected 
   JAR FILE i am using can be found
   
https://github.com/soumilshah1995/getting-started-with-pyspark-and-apache-hudi-glue-/tree/main/Video%202/jars
   
   * hudi-spark3-bundle_2.12-0.9.0.jar 
   * spark-avro_2.12-3.0.1.jar
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to