jjtjiang commented on issue #5777:
URL: https://github.com/apache/hudi/issues/5777#issuecomment-1158711388
> Yes, it should do dedup for log files. I'll test the default behavior when
reading, to see if there is any potential bug.
This is my code and some test data.
Code:
// Build the batch DataFrame from `rdd` using `schemaNew`, then hand it to the
// Hudi writer helper together with the Hive and Hudi settings.
val dsNew = spark.createDataFrame(rdd, schemaNew)
// dsNew.show(10, false)
HudiForeachBatchDeleteFunction.updateDsw(dsNew, hivePropertiesBean, hudiPropertiesBean)
/**
 * Hive connection settings used for Hive sync by
 * `HudiForeachBatchDeleteFunction.writeOptions`.
 *
 * Fields are kept mutable (`var`) so existing callers that assign to them keep working.
 *
 * @param hiveUrl      value written to the `HIVE_URL` write option
 * @param hiveUsername value written to the `HIVE_USER` write option
 * @param hivePassword value written to the `HIVE_PASS` write option
 */
case class HivePropertiesBean(
  var hiveUrl: String,
  var hiveUsername: String,
  var hivePassword: String
)
/**
 * Hudi table settings consumed by `HudiForeachBatchDeleteFunction`.
 *
 * Fields are kept mutable (`var`) so existing callers that assign to them keep working.
 *
 * @param hudiTableType      value written to the `TABLE_TYPE` write option
 * @param hudiHiveDatabase   value written to the `HIVE_DATABASE` write option
 * @param hudiTableName      used for both `HoodieWriteConfig.TBL_NAME` and `HIVE_TABLE`
 * @param hudiLoadPath       path passed to `DataFrameWriter.save`
 * @param hudiIndexId        value written to the `RECORDKEY_FIELD` write option
 * @param hudiTimestamp      value written to the `PRECOMBINE_FIELD` write option
 * @param hasPartition       the partitioned configuration is chosen only when this equals "true"
 * @param hudiPartition      used for `PARTITIONPATH_FIELD` and `HIVE_PARTITION_FIELDS`
 * @param partitionDateField not read by the writer code shown alongside this class
 * @param partitionDateType  not read by the writer code shown alongside this class
 * @param initHdfsPath       not read by the writer code shown alongside this class
 * @param payload            value written to the `PAYLOAD_CLASS_NAME` write option
 */
case class HudiPropertiesBean(
  var hudiTableType: String,
  var hudiHiveDatabase: String,
  var hudiTableName: String,
  var hudiLoadPath: String,
  var hudiIndexId: String,
  var hudiTimestamp: String,
  var hasPartition: String,
  var hudiPartition: String,
  var partitionDateField: String,
  var partitionDateType: String,
  var initHdfsPath: String,
  var payload: String
)
/**
 * Helpers that configure a Spark `DataFrameWriter` for Hudi and write a batch
 * to the table located at `hudiPropertiesBean.hudiLoadPath`.
 */
object HudiForeachBatchDeleteFunction {

  /** Data source name passed to `DataFrameWriter.format`. */
  val hudiOrgPro = "hudi"

  /**
   * Applies the partition, table, Hive-sync, index, memory, metadata and
   * compaction options to `dfRow` and saves it in `SaveMode.Append`.
   *
   * @param dfRow              writer to configure; the write operation is expected to be set by the caller
   * @param hivePropertiesBean Hive connection settings used for Hive sync
   * @param hudiPropertiesBean Hudi table settings (name, type, keys, path, payload)
   */
  def writeOptions(dfRow: DataFrameWriter[Row],
                   hivePropertiesBean: HivePropertiesBean,
                   hudiPropertiesBean: HudiPropertiesBean): Unit = {
    partitionType(dfRow, hudiPropertiesBean)
      .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), hudiPropertiesBean.payload)
      .option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key(), "false")
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), hudiPropertiesBean.hudiTableType)
      .option(HoodieWriteConfig.TBL_NAME.key(), hudiPropertiesBean.hudiTableName)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), hudiPropertiesBean.hudiIndexId)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), hudiPropertiesBean.hudiTimestamp)
      // hive sync
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true")
      .option(DataSourceWriteOptions.HIVE_DATABASE.key(), hudiPropertiesBean.hudiHiveDatabase)
      .option(DataSourceWriteOptions.HIVE_TABLE.key(), hudiPropertiesBean.hudiTableName)
      .option(DataSourceWriteOptions.HIVE_URL.key(), hivePropertiesBean.hiveUrl)
      .option(DataSourceWriteOptions.HIVE_USER.key(), hivePropertiesBean.hiveUsername)
      .option(DataSourceWriteOptions.HIVE_PASS.key(), hivePropertiesBean.hivePassword)
      // parallelism
      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "40")
      .option(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key, "40")
      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "40")
      .option(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "40")
      // INSERT_DROP_DUPS is intentionally NOT enabled: updateDsw sets the operation
      // to upsert, and Hudi's Spark writer overrides upsert to insert when this flag
      // is true, so records whose key already exists would be dropped, not merged.
      // .option(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
      // .option("hoodie.sql.insert.mode","strict")
      // index options
      .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.BLOOM.name())
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
      // .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), "snappy")
      // others
      // .option(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), value = true)
      .option(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key(), value = true)
      // clustering (all disabled)
      // .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key, value = true)
      // .option(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE.key(), value = true)
      // .option(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key, "4")
      // .option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "4")
      // .option(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(), String.valueOf(120 * 1024 * 1024L))
      // .option(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), String.valueOf(80 * 1024 * 1024L))
      // .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), hudiPropertiesBean.hudiTimestamp)
      // .option(HoodieClusteringConfig.UPDATES_STRATEGY.key(), "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
      // .option(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), value = true)
      // merge compaction
      .option(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE.key(), "0.8")
      .option(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), "0.8")
      // metadata config
      .option(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "240")
      // compaction
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "5")
      .option(HoodieCompactionConfig.ASYNC_CLEAN.key(), "true")
      // .option(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.key(), "true")
      .option(HoodieMetadataConfig.ENABLE.key(), "true")
      .mode(SaveMode.Append)
      .save(hudiPropertiesBean.hudiLoadPath)
  }

  /**
   * Adds the partition-path, key-generator and Hive partition options to `dfRow`.
   *
   * The partitioned configuration is used only when
   * `hudiPropertiesBean.hasPartition` is exactly the string "true"; any other
   * value selects the non-partitioned configuration.
   *
   * @return the writer returned by the last `option` call
   */
  def partitionType(dfRow: DataFrameWriter[Row],
                    hudiPropertiesBean: HudiPropertiesBean): DataFrameWriter[Row] =
    if (hudiPropertiesBean.hasPartition == "true") {
      dfRow
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), hudiPropertiesBean.hudiPartition)
        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), hudiPropertiesBean.hudiPartition)
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName)
    } else {
      dfRow
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "")
        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[NonpartitionedKeyGenerator].getName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "")
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[NonPartitionedExtractor].getName)
    }

  /**
   * Writes `upsertDs` to the Hudi table with the operation set to upsert.
   *
   * @param upsertDs           rows to upsert
   * @param hivePropertiesBean Hive connection settings used for Hive sync
   * @param hudiPropertiesBean Hudi table settings
   */
  def updateDsw(upsertDs: Dataset[Row],
                hivePropertiesBean: HivePropertiesBean,
                hudiPropertiesBean: HudiPropertiesBean): Unit = {
    val upsertDsw = upsertDs.toDF().write.format(hudiOrgPro)
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    HudiForeachBatchDeleteFunction.writeOptions(upsertDsw, hivePropertiesBean, hudiPropertiesBean)
  }

  // def deleteDsw(deleteDs: Dataset[Row], hivePropertiesBean: HivePropertiesBean, hudiPropertiesBean: HudiPropertiesBean): Unit = {
  //   val deleteDsDsw = deleteDs.toDF().write.format(hudiOrgPro)
  //     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  //     .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), classOf[EmptyHoodieRecordPayload].getName)
  //   HudiForeachBatchDeleteFunction.writeOptions(deleteDsDsw, hivePropertiesBean, hudiPropertiesBean)
  // }
}
Test data:
{"cdc_trans_id":"0c72a1a3-9c22-4e2f-972f-5a26b75c3e7d","src_db_type":"sybase","cdc_system_time":"2022-06-17
02:55:07.927","src_timezone":"America/Los_Angeles","src_table_schema":"CIS","src_table_alias":"corp","src_table_name":"order_header","cdc_system_time_long":1655459707927,"src_table_jndi":"CORPSB","json_data":{"prod_exp_date":null,"entry_datetime":"2022-06-15
12:03:01.353","closed_date":"2022-06-16
18:05:31.447","printed_date":"2022-06-17
02:55:06.040","total_cost":221.6400,"from_inv_type":1,"ship_to_po_box":"C/O VDC
CORP.","hold_id":null,"label_printed":null,"fx_detail_price_total":0.0000,"ship_to_loc":null,"fx_detail_exp_total":0.0000,"ship_to_zip":"78852","bol_date":null,"ship_method":"FG","invoice_id":null,"to_dept_no":null,"bol_printed":null,"order_type":1,"to_contact_no":1,"credit_rel_code":"B","trigger_order_no":131610413,"trigger_cdc_content":null,"from_acct_no":null,"head_exp_total":0.0000,"trigger_sid":955534244,"sales_terr":4592,"trigger_order_type":1,"it_cost_code
":null,"ship_to_country":"US","schedule_date":null,"from_contact_no":null,"ship_to_state":"TX","account_rep":9396,"receiving_date":null,"from_loc_no":5,"hold_date":null,"fx_total_cost":0.0000,"delete_id":null,"repick_counter":null,"expected_date":null,"sales_total":229.4400,"fx_total_order":0.0000,"from_dept_no":null,"dist_exp_date":null,"fx_head_exp_total":0.0000,"resale":"Y","detail_price_total":0.0000,"repick_id":null,"company_no":1,"entry_id":696828,"source_table":"order_header","label_date":null,"order_no":131610413,"sales_rel_date":"2022-06-15
12:04:15.960","qc_date":"2022-06-16
08:40:34.790","trigger_cdc_action":"U","u_version":"N","rma_disp_type":null,"freight":"P","ship_to_addr":"1025
ADAMS CIRCLE","q_userid":696828,"ship_to_name":"LITTELFUSE MEX. MFG. B.V.
80,2747152/ISMAEL
JULIAN","to_loc_no":1,"total_weight":1.0000,"invoice_date":"2022-06-16
23:00:00.000","total_order":229.4400,"h_version":1,"issue_date":"2022-06-15
12:03:03.797","ship_to_city":"EAGLE PASS","delete_date"
:null,"pick_date":"2022-06-15 12:57:59.000","posting_date":"2022-06-16
23:00:00.000","manifest_date":null,"int_ref_type":8,"ext_ref":"P22574301","terms_no":"JJ","ship_to_loc_change":null,"fx_currency":null,"credit_rel_date":"2022-06-15
12:08:25.230","profile_special_handle":null,"approval":"Autocred","drop_ship":"D","trigger_entry_datetime":"2022-06-17
02:55:06.580","mt_expense_code":null,"to_acct_no":107685,"sales_tax":null,"detail_exp_total":0.0000,"to_inv_type":null,"date_flag":"2022-06-15
00:00:00.000","carrier_no":1,"int_ref_no":118365916,"invoice_counter":-100,"ship_date":"2022-06-16
08:45:31.940","fx_sales_total":0.0000}}
{"cdc_trans_id":"0c72a1a3-9c22-4e2f-972f-5a26b75c3e7d","src_db_type":"sybase","cdc_system_time":"2022-06-17
02:55:07.927","src_timezone":"America/Los_Angeles","src_table_schema":"CIS","src_table_alias":"corp","src_table_name":"order_header","cdc_system_time_long":1655459707927,"src_table_jndi":"CORPSB","json_data":{"prod_exp_date":null,"entry_datetime":"2022-06-15
12:42:05.427","closed_date":"2022-06-16
18:05:20.860","printed_date":"2022-06-17
02:55:06.663","total_cost":85.6100,"from_inv_type":1,"ship_to_po_box":"S56109749/Scott
Stockton","hold_id":null,"label_printed":null,"fx_detail_price_total":0.0000,"ship_to_loc":null,"fx_detail_exp_total":0.0000,"ship_to_zip":"37914","bol_date":null,"ship_method":"FG","invoice_id":null,"to_dept_no":null,"bol_printed":null,"order_type":1,"to_contact_no":null,"credit_rel_code":"B","trigger_order_no":131611668,"trigger_cdc_content":null,"from_acct_no":null,"head_exp_total":0.0000,"trigger_sid":955534245,"sales_terr":4502,"trigger_order_type":1,
"it_cost_code":null,"ship_to_country":"US","schedule_date":null,"from_contact_no":null,"ship_to_state":"TN","account_rep":9396,"receiving_date":null,"from_loc_no":7,"hold_date":null,"fx_total_cost":0.0000,"delete_id":null,"repick_counter":null,"expected_date":null,"sales_total":91.0800,"fx_total_order":0.0000,"from_dept_no":null,"dist_exp_date":null,"fx_head_exp_total":0.0000,"resale":"Y","detail_price_total":-22.3500,"repick_id":null,"company_no":1,"entry_id":-10706,"source_table":"order_header","label_date":null,"order_no":131611668,"sales_rel_date":"2022-06-15
12:43:20.837","qc_date":"2022-06-16
07:35:22.810","trigger_cdc_action":"U","u_version":"+","rma_disp_type":null,"freight":"P","ship_to_addr":"3015
E Governor John Sevier Hwy","q_userid":null,"ship_to_name":"ATC
Drivetrain","to_loc_no":1,"total_weight":2.1000,"invoice_date":"2022-06-16
23:00:00.000","total_order":68.7300,"h_version":1,"issue_date":"2022-06-15
12:42:08.913","ship_to_city":"KNOXVILLE","delete_date":null,"pick_
date":"2022-06-15 12:48:40.787","posting_date":"2022-06-16
23:00:00.000","manifest_date":null,"int_ref_type":8,"ext_ref":"P22575455","terms_no":"JJ","ship_to_loc_change":null,"fx_currency":null,"credit_rel_date":"2022-06-15
12:48:14.787","profile_special_handle":null,"approval":"Autocred","drop_ship":"D","trigger_entry_datetime":"2022-06-17
02:55:07.127","mt_expense_code":null,"to_acct_no":107685,"sales_tax":null,"detail_exp_total":0.0000,"to_inv_type":null,"date_flag":"2022-06-15
00:00:00.000","carrier_no":1,"int_ref_no":118368455,"invoice_counter":-100,"ship_date":"2022-06-16
07:40:32.010","fx_sales_total":0.0000}}
{"cdc_trans_id":"0c72a1a3-9c22-4e2f-972f-5a26b75c3e7d","src_db_type":"sybase","cdc_system_time":"2022-06-17
02:55:07.927","src_timezone":"America/Los_Angeles","src_table_schema":"CIS","src_table_alias":"corp","src_table_name":"order_header","cdc_system_time_long":1655459707927,"src_table_jndi":"CORPSB","json_data":{"prod_exp_date":null,"entry_datetime":"2022-06-15
12:58:38.647","closed_date":"2022-06-16
18:05:36.467","printed_date":"2022-06-17
02:55:07.297","total_cost":117.1200,"from_inv_type":1,"ship_to_po_box":"06152022/","hold_id":null,"label_printed":null,"fx_detail_price_total":0.0000,"ship_to_loc":null,"fx_detail_exp_total":0.0000,"ship_to_zip":"80214","bol_date":null,"ship_method":"FG","invoice_id":null,"to_dept_no":null,"bol_printed":null,"order_type":1,"to_contact_no":null,"credit_rel_code":"B","trigger_order_no":131612158,"trigger_cdc_content":null,"from_acct_no":null,"head_exp_total":0.0000,"trigger_sid":955534246,"sales_terr":4502,"trigger_order_type":1,"it_cost_code"
:null,"ship_to_country":"US","schedule_date":null,"from_contact_no":null,"ship_to_state":"CO","account_rep":9396,"receiving_date":null,"from_loc_no":7,"hold_date":null,"fx_total_cost":0.0000,"delete_id":null,"repick_counter":null,"expected_date":null,"sales_total":117.7000,"fx_total_order":0.0000,"from_dept_no":null,"dist_exp_date":null,"fx_head_exp_total":0.0000,"resale":"Y","detail_price_total":0.0000,"repick_id":null,"company_no":1,"entry_id":-10706,"source_table":"order_header","label_date":null,"order_no":131612158,"sales_rel_date":"2022-06-15
12:58:47.410","qc_date":"2022-06-16
07:54:02.423","trigger_cdc_action":"U","u_version":"+","rma_disp_type":null,"freight":"P","ship_to_addr":"10811
W Collins Ave","q_userid":null,"ship_to_name":"Terumo
Bct","to_loc_no":1,"total_weight":0.7500,"invoice_date":"2022-06-16
23:00:00.000","total_order":117.7000,"h_version":1,"issue_date":"2022-06-15
12:58:42.460","ship_to_city":"LAKEWOOD","delete_date":null,"pick_date":"2022-06-15
13:04:41.063"
,"posting_date":"2022-06-16
23:00:00.000","manifest_date":null,"int_ref_type":8,"ext_ref":"P22575671","terms_no":"JJ","ship_to_loc_change":null,"fx_currency":null,"credit_rel_date":"2022-06-15
13:03:17.197","profile_special_handle":null,"approval":"Autocred","drop_ship":"D","trigger_entry_datetime":"2022-06-17
02:55:07.607","mt_expense_code":null,"to_acct_no":107685,"sales_tax":null,"detail_exp_total":0.0000,"to_inv_type":null,"date_flag":"2022-06-15
00:00:00.000","carrier_no":1,"int_ref_no":118369157,"invoice_counter":-100,"ship_date":"2022-06-16
07:55:06.710","fx_sales_total":0.0000}}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]