mkk1490 edited a comment on issue #3313:
URL: https://github.com/apache/hudi/issues/3313#issuecomment-884708261


   @nsivabalan during the initial bulk_insert (and also plain insert) into the table, the timestamp field in the hoodie key kept its timestamp form. During the upsert operation, however, it was converted into a long. I even added a step just before the write to cast it back to timestamp, and it still didn't work. When the field's datatype is date, this issue doesn't come up.
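   To illustrate what I think is happening (this is an assumption, not confirmed from Hudi internals): Spark stores timestamps internally as microseconds since the epoch, and the "long" value in the key looks like that encoding. A minimal pure-Python sketch of the two representations of the same instant:

```python
from datetime import datetime, timezone

# Hypothetical illustration: the same timestamp rendered two ways.
# Spark's internal timestamp representation is epoch microseconds,
# which is presumably the long form showing up in the hoodie key.
ts_string = '2018-02-26 00:00:00'

dt = datetime.strptime(ts_string, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
epoch_micros = int(dt.timestamp() * 1_000_000)

print(ts_string)      # timestamp form seen after bulk_insert
print(epoch_micros)   # long form seen after upsert: 1519603200000000
```

   If the key string uses the first form on one write and the second form on the next, the two writes can never refer to the same record key.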
   
   Sample data with original code:
   1. Hive external table with its location pointing to S3, partitioned by src_sys_nm string (static value) and yr_mth int (dynamic value); both are string in Hive
   2. This code was executed on EMR 5.33 with Hudi 0.7.0
   3. The sample data has 2 different yr_mth partition values where an update is required
   Step 1:
   from pyspark.sql import functions as F
   df = spark.createDataFrame([
   ('10490113453106300000','561295328','2018-02-26 00:00:00','129999','18078735','15889231','OTHER PLACE OF SERVICE','53200900','RESP','201802'),
   ('10490120407605900000','559017519','2018-02-26 00:00:00','24290619','0','','OTHER PLACE OF SERVICE','53214384','RESP','201802'),
   ('10490116725506700000','1275017082','2018-02-27 00:00:00','209999','7875173','56874023','INPATIENT HOSPITAL','53201132','RESP','201802'),
   ('10420113469301100000','1705523399','2018-02-26 00:00:00','20045','7088459','55674640','EMERGENCY ROOM - HOSPITAL','53199046','RESP','201802'),
   ('10419114183800600000','29764982','2018-02-26 00:00:00','11659999','7786313','56694289','OFFICE','53207492','RESP','201802'),
   ('10490115064904900000','5007290360','2018-02-27 00:00:00','11720002','6937467','57968295','INPATIENT HOSPITAL','53207546','RESP','201803'),
   ('10419114186102800000','4968929315','2018-02-27 00:00:00','38930013','7311494','55315237','OFFICE','1002607736','RESP','201803'),
   ('10236114374904400000','320648979','2018-02-26 00:00:00','35160001','0','','INDEPENDENT LABORATORY','53217687','RESP','201803'),
   ('10809131289117500000','505439892','2018-02-28 00:00:00','20860001','2492213','52847481','OTHER PLACE OF SERVICE','53212703','RESP','201803'),
   ('10419123011601800000','3138080815','2018-02-25 00:00:00','32060002','7574212','58923898','INPATIENT HOSPITAL','1002608174','RESP','201803')],
   ['claim_id','pat_id','claim_subm_dt','src_plan_id','src_pri_psbr_id','pri_az_cust_id','plac_of_srvc_cd','az_plan_id','src_sys_nm','yr_mth']
   ).withColumn('claim_subm_dt', F.expr("cast(claim_subm_dt as timestamp)")).withColumn('yr_mth', F.expr("cast(yr_mth as int)"))
   
   hudi_options = {
   'hoodie.table.name': 'f_hudi_cow',
   'hoodie.datasource.write.recordkey.field': 'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
   'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
   'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
   'hoodie.datasource.write.table.name': 'f_hudi_cow',
   'hoodie.combine.before.upsert': 'true',
   'hoodie.datasource.hive_sync.enable': 'true',
   'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
   'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
   'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
   'hoodie.datasource.write.hive_style_partitioning': 'true',
   'hoodie.datasource.hive_sync.database': 'us_commercial_datalake_app_commons_dev',
   'hoodie.datasource.hive_sync.support_timestamp': 'true',
   'hoodie.datasource.hive_sync.auto_create_db': 'false',
   'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
   'hoodie.datasource.write.row.writer.enable': 'true',
   'hoodie.parquet.small.file.limit': '600000000',
   'hoodie.parquet.max.file.size': '1000000000',
   'hoodie.upsert.shuffle.parallelism': '10000',
   'hoodie.insert.shuffle.parallelism': '10000',
   'hoodie.bulkinsert.shuffle.parallelism': '25',
   'hoodie.clean.automatic': 'false',
   'hoodie.cleaner.commits.retained': 3,
   'hoodie.index.type': 'GLOBAL_SIMPLE',
   'hoodie.simple.index.update.partition.path': 'true',
   'hoodie.metadata.enable': 'true'
   }
   
   df.write.format("org.apache.hudi") \
       .options(**hudi_options) \
       .option('hoodie.datasource.write.operation', 'bulk_insert') \
       .mode("overwrite") \
       .save("{s3_path}")
   
   Step 2:
   **The upsert data has the value 0 changed to 100. Updated field: src_pri_psbr_id**
   
   df_upsert = spark.createDataFrame([
   ('10490113453106300000','561295328','2018-02-26 00:00:00','129999','18078735','15889231','OTHER PLACE OF SERVICE','53200900','RESP','201802'),
   ('10490120407605900000','559017519','2018-02-26 00:00:00','24290619','100','','OTHER PLACE OF SERVICE','53214384','RESP','201802'),
   ('10490116725506700000','1275017082','2018-02-27 00:00:00','209999','7875173','56874023','INPATIENT HOSPITAL','53201132','RESP','201802'),
   ('10420113469301100000','1705523399','2018-02-26 00:00:00','20045','7088459','55674640','EMERGENCY ROOM - HOSPITAL','53199046','RESP','201802'),
   ('10419114183800600000','29764982','2018-02-26 00:00:00','11659999','7786313','56694289','OFFICE','53207492','RESP','201802'),
   ('10490115064904900000','5007290360','2018-02-27 00:00:00','11720002','6937467','57968295','INPATIENT HOSPITAL','53207546','RESP','201803'),
   ('10419114186102800000','4968929315','2018-02-27 00:00:00','38930013','7311494','55315237','OFFICE','1002607736','RESP','201803'),
   ('10236114374904400000','320648979','2018-02-26 00:00:00','35160001','100','','INDEPENDENT LABORATORY','53217687','RESP','201803'),
   ('10809131289117500000','505439892','2018-02-28 00:00:00','20860001','2492213','52847481','OTHER PLACE OF SERVICE','53212703','RESP','201803'),
   ('10419123011601800000','3138080815','2018-02-25 00:00:00','32060002','7574212','58923898','INPATIENT HOSPITAL','1002608174','RESP','201803')],
   ['claim_id','pat_id','claim_subm_dt','src_plan_id','src_pri_psbr_id','pri_az_cust_id','plac_of_srvc_cd','az_plan_id','src_sys_nm','yr_mth']
   ).withColumn('claim_subm_dt', F.expr("cast(claim_subm_dt as timestamp)")).withColumn('yr_mth', F.expr("cast(yr_mth as int)"))
   
   hudi_options = {
   'hoodie.table.name': 'f_hudi_cow',
   'hoodie.datasource.write.recordkey.field': 'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
   'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
   'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
   'hoodie.datasource.write.table.name': 'f_hudi_cow',
   'hoodie.combine.before.upsert': 'true',
   'hoodie.datasource.hive_sync.enable': 'true',
   'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
   'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
   'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
   'hoodie.datasource.write.hive_style_partitioning': 'true',
   'hoodie.datasource.hive_sync.database': 'us_commercial_datalake_app_commons_dev',
   'hoodie.datasource.hive_sync.support_timestamp': 'true',
   'hoodie.datasource.hive_sync.auto_create_db': 'false',
   'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
   'hoodie.datasource.write.row.writer.enable': 'true',
   'hoodie.parquet.small.file.limit': '600000000',
   'hoodie.parquet.max.file.size': '1000000000',
   'hoodie.upsert.shuffle.parallelism': '10000',
   'hoodie.insert.shuffle.parallelism': '10000',
   'hoodie.bulkinsert.shuffle.parallelism': '25',
   'hoodie.clean.automatic': 'false',
   'hoodie.cleaner.commits.retained': 3,
   'hoodie.index.type': 'GLOBAL_SIMPLE',
   'hoodie.simple.index.update.partition.path': 'true',
   'hoodie.metadata.enable': 'true'
   }
   
   df_upsert.write.format("org.apache.hudi") \
       .options(**hudi_options) \
       .option('hoodie.datasource.write.operation', 'upsert') \
       .mode("append") \
       .save("{s3_path}")
   
   Expected output: the src_pri_psbr_id field value 0 should be updated to 100
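   To reason about why the match might fail: as far as I understand, ComplexKeyGenerator builds the record key by joining field:value pairs, so if claim_subm_dt serializes as a timestamp string on the first write and as an epoch long on the next, the same logical row produces two different keys. A rough sketch (the exact formatting is Hudi's; this approximation and the helper name are just for illustration):

```python
# Rough approximation of a composite record key built from multiple
# fields (ComplexKeyGenerator-style "field:value" pairs, comma-joined).
# The exact Hudi formatting may differ -- this only shows the mismatch.
def make_key(row, key_fields):
    return ','.join(f'{f}:{row[f]}' for f in key_fields)

key_fields = ['claim_id', 'pat_id', 'claim_subm_dt']

row_as_timestamp = {'claim_id': '10490120407605900000',
                    'pat_id': '559017519',
                    'claim_subm_dt': '2018-02-26 00:00:00'}
# Same row, but the timestamp field rendered as epoch microseconds.
row_as_long = dict(row_as_timestamp, claim_subm_dt=1519603200000000)

# The same logical row yields two different keys, so the upsert would
# not match the record written by the earlier bulk_insert.
print(make_key(row_as_timestamp, key_fields))
print(make_key(row_as_long, key_fields))
```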
   

