mkk1490 edited a comment on issue #3313:
URL: https://github.com/apache/hudi/issues/3313#issuecomment-884708261
@nsivabalan during bulk_insert, as well as during the first insert into the
table, the hoodie_key value was stored as a timestamp. But during the upsert
operation, it was converted to a long. I even added a step just before the
write to convert it back to a timestamp; it still didn't work. However, when
the field's datatype is date, this issue didn't come up.
Sample data with original code:
1. Hive external table with location pointing to s3
2. This code was executed on EMR 5.33 with hudi 0.7.0
Step 1:
from pyspark.sql import functions as F
df = spark.createDataFrame([
('10490113453106300000','561295328','2018-02-26
00:00:00','129999','18078735','15889231','OTHER PLACE OF
SERVICE','53200900','RESP','201802'),
('10490120407605900000','559017519','2018-02-26
00:00:00','24290619','0','','OTHER PLACE OF
SERVICE','53214384','RESP','201802'),
('10490116725506700000','1275017082','2018-02-27
00:00:00','209999','7875173','56874023','INPATIENT
HOSPITAL','53201132','RESP','201802'),
('10420113469301100000','1705523399','2018-02-26
00:00:00','20045','7088459','55674640','EMERGENCY ROOM -
HOSPITAL','53199046','RESP','201802'),
('10419114183800600000','29764982','2018-02-26
00:00:00','11659999','7786313','56694289','OFFICE','53207492','RESP','201802'),
('10490115064904900000','5007290360','2018-02-27
00:00:00','11720002','6937467','57968295','INPATIENT
HOSPITAL','53207546','RESP','201803'),
('10419114186102800000','4968929315','2018-02-27
00:00:00','38930013','7311494','55315237','OFFICE','1002607736','RESP','201803'),
('10236114374904400000','320648979','2018-02-26
00:00:00','35160001','0','','INDEPENDENT
LABORATORY','53217687','RESP','201803'),
('10809131289117500000','505439892','2018-02-28
00:00:00','20860001','2492213','52847481','OTHER PLACE OF
SERVICE','53212703','RESP','201803'),
('10419123011601800000','3138080815','2018-02-25
00:00:00','32060002','7574212','58923898','INPATIENT
HOSPITAL','1002608174','RESP','201803') ],
['claim_id','pat_id','claim_subm_dt','src_plan_id','src_pri_psbr_id','pri_az_cust_id','plac_of_srvc_cd','az_plan_id','src_sys_nm','yr_mth']
).withColumn('claim_subm_dt', F.expr("cast(claim_subm_dt as timestamp)"))
hudi_options = {
'hoodie.table.name': 'f_hudi_cow',
'hoodie.datasource.write.recordkey.field':
'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.name': 'f_hudi_cow',
'hoodie.combine.before.upsert': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.hive_sync.database':
'us_commercial_datalake_app_commons_dev',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.datasource.hive_sync.auto_create_db':'false',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.row.writer.enable': 'true',
'hoodie.parquet.small.file.limit': '600000000',
'hoodie.parquet.max.file.size': '1000000000',
'hoodie.upsert.shuffle.parallelism': '10000',
'hoodie.insert.shuffle.parallelism': '10000',
'hoodie.bulkinsert.shuffle.parallelism': '25',
'hoodie.clean.automatic': 'false',
'hoodie.cleaner.commits.retained': 3,
'hoodie.index.type': 'GLOBAL_SIMPLE',
'hoodie.simple.index.update.partition.path':'true',
'hoodie.metadata.enable': 'true'
}
df.write.format("org.apache.hudi").
options(**hudi_options).option('hoodie.datasource.write.operation',
'bulk_insert').
mode("overwrite").
save("{s3_path}")
Step 2:
**Upsert has values 0 changed to 100. Updated Field name: src_pri_psbr_id**
df_upsert =spark.createDataFrame([
('10490113453106300000','561295328','2018-02-26
00:00:00','129999','18078735','15889231','OTHER PLACE OF
SERVICE','53200900','RESP','201802'),
('10490120407605900000','559017519','2018-02-26
00:00:00','24290619','100','','OTHER PLACE OF
SERVICE','53214384','RESP','201802'),
('10490116725506700000','1275017082','2018-02-27
00:00:00','209999','7875173','56874023','INPATIENT
HOSPITAL','53201132','RESP','201802'),
('10420113469301100000','1705523399','2018-02-26
00:00:00','20045','7088459','55674640','EMERGENCY ROOM -
HOSPITAL','53199046','RESP','201802'),
('10419114183800600000','29764982','2018-02-26
00:00:00','11659999','7786313','56694289','OFFICE','53207492','RESP','201802'),
('10490115064904900000','5007290360','2018-02-27
00:00:00','11720002','6937467','57968295','INPATIENT
HOSPITAL','53207546','RESP','201803'),
('10419114186102800000','4968929315','2018-02-27
00:00:00','38930013','7311494','55315237','OFFICE','1002607736','RESP','201803'),
('10236114374904400000','320648979','2018-02-26
00:00:00','35160001','100','','INDEPENDENT
LABORATORY','53217687','RESP','201803'),
('10809131289117500000','505439892','2018-02-28
00:00:00','20860001','2492213','52847481','OTHER PLACE OF
SERVICE','53212703','RESP','201803'),
('10419123011601800000','3138080815','2018-02-25
00:00:00','32060002','7574212','58923898','INPATIENT
HOSPITAL','1002608174','RESP','201803') ],
['claim_id','pat_id','claim_subm_dt','src_plan_id','src_pri_psbr_id','pri_az_cust_id','plac_of_srvc_cd','az_plan_id','src_sys_nm','yr_mth']
).withColumn('claim_subm_dt', F.expr("cast(claim_subm_dt as timestamp)"))
hudi_options = {
'hoodie.table.name': 'f_hudi_cow',
'hoodie.datasource.write.recordkey.field':
'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.name': 'f_hudi_cow',
'hoodie.combine.before.upsert': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.hive_sync.database':
'us_commercial_datalake_app_commons_dev',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.datasource.hive_sync.auto_create_db':'false',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.row.writer.enable': 'true',
'hoodie.parquet.small.file.limit': '600000000',
'hoodie.parquet.max.file.size': '1000000000',
'hoodie.upsert.shuffle.parallelism': '10000',
'hoodie.insert.shuffle.parallelism': '10000',
'hoodie.bulkinsert.shuffle.parallelism': '25',
'hoodie.clean.automatic': 'false',
'hoodie.cleaner.commits.retained': 3,
'hoodie.index.type': 'GLOBAL_SIMPLE',
'hoodie.simple.index.update.partition.path':'true',
'hoodie.metadata.enable': 'true'
}
df_upsert.write.format("org.apache.hudi").
options(**hudi_options).option('hoodie.datasource.write.operation',
'upsert').
mode("append").
save("{s3_path}")
Expected output: 0 to be updated to 100.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]