soumilshah1995 commented on issue #8412:
URL: https://github.com/apache/hudi/issues/8412#issuecomment-1514656803

   I tried what you said, but I am still having an issue with Hive Sync.
   
![image](https://user-images.githubusercontent.com/39345855/233075724-922544aa-4333-4abb-8dbf-391403ca331d.png)
   
   # Old Py Code
   ```
   """
   Download the dataset
   
https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link
   
   """
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = os.getenv("DEV_REGION")
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ------------------Hudi settings 
---------------------------------------------
       glue_db = "hudidb"
       table_name = "tbl_tbl_invoices"
       op = "UPSERT"
       table_type = "COPY_ON_WRITE"
   
       record_key = 'invoiceid'
       precombine = "replicadmstimestamp"
       target_path = "s3://delta-streamer-demo-hudi/hudi/"
       raw_path = "s3://delta-streamer-demo-hudi/raw/"
       partition_feild = "destinationstate"
       MODE = 'FULL_RECORD'  # FULL_RECORD  | METADATA_ONLY
   
       # 
---------------------------------------------------------------------------------
       #                                       EMR
       # 
--------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'delta_streamer_bootstrap_{}'.format(table_name)
   
       # 
--------------------------------------------------------------------------------
   
       spark_submit_parameters = ' --conf 
spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar'
       spark_submit_parameters += ' --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer'
   
       arguments = [
           '--run-bootstrap',
           # "--enable-sync",
           "--target-base-path", target_path,
           "--target-table", table_name,
           "--table-type", table_type,
           "--hoodie-conf", "hoodie.bootstrap.base.path={}".format(raw_path),
           "--hoodie-conf", 
"hoodie.datasource.write.recordkey.field={}".format(record_key),
           "--hoodie-conf", 
"hoodie.datasource.write.precombine.field={}".format(precombine),
           "--hoodie-conf", 
"hoodie.datasource.write.partitionpath.field={}".format(partition_feild),
           "--hoodie-conf", 
"hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
           "--hoodie-conf",
           
"hoodie.bootstrap.full.input.provider=org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider",
           "--hoodie-conf",
           
"hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector",
           "--hoodie-conf", 
"hoodie.bootstrap.mode.selector.regex.mode={}".format(MODE),
   
           # "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
           # "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
           # "--hoodie-conf", 
"hoodie.datasource.hive_sync.table={}".format(table_name),
           # "--hoodie-conf", 
"hoodie.datasource.hive_sync.partition_fields={}".format(partition_feild),
   
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': "command-runner.jar",
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   
   ```
   
   # New Py Code
   
   ```
   """
   Download the dataset
   
https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link
   
   """
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = os.getenv("DEV_REGION")
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ------------------Hudi settings 
---------------------------------------------
       glue_db = "hudidb"
       table_name = "tbl_invoices_bootstrap"
       op = "UPSERT"
       table_type = "COPY_ON_WRITE"
   
       record_key = 'invoiceid'
       precombine = "replicadmstimestamp"
       target_path = "s3://delta-streamer-demo-hudi/hudi/"
       raw_path = "s3://delta-streamer-demo-hudi/raw/"
       partition_feild = "destinationstate"
       MODE = 'FULL_RECORD'  # FULL_RECORD  | METADATA_ONLY
   
       # 
---------------------------------------------------------------------------------
       #                                       EMR
       # 
--------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'delta_streamer_bootstrap_{}'.format(table_name)
       jar_path = 
"s3://delta-streamer-demo-hudi/jar/hudi-spark3-bundle_2.12-0.8.0.jar"
       # 
--------------------------------------------------------------------------------
   
       spark_submit_parameters = ' --conf 
spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar'
       spark_submit_parameters += ' --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer'
   
       arguments = [
           '--run-bootstrap',
   
           "--enable-sync",
   
           "--target-base-path", target_path,
           "--target-table", table_name,
           "--table-type", table_type,
           "--hoodie-conf", "hoodie.bootstrap.base.path={}".format(raw_path),
           "--hoodie-conf", 
"hoodie.datasource.write.recordkey.field={}".format(record_key),
           "--hoodie-conf", 
"hoodie.datasource.write.precombine.field={}".format(precombine),
           "--hoodie-conf", 
"hoodie.datasource.write.partitionpath.field={}".format(partition_feild),
           "--hoodie-conf", 
"hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
           
"--hoodie-conf","hoodie.bootstrap.full.input.provider=org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider",
           
"--hoodie-conf","hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector",
           "--hoodie-conf", 
"hoodie.bootstrap.mode.selector.regex.mode={}".format(MODE),
   
           "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
           "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
           "--hoodie-conf", 
"hoodie.datasource.hive_sync.table={}".format(table_name),
           "--hoodie-conf", 
"hoodie.datasource.hive_sync.partition_fields={}".format(partition_feild),
   
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': jar_path,
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   ```
   
   Let's connect on Slack and see if we can resolve the issue — maybe I am missing 
something here


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to