mkk1490 edited a comment on issue #3313:
URL: https://github.com/apache/hudi/issues/3313#issuecomment-884708261
@nsivabalan during bulk_insert, as well as during the first insert into the
table, the hoodie key value was a timestamp. But during the upsert operation,
it was converted into a long. I even added a step just before the write to cast
it back to timestamp. It still didn't work. But when the field's datatype is
date, this issue didn't come up.
Sample data with original code:
1. Hive external table with location pointing to S3, partitioned by
src_sys_nm string (static value) and yr_mth int (dynamic value), both string
2. This code was executed on EMR 5.33 with hudi 0.7.0
3. Sample data has 2 different partition values for yr_mth where update is
required
Step 1:
from pyspark.sql import functions as F
df = spark.createDataFrame([
('10490113453106300000','561295328','2018-02-26
00:00:00','129999','18078735','15889231','OTHER PLACE OF
SERVICE','53200900','RESP','201802'),
('10490120407605900000','559017519','2018-02-26
00:00:00','24290619','0','','OTHER PLACE OF
SERVICE','53214384','RESP','201802'),
('10490116725506700000','1275017082','2018-02-27
00:00:00','209999','7875173','56874023','INPATIENT
HOSPITAL','53201132','RESP','201802'),
('10420113469301100000','1705523399','2018-02-26
00:00:00','20045','7088459','55674640','EMERGENCY ROOM -
HOSPITAL','53199046','RESP','201802'),
('10419114183800600000','29764982','2018-02-26
00:00:00','11659999','7786313','56694289','OFFICE','53207492','RESP','201802'),
('10490115064904900000','5007290360','2018-02-27
00:00:00','11720002','6937467','57968295','INPATIENT
HOSPITAL','53207546','RESP','201803'),
('10419114186102800000','4968929315','2018-02-27
00:00:00','38930013','7311494','55315237','OFFICE','1002607736','RESP','201803'),
('10236114374904400000','320648979','2018-02-26
00:00:00','35160001','0','','INDEPENDENT
LABORATORY','53217687','RESP','201803'),
('10809131289117500000','505439892','2018-02-28
00:00:00','20860001','2492213','52847481','OTHER PLACE OF
SERVICE','53212703','RESP','201803'),
('10419123011601800000','3138080815','2018-02-25
00:00:00','32060002','7574212','58923898','INPATIENT
HOSPITAL','1002608174','RESP','201803') ],
['claim_id','pat_id','claim_subm_dt','src_plan_id','src_pri_psbr_id','pri_az_cust_id','plac_of_srvc_cd','az_plan_id','src_sys_nm','yr_mth']
).withColumn('claim_subm_dt', F.expr("cast(claim_subm_dt as
timestamp)")).withColumn('yr_mth', F.expr("cast(yr_mth as int)"))
hudi_options = {
'hoodie.table.name': 'f_hudi_cow',
'hoodie.datasource.write.recordkey.field':
'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.name': 'f_hudi_cow',
'hoodie.combine.before.upsert': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.hive_sync.database':
'us_commercial_datalake_app_commons_dev',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.datasource.hive_sync.auto_create_db':'false',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.row.writer.enable': 'true',
'hoodie.parquet.small.file.limit': '600000000',
'hoodie.parquet.max.file.size': '1000000000',
'hoodie.upsert.shuffle.parallelism': '10000',
'hoodie.insert.shuffle.parallelism': '10000',
'hoodie.bulkinsert.shuffle.parallelism': '25',
'hoodie.clean.automatic': 'false',
'hoodie.cleaner.commits.retained': 3,
'hoodie.index.type': 'GLOBAL_SIMPLE',
'hoodie.simple.index.update.partition.path':'true',
'hoodie.metadata.enable': 'true'
}
df.write.format("org.apache.hudi").
options(**hudi_options).option('hoodie.datasource.write.operation',
'bulk_insert').
mode("overwrite").
save("{s3_path}")
Step 2:
**Upsert has values 0 changed to 100. Updated Field name: src_pri_psbr_id**
df_upsert =spark.createDataFrame([
('10490113453106300000','561295328','2018-02-26
00:00:00','129999','18078735','15889231','OTHER PLACE OF
SERVICE','53200900','RESP','201802'),
('10490120407605900000','559017519','2018-02-26
00:00:00','24290619','100','','OTHER PLACE OF
SERVICE','53214384','RESP','201802'),
('10490116725506700000','1275017082','2018-02-27
00:00:00','209999','7875173','56874023','INPATIENT
HOSPITAL','53201132','RESP','201802'),
('10420113469301100000','1705523399','2018-02-26
00:00:00','20045','7088459','55674640','EMERGENCY ROOM -
HOSPITAL','53199046','RESP','201802'),
('10419114183800600000','29764982','2018-02-26
00:00:00','11659999','7786313','56694289','OFFICE','53207492','RESP','201802'),
('10490115064904900000','5007290360','2018-02-27
00:00:00','11720002','6937467','57968295','INPATIENT
HOSPITAL','53207546','RESP','201803'),
('10419114186102800000','4968929315','2018-02-27
00:00:00','38930013','7311494','55315237','OFFICE','1002607736','RESP','201803'),
('10236114374904400000','320648979','2018-02-26
00:00:00','35160001','100','','INDEPENDENT
LABORATORY','53217687','RESP','201803'),
('10809131289117500000','505439892','2018-02-28
00:00:00','20860001','2492213','52847481','OTHER PLACE OF
SERVICE','53212703','RESP','201803'),
('10419123011601800000','3138080815','2018-02-25
00:00:00','32060002','7574212','58923898','INPATIENT
HOSPITAL','1002608174','RESP','201803') ],
['claim_id','pat_id','claim_subm_dt','src_plan_id','src_pri_psbr_id','pri_az_cust_id','plac_of_srvc_cd','az_plan_id','src_sys_nm','yr_mth']
).withColumn('claim_subm_dt', F.expr("cast(claim_subm_dt as
timestamp)")).withColumn('yr_mth', F.expr("cast(yr_mth as int)"))
hudi_options = {
'hoodie.table.name': 'f_hudi_cow',
'hoodie.datasource.write.recordkey.field':
'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.name': 'f_hudi_cow',
'hoodie.combine.before.upsert': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.hive_sync.database':
'us_commercial_datalake_app_commons_dev',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.datasource.hive_sync.auto_create_db':'false',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.row.writer.enable': 'true',
'hoodie.parquet.small.file.limit': '600000000',
'hoodie.parquet.max.file.size': '1000000000',
'hoodie.upsert.shuffle.parallelism': '10000',
'hoodie.insert.shuffle.parallelism': '10000',
'hoodie.bulkinsert.shuffle.parallelism': '25',
'hoodie.clean.automatic': 'false',
'hoodie.cleaner.commits.retained': 3,
'hoodie.index.type': 'GLOBAL_SIMPLE',
'hoodie.simple.index.update.partition.path':'true',
'hoodie.metadata.enable': 'true'
}
df_upsert.write.format("org.apache.hudi").
options(**hudi_options).option('hoodie.datasource.write.operation',
'upsert').
mode("append").
save("{s3_path}")
Expected output: src_pri_psbr_id field value- 0 to be updated to 100
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]