jjtjiang commented on issue #5777:
URL: https://github.com/apache/hudi/issues/5777#issuecomment-1158711388

   > Yes, it should do dedup for log files. I'll test the default behavior when 
reading, to see if there is any potential bug.
   
   This is my code and some test data.
   Code:
   
           // Build a DataFrame from `rdd` using the schema `schemaNew`.
           val dsNew = spark.createDataFrame(rdd, schemaNew)
   //        dsNew.show(10,false)
           // Hand the DataFrame to updateDsw, which sets the write operation to
           // upsert and then applies the remaining write options and saves.
           HudiForeachBatchDeleteFunction.updateDsw(dsNew, hivePropertiesBean, 
hudiPropertiesBean)
   /**
    * Hive connection settings consumed by
    * `HudiForeachBatchDeleteFunction.writeOptions`.
    *
    * Fields are declared `var`, so an instance can be mutated after construction.
    *
    * @param hiveUrl      value passed as the `HIVE_URL` write option
    * @param hiveUsername value passed as the `HIVE_USER` write option
    * @param hivePassword value passed as the `HIVE_PASS` write option
    */
   case class HivePropertiesBean(
     var hiveUrl: String,
     var hiveUsername: String,
     var hivePassword: String
   )
   /**
    * Hudi table settings consumed by `HudiForeachBatchDeleteFunction`.
    *
    * Fields are declared `var`, so an instance can be mutated after construction.
    *
    * @param hudiTableType      value passed as the `TABLE_TYPE` write option
    * @param hudiHiveDatabase   value passed as the `HIVE_DATABASE` write option
    * @param hudiTableName      value passed as both the `TBL_NAME` and `HIVE_TABLE` write options
    * @param hudiLoadPath       path handed to `DataFrameWriter.save`
    * @param hudiIndexId        value passed as the `RECORDKEY_FIELD` write option
    * @param hudiTimestamp      value passed as the `PRECOMBINE_FIELD` write option
    * @param hasPartition       compared against the string "true" to choose partitioned
    *                           vs. non-partitioned key generation
    * @param hudiPartition      value passed as the `PARTITIONPATH_FIELD` and
    *                           `HIVE_PARTITION_FIELDS` write options when partitioned
    * @param partitionDateField not read by the code shown alongside this class
    * @param partitionDateType  not read by the code shown alongside this class
    * @param initHdfsPath       not read by the code shown alongside this class
    * @param payload            value passed as the `PAYLOAD_CLASS_NAME` write option
    */
   case class HudiPropertiesBean(
     var hudiTableType: String,
     var hudiHiveDatabase: String,
     var hudiTableName: String,
     var hudiLoadPath: String,
     var hudiIndexId: String,
     var hudiTimestamp: String,
     var hasPartition: String,
     var hudiPartition: String,
     var partitionDateField: String,
     var partitionDateType: String,
     var initHdfsPath: String,
     var payload: String
   )
   /**
    * Helpers that configure a Spark `DataFrameWriter` for a Hudi table and run the write.
    *
    * `updateDsw` is the entry point: it creates the writer with the upsert operation and
    * delegates to `writeOptions`, which adds key/partition settings (via `partitionType`),
    * the table, Hive-sync, index, memory, metadata and compaction options, and then saves
    * in append mode.
    */
   object HudiForeachBatchDeleteFunction {

     /** Data source format name passed to `DataFrameWriter.format`. */
     val hudiOrgPro: String = "hudi"

     /**
      * Applies all write options to `dfRow` and saves to `hudiPropertiesBean.hudiLoadPath`
      * with `SaveMode.Append`.
      *
      * @param dfRow              writer to configure; the caller has already set format and operation
      * @param hivePropertiesBean Hive URL and credentials used for the Hive-sync options
      * @param hudiPropertiesBean table name/type, key fields, payload class and target path
      */
     def writeOptions(
         dfRow: DataFrameWriter[Row],
         hivePropertiesBean: HivePropertiesBean,
         hudiPropertiesBean: HudiPropertiesBean): Unit = {
       val newDfRow: DataFrameWriter[Row] = partitionType(dfRow, hudiPropertiesBean)
       newDfRow
         .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), hudiPropertiesBean.payload)
         .option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key(), "false")
         .option(DataSourceWriteOptions.TABLE_TYPE.key(), hudiPropertiesBean.hudiTableType)
         .option(HoodieWriteConfig.TBL_NAME.key(), hudiPropertiesBean.hudiTableName)
         .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), hudiPropertiesBean.hudiIndexId)
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), hudiPropertiesBean.hudiTimestamp)
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true")
         .option(DataSourceWriteOptions.HIVE_DATABASE.key(), hudiPropertiesBean.hudiHiveDatabase)
         .option(DataSourceWriteOptions.HIVE_TABLE.key(), hudiPropertiesBean.hudiTableName)
         .option(DataSourceWriteOptions.HIVE_URL.key(), hivePropertiesBean.hiveUrl)
         .option(DataSourceWriteOptions.HIVE_USER.key(), hivePropertiesBean.hiveUsername)
         .option(DataSourceWriteOptions.HIVE_PASS.key(), hivePropertiesBean.hivePassword)
         .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "40")
         .option(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key, "40")
         .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "40")
         .option(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "40")
         // NOTE(review): updateDsw sets OPERATION to upsert before calling this method, and
         // this flag is an insert-side setting. Hudi's Spark writer appears to switch an
         // upsert to a plain insert when this flag is true -- confirm against the Hudi
         // version in use, since that would change how existing keys are handled.
         .option(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
         // .option("hoodie.sql.insert.mode","strict")
         // index options
         .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.BLOOM.name())
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
         // .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), "snappy")
         // others
         // .option(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), value = true)
         .option(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key(), value = true)
         // clustering
         // .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key, value = true)
         // .option(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE.key(), value = true)
         // .option(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key, "4")
         // .option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "4")
         // .option(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(),
         //   String.valueOf(120 * 1024 * 1024L))
         // .option(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
         //   String.valueOf(80 * 1024 * 1024L))
         // .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(),
         //   hudiPropertiesBean.hudiTimestamp)
         // .option(HoodieClusteringConfig.UPDATES_STRATEGY.key(),
         //   "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
         // .option(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), value = true)
         // merge compaction
         .option(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE.key(), "0.8")
         .option(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), "0.8")
         // metadata config
         .option(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "240")
         // compaction
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "5")
         .option(HoodieCompactionConfig.ASYNC_CLEAN.key(), "true")
         // .option(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.key(), "true")
         .option(HoodieMetadataConfig.ENABLE.key(), "true")
         .mode(SaveMode.Append)
         .save(hudiPropertiesBean.hudiLoadPath)
     }

     /**
      * Adds partition-path, key-generator and Hive partition options to `dfRow`.
      *
      * When `hudiPropertiesBean.hasPartition` equals the string "true", the partition field
      * is `hudiPropertiesBean.hudiPartition` with `ComplexKeyGenerator` and
      * `MultiPartKeysValueExtractor`; otherwise the partition fields are set to the empty
      * string with `NonpartitionedKeyGenerator` and `NonPartitionedExtractor`.
      *
      * @return the writer produced by the chained `option` calls, returned directly as the
      *         value of the `if`/`else` expression
      */
     def partitionType(
         dfRow: DataFrameWriter[Row],
         hudiPropertiesBean: HudiPropertiesBean): DataFrameWriter[Row] = {
       if (hudiPropertiesBean.hasPartition == "true") {
         dfRow
           .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), hudiPropertiesBean.hudiPartition)
           .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName)
           .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), hudiPropertiesBean.hudiPartition)
           .option(
             DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(),
             classOf[MultiPartKeysValueExtractor].getName)
       } else {
         dfRow
           .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "")
           .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[NonpartitionedKeyGenerator].getName)
           .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "")
           .option(
             DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(),
             classOf[NonPartitionedExtractor].getName)
       }
     }

     /**
      * Writes `upsertDs` to the Hudi table using the upsert operation.
      *
      * @param upsertDs           rows to write
      * @param hivePropertiesBean Hive connection settings forwarded to `writeOptions`
      * @param hudiPropertiesBean Hudi table settings forwarded to `writeOptions`
      */
     def updateDsw(
         upsertDs: Dataset[Row],
         hivePropertiesBean: HivePropertiesBean,
         hudiPropertiesBean: HudiPropertiesBean): Unit = {
       val upsertDsw = upsertDs.toDF().write.format(hudiOrgPro)
         .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       writeOptions(upsertDsw, hivePropertiesBean, hudiPropertiesBean)
     }

     //  def deleteDsw(deleteDs: Dataset[Row], hivePropertiesBean: HivePropertiesBean,
     //                hudiPropertiesBean: HudiPropertiesBean): Unit = {
     //    val deleteDsDsw = deleteDs.toDF().write.format(hudiOrgPro)
     //      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
     //      .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), classOf[EmptyHoodieRecordPayload].getName)
     //    HudiForeachBatchDeleteFunction.writeOptions(deleteDsDsw, hivePropertiesBean, hudiPropertiesBean)
     //  }
   }
   Test data:
   
{"cdc_trans_id":"0c72a1a3-9c22-4e2f-972f-5a26b75c3e7d","src_db_type":"sybase","cdc_system_time":"2022-06-17
 
02:55:07.927","src_timezone":"America/Los_Angeles","src_table_schema":"CIS","src_table_alias":"corp","src_table_name":"order_header","cdc_system_time_long":1655459707927,"src_table_jndi":"CORPSB","json_data":{"prod_exp_date":null,"entry_datetime":"2022-06-15
 12:03:01.353","closed_date":"2022-06-16 
18:05:31.447","printed_date":"2022-06-17 
02:55:06.040","total_cost":221.6400,"from_inv_type":1,"ship_to_po_box":"C/O VDC 
CORP.","hold_id":null,"label_printed":null,"fx_detail_price_total":0.0000,"ship_to_loc":null,"fx_detail_exp_total":0.0000,"ship_to_zip":"78852","bol_date":null,"ship_method":"FG","invoice_id":null,"to_dept_no":null,"bol_printed":null,"order_type":1,"to_contact_no":1,"credit_rel_code":"B","trigger_order_no":131610413,"trigger_cdc_content":null,"from_acct_no":null,"head_exp_total":0.0000,"trigger_sid":955534244,"sales_terr":4592,"trigger_order_type":1,"it_cost_code
 
":null,"ship_to_country":"US","schedule_date":null,"from_contact_no":null,"ship_to_state":"TX","account_rep":9396,"receiving_date":null,"from_loc_no":5,"hold_date":null,"fx_total_cost":0.0000,"delete_id":null,"repick_counter":null,"expected_date":null,"sales_total":229.4400,"fx_total_order":0.0000,"from_dept_no":null,"dist_exp_date":null,"fx_head_exp_total":0.0000,"resale":"Y","detail_price_total":0.0000,"repick_id":null,"company_no":1,"entry_id":696828,"source_table":"order_header","label_date":null,"order_no":131610413,"sales_rel_date":"2022-06-15
 12:04:15.960","qc_date":"2022-06-16 
08:40:34.790","trigger_cdc_action":"U","u_version":"N","rma_disp_type":null,"freight":"P","ship_to_addr":"1025
 ADAMS CIRCLE","q_userid":696828,"ship_to_name":"LITTELFUSE MEX. MFG. B.V. 
80,2747152/ISMAEL 
JULIAN","to_loc_no":1,"total_weight":1.0000,"invoice_date":"2022-06-16 
23:00:00.000","total_order":229.4400,"h_version":1,"issue_date":"2022-06-15 
12:03:03.797","ship_to_city":"EAGLE PASS","delete_date"
 :null,"pick_date":"2022-06-15 12:57:59.000","posting_date":"2022-06-16 
23:00:00.000","manifest_date":null,"int_ref_type":8,"ext_ref":"P22574301","terms_no":"JJ","ship_to_loc_change":null,"fx_currency":null,"credit_rel_date":"2022-06-15
 
12:08:25.230","profile_special_handle":null,"approval":"Autocred","drop_ship":"D","trigger_entry_datetime":"2022-06-17
 
02:55:06.580","mt_expense_code":null,"to_acct_no":107685,"sales_tax":null,"detail_exp_total":0.0000,"to_inv_type":null,"date_flag":"2022-06-15
 
00:00:00.000","carrier_no":1,"int_ref_no":118365916,"invoice_counter":-100,"ship_date":"2022-06-16
 08:45:31.940","fx_sales_total":0.0000}}
   
   
{"cdc_trans_id":"0c72a1a3-9c22-4e2f-972f-5a26b75c3e7d","src_db_type":"sybase","cdc_system_time":"2022-06-17
 
02:55:07.927","src_timezone":"America/Los_Angeles","src_table_schema":"CIS","src_table_alias":"corp","src_table_name":"order_header","cdc_system_time_long":1655459707927,"src_table_jndi":"CORPSB","json_data":{"prod_exp_date":null,"entry_datetime":"2022-06-15
 12:42:05.427","closed_date":"2022-06-16 
18:05:20.860","printed_date":"2022-06-17 
02:55:06.663","total_cost":85.6100,"from_inv_type":1,"ship_to_po_box":"S56109749/Scott
 
Stockton","hold_id":null,"label_printed":null,"fx_detail_price_total":0.0000,"ship_to_loc":null,"fx_detail_exp_total":0.0000,"ship_to_zip":"37914","bol_date":null,"ship_method":"FG","invoice_id":null,"to_dept_no":null,"bol_printed":null,"order_type":1,"to_contact_no":null,"credit_rel_code":"B","trigger_order_no":131611668,"trigger_cdc_content":null,"from_acct_no":null,"head_exp_total":0.0000,"trigger_sid":955534245,"sales_terr":4502,"trigger_order_type":1,
 
"it_cost_code":null,"ship_to_country":"US","schedule_date":null,"from_contact_no":null,"ship_to_state":"TN","account_rep":9396,"receiving_date":null,"from_loc_no":7,"hold_date":null,"fx_total_cost":0.0000,"delete_id":null,"repick_counter":null,"expected_date":null,"sales_total":91.0800,"fx_total_order":0.0000,"from_dept_no":null,"dist_exp_date":null,"fx_head_exp_total":0.0000,"resale":"Y","detail_price_total":-22.3500,"repick_id":null,"company_no":1,"entry_id":-10706,"source_table":"order_header","label_date":null,"order_no":131611668,"sales_rel_date":"2022-06-15
 12:43:20.837","qc_date":"2022-06-16 
07:35:22.810","trigger_cdc_action":"U","u_version":"+","rma_disp_type":null,"freight":"P","ship_to_addr":"3015
 E Governor John Sevier Hwy","q_userid":null,"ship_to_name":"ATC 
Drivetrain","to_loc_no":1,"total_weight":2.1000,"invoice_date":"2022-06-16 
23:00:00.000","total_order":68.7300,"h_version":1,"issue_date":"2022-06-15 
12:42:08.913","ship_to_city":"KNOXVILLE","delete_date":null,"pick_
 date":"2022-06-15 12:48:40.787","posting_date":"2022-06-16 
23:00:00.000","manifest_date":null,"int_ref_type":8,"ext_ref":"P22575455","terms_no":"JJ","ship_to_loc_change":null,"fx_currency":null,"credit_rel_date":"2022-06-15
 
12:48:14.787","profile_special_handle":null,"approval":"Autocred","drop_ship":"D","trigger_entry_datetime":"2022-06-17
 
02:55:07.127","mt_expense_code":null,"to_acct_no":107685,"sales_tax":null,"detail_exp_total":0.0000,"to_inv_type":null,"date_flag":"2022-06-15
 
00:00:00.000","carrier_no":1,"int_ref_no":118368455,"invoice_counter":-100,"ship_date":"2022-06-16
 07:40:32.010","fx_sales_total":0.0000}}
   
   
{"cdc_trans_id":"0c72a1a3-9c22-4e2f-972f-5a26b75c3e7d","src_db_type":"sybase","cdc_system_time":"2022-06-17
 
02:55:07.927","src_timezone":"America/Los_Angeles","src_table_schema":"CIS","src_table_alias":"corp","src_table_name":"order_header","cdc_system_time_long":1655459707927,"src_table_jndi":"CORPSB","json_data":{"prod_exp_date":null,"entry_datetime":"2022-06-15
 12:58:38.647","closed_date":"2022-06-16 
18:05:36.467","printed_date":"2022-06-17 
02:55:07.297","total_cost":117.1200,"from_inv_type":1,"ship_to_po_box":"06152022/","hold_id":null,"label_printed":null,"fx_detail_price_total":0.0000,"ship_to_loc":null,"fx_detail_exp_total":0.0000,"ship_to_zip":"80214","bol_date":null,"ship_method":"FG","invoice_id":null,"to_dept_no":null,"bol_printed":null,"order_type":1,"to_contact_no":null,"credit_rel_code":"B","trigger_order_no":131612158,"trigger_cdc_content":null,"from_acct_no":null,"head_exp_total":0.0000,"trigger_sid":955534246,"sales_terr":4502,"trigger_order_type":1,"it_cost_code"
 
:null,"ship_to_country":"US","schedule_date":null,"from_contact_no":null,"ship_to_state":"CO","account_rep":9396,"receiving_date":null,"from_loc_no":7,"hold_date":null,"fx_total_cost":0.0000,"delete_id":null,"repick_counter":null,"expected_date":null,"sales_total":117.7000,"fx_total_order":0.0000,"from_dept_no":null,"dist_exp_date":null,"fx_head_exp_total":0.0000,"resale":"Y","detail_price_total":0.0000,"repick_id":null,"company_no":1,"entry_id":-10706,"source_table":"order_header","label_date":null,"order_no":131612158,"sales_rel_date":"2022-06-15
 12:58:47.410","qc_date":"2022-06-16 
07:54:02.423","trigger_cdc_action":"U","u_version":"+","rma_disp_type":null,"freight":"P","ship_to_addr":"10811
 W Collins Ave","q_userid":null,"ship_to_name":"Terumo 
Bct","to_loc_no":1,"total_weight":0.7500,"invoice_date":"2022-06-16 
23:00:00.000","total_order":117.7000,"h_version":1,"issue_date":"2022-06-15 
12:58:42.460","ship_to_city":"LAKEWOOD","delete_date":null,"pick_date":"2022-06-15
 13:04:41.063"
 ,"posting_date":"2022-06-16 
23:00:00.000","manifest_date":null,"int_ref_type":8,"ext_ref":"P22575671","terms_no":"JJ","ship_to_loc_change":null,"fx_currency":null,"credit_rel_date":"2022-06-15
 
13:03:17.197","profile_special_handle":null,"approval":"Autocred","drop_ship":"D","trigger_entry_datetime":"2022-06-17
 
02:55:07.607","mt_expense_code":null,"to_acct_no":107685,"sales_tax":null,"detail_exp_total":0.0000,"to_inv_type":null,"date_flag":"2022-06-15
 
00:00:00.000","carrier_no":1,"int_ref_no":118369157,"invoice_counter":-100,"ship_date":"2022-06-16
 07:55:06.710","fx_sales_total":0.0000}}
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to