samirakarki opened a new issue, #8135:
URL: https://github.com/apache/hudi/issues/8135
We are using AWS Glue with the Hudi connector 0.10.1.2; Spark version 3.1.
Our ELT pipeline's full load correctly captures all output records produced by
a join among different tables. However, on incremental loads we see a large
discrepancy in the output table.
Here's one scenario for the incremental load where the discrepancy happens:
We join multiple tables to create a view in an S3 bucket. The Spark SQL join
is an inner join.
When the first table in the join has no update to any of its records, its
last_commit_time remains unchanged, so the incremental read returns an empty
dataframe.
The second table does have updated records with the same join key.
We have a built-in function, hudi_get_dynamic_frame, which we call roughly
like this (parameter/argument pairs as used in our pipeline):

hudi_get_dynamic_frame(glue_context=glue_context,
                       glue_table_name=glue_catalog,
                       input_bucket=raw_bucket,
                       input_path=bucket_to_read,
                       partitioned=True,
                       transformation_ctx=last_hudi_commit_time)
first_table_in_join = old timestamp (no records changed, so the incremental
read returns an empty dataframe)
second_table_in_join = latest timestamp (records changed in the incremental
load)

Here the join logic doesn't work properly: the second table has nothing to
join with, because the first table's incremental read returns an empty/None
dynamic frame. As a result, the output table doesn't capture the second
table's updates for those join keys.
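To make the failure mode concrete, here is a minimal plain-Python sketch (no Spark/Glue involved; the function and field names are illustrative) of an inner join whose left side is the empty incremental frame:

```python
def inner_join(left_rows, right_rows, key):
    """Inner join two lists of dicts on `key` (simulates a Spark inner join)."""
    right_index = {}
    for row in right_rows:
        right_index.setdefault(row[key], []).append(row)
    joined = []
    for row in left_rows:
        for match in right_index.get(row[key], []):
            joined.append({**row, **match})
    return joined

# Incremental load: first table has no new commits -> empty frame.
first_table_incremental = []
# Second table did change for join key 42.
second_table_incremental = [{"id": 42, "amount": 99}]

result = inner_join(first_table_incremental, second_table_incremental, "id")
# `result` is empty even though the second table has an update for id 42:
# an inner join can never emit a row when one side contributes no rows.
```

This mirrors the scenario above: the update to the second table is silently dropped.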
The inner join above is just one example; we use both inner and left joins
among different tables. Either way, we end up with missing records, or
duplicate records per primary_key, in the final joined table.
We want the join logic to pick up all updates and inserts from the first
table in the join, together with the corresponding updates and inserts from
the second table, matched on the join keys, during the incremental load.
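One pattern that can get this behavior (a sketch under assumptions, not Hudi or Glue API; the helper names are hypothetical) is to collect the changed join keys from every table's incremental read, then join *snapshot* reads of each table restricted to those keys. An unchanged table then still contributes its current rows:

```python
def changed_keys(incremental_frames, key):
    """Union of join-key values that changed in any table's incremental read."""
    keys = set()
    for frame in incremental_frames:
        for row in frame:
            keys.add(row[key])
    return keys

def filter_snapshot(snapshot_rows, keys, key):
    """Restrict a snapshot read to rows whose join key changed somewhere."""
    return [row for row in snapshot_rows if row[key] in keys]

# First table unchanged incrementally; second table changed for id 42.
keys = changed_keys([[], [{"id": 42, "amount": 99}]], "id")

# The first table's *snapshot* still has a row for id 42, so the inner join
# can now produce the updated output row instead of dropping it.
first_rows = filter_snapshot(
    [{"id": 42, "name": "x"}, {"id": 7, "name": "y"}], keys, "id")
```

The cost is one extra snapshot read per table, but only the changed keys are joined.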
Hudi Configurations:
HUDI_COMMON_CONFIG = {
    'connectionName': 'hudi-connection',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.combine.before.insert': 'true',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.datasource.hive_sync.mode': 'hms'
}
We have a custom function to get last_read_commit.
Incremental Hudi config:
connection_options['hoodie.datasource.query.type'] = 'incremental'
connection_options['hoodie.datasource.read.begin.instanttime'] = last_read_commit
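For reference, one common way to derive last_read_commit (a hypothetical sketch, since the custom function is not shown in this issue) is to track the maximum _hoodie_commit_time that Hudi stamps on every row it writes:

```python
def latest_commit_time(rows):
    """Return the max Hudi commit instant seen in `rows`, or None if empty.

    Hudi writes its commit instant into the _hoodie_commit_time meta column;
    persisting the max value seen gives the begin.instanttime for the next run.
    """
    times = [row['_hoodie_commit_time'] for row in rows]
    return max(times) if times else None
```

Note the None case: when a table had no changes, the previously persisted commit time must be reused, which is exactly the situation that produces the empty incremental frame described above.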
Partitioned Hudi config:
connection_options['hoodie.datasource.hive_sync.partition_extractor_class'] = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
connection_options['hoodie.datasource.hive_sync.partition_fields'] = partition_field

# Add hudi connection options specific to partitions
connection_options['hoodie.datasource.hive_sync.partition_extractor_class'] = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
connection_options['hoodie.datasource.hive_sync.partition_fields'] = '_hoodie_partition_path'

# Use consistent timestamps between row-writer and non-row-writer writes
connection_options['hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled'] = True
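Putting the fragments above together, the per-read options could be assembled like this (a sketch; build_incremental_options is a hypothetical helper, not a Glue or Hudi API):

```python
def build_incremental_options(common_config, last_read_commit):
    """Combine the shared Hudi config with the incremental-read options."""
    options = dict(common_config)  # copy, so the shared dict stays untouched
    options['hoodie.datasource.query.type'] = 'incremental'
    options['hoodie.datasource.read.begin.instanttime'] = last_read_commit
    return options
```

Copying before mutating matters here: if several tables share one HUDI_COMMON_CONFIG dict and each read mutates it in place, options from one table's read (e.g. its begin instant) can leak into another's.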