samirakarki opened a new issue, #8135:
URL: https://github.com/apache/hudi/issues/8135

   We are using AWS Glue with Hudi connector 0.10.1.2 and Spark 3.1.
   The full load of our ELT pipeline captures all the output records produced by 
joining several tables. On the incremental load, however, the output table shows 
a large discrepancy.
   
   Here is one incremental-load scenario where the discrepancy occurs:
   
   We join multiple tables to create a view in an S3 bucket.
   
   The Spark SQL join is an inner join.
   When no record in the first table of the join has been updated, its 
last_commit_time remains unchanged and the incremental read returns an empty 
dataframe.
   The second table does have updated records with the same join key.
   
   We have our own helper function for getting a dynamic frame:
   
   def hudi_get_dynamic_frame(glue_context: glue_context,
                             glue_table_name:glue_catalog,
                             input_bucket:raw_bucket,
                             input_path:bucket_to_read,
                             partitioned:true,
                             Transformation_ctx: last_hudi_commit_time):
   
   first_table_in_join = old timestamp (no records changed, so it returns an 
empty dataframe)
   second_table_in_join = latest timestamp (records changed in the incremental 
load)
   
   Here, the join logic doesn't work properly: the incremental read of the first 
table returns an empty dynamic frame, so the changed rows from the second table 
have nothing to join against. Hence the output table doesn't capture the updates 
in the second table for that join key.
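
To make the scenario concrete, here is a minimal sketch in plain Python. Lists of dicts stand in for the two incremental dataframes; the `inner_join` helper, the `id` key, and the sample row are assumptions for illustration only, not our Glue code.

```python
def inner_join(left_rows, right_rows, key):
    """Inner join two lists of dicts on `key` (stand-in for a Spark inner join)."""
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)
    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            joined.append({**left, **right})
    return joined


# Hypothetical data: the first table had no commits since the last read,
# so its incremental read is empty; the second table has one updated row.
first_table_incremental = []
second_table_incremental = [{"id": 1, "status": "updated"}]

result = inner_join(first_table_incremental, second_table_incremental, "id")
print(result)  # [] : the update to id=1 never reaches the output
```

Because an inner join only emits rows that exist on both sides, an empty incremental read on either side produces an empty result, whatever changed on the other side.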
   
   The scenario above uses an inner join only as an example. We use both inner 
and left joins among different tables, and in the final joined table we see 
either missing records or duplicate records for the same primary_key.
   
   We want the join logic to pick up all updates and inserts from first_table, 
together with the corresponding updates and inserts from second_table, based on 
the join_keys in the incremental load.
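
One way to express that requirement, sketched in plain Python under stated assumptions: collect the join keys touched in either incremental read, then join the full snapshots of both tables restricted to those keys. This is only a possible approach, not something Hudi or Glue does for us; `changed_keys`, `join_changed`, the `id` key, and the sample rows are all hypothetical.

```python
def changed_keys(incremental_reads, key):
    """Union of join keys touched in any table since its last read commit."""
    return {row[key] for rows in incremental_reads for row in rows}


def join_changed(first_snapshot, second_snapshot, keys, key):
    """Inner join the full snapshots, restricted to the changed keys."""
    second_by_key = {}
    for row in second_snapshot:
        if row[key] in keys:
            second_by_key.setdefault(row[key], []).append(row)
    return [
        {**left, **right}
        for left in first_snapshot
        if left[key] in keys
        for right in second_by_key.get(left[key], [])
    ]


# Hypothetical data: only the second table changed since the last load.
first_snapshot = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
second_snapshot = [{"id": 1, "status": "updated"}, {"id": 2, "status": "old"}]
first_incremental = []
second_incremental = [{"id": 1, "status": "updated"}]

keys = changed_keys([first_incremental, second_incremental], "id")
rows_to_upsert = join_changed(first_snapshot, second_snapshot, keys, "id")
print(rows_to_upsert)  # [{'id': 1, 'name': 'a', 'status': 'updated'}]
```

In this sketch the incremental reads only decide which keys need reprocessing; the joined rows themselves come from the current snapshots, so a change on either side is reflected in the output.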
   
   Hudi Configurations:
   HUDI_COMMON_CONFIG = {
      'connectionName': 'hudi-connection',
      'className': 'org.apache.hudi',
      'hoodie.datasource.hive_sync.enable': 'true',
      'hoodie.datasource.hive_sync.use_jdbc': 'false',
      'hoodie.datasource.hive_sync.database': 'default',
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.combine.before.insert': 'true',
      'hoodie.datasource.write.reconcile.schema': 'true',
      'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
      'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
      'hoodie.datasource.hive_sync.mode': 'hms'
   }
   
   We have a custom function to get last_read_commit.
   Hudi config:
      connection_options['hoodie.datasource.query.type'] = 'incremental'
      connection_options['hoodie.datasource.read.begin.instanttime'] = last_read_commit
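
As a minimal sketch of how these two options could be layered on top of the common config, the helper below copies the base dict so the shared settings are not mutated between tables. The function name `incremental_read_options` and the sample commit time are assumptions for illustration; only the two option keys come from our setup.

```python
def incremental_read_options(common_config, last_read_commit):
    """Return a copy of the common config with incremental-read options set."""
    options = dict(common_config)  # copy, so the shared dict stays unchanged
    options['hoodie.datasource.query.type'] = 'incremental'
    options['hoodie.datasource.read.begin.instanttime'] = last_read_commit
    return options


# Hypothetical values for illustration.
base_config = {
    'connectionName': 'hudi-connection',
    'className': 'org.apache.hudi',
}
read_options = incremental_read_options(base_config, '20230301000000')
print(read_options['hoodie.datasource.read.begin.instanttime'])
```

Each table in the join would get its own options dict with its own last_read_commit, which is why the two tables can end up reading from different points in time.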
   
   Partitioned Hudi config:
   connection_options['hoodie.datasource.hive_sync.partition_extractor_class'] = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
   connection_options['hoodie.datasource.hive_sync.partition_fields'] = partition_field
   
   # Add Hudi connection options specific to partitions
   connection_options['hoodie.datasource.hive_sync.partition_extractor_class'] = \
      'org.apache.hudi.hive.MultiPartKeysValueExtractor'
   connection_options['hoodie.datasource.hive_sync.partition_fields'] = '_hoodie_partition_path'
   
   # Use consistent timestamps between row-writer and non-row-writer writes
   connection_options['hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled'] = True
   
   
   
   
   

