ccchenhe commented on issue #4881:
URL: https://github.com/apache/hudi/issues/4881#issuecomment-1135621886

   > @ccchenhe said that COW may lose the index data, let's see if we can have more details :)
   
   
   ### summary
   My Flink application consumes the same Kafka topic and sinks into multiple Hudi tables. The workflow: first load the history data with Spark (the bulk_insert config below), then submit the Flink application. After the first global index bootstrap, I found duplicated data: the same record key has 2 rows, distributed in different files.
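   A quick way to confirm this kind of duplication is to group rows by record key and look for keys that map to more than one file group. The sketch below is plain Scala over hypothetical sample tuples; in the real table the two columns would come from Hudi's `_hoodie_record_key` and `_hoodie_file_name` metadata fields:

   ```scala
   // Sketch: find record keys that appear in more than one file group.
   // The (recordKey, fileName) tuples are hypothetical stand-ins for
   // _hoodie_record_key / _hoodie_file_name values read from the table.
   object DuplicateKeyCheck {
     def duplicatedKeys(rows: Seq[(String, String)]): Map[String, Set[String]] =
       rows
         .groupBy(_._1)                                       // group rows by record key
         .map { case (key, rs) => key -> rs.map(_._2).toSet } // distinct files per key
         .filter { case (_, files) => files.size > 1 }        // keep keys seen in >1 file
   }
   ```

   Any key reported by `duplicatedKeys` matches the symptom described above: one record key with rows in two different files.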
   
   ### env
   flink: 1.13.12
   hudi: 0.10.0
   hadoop: 3.2.1 (CDH)
   
   ### spark config
   ```scala
   .write
   .format("org.apache.hudi")
   .options(hoodieConfig)
   .option("write.precombine.field", "update_time")
   .option("hoodie.bucket.index.num.buckets", "16")
   .option("hoodie.bucket.index.hash.field", "id")
   .option(PRECOMBINE_FIELD_OPT_KEY, "update_time")
   .option("hive_sync.mode", "hms")
   .option("hoodie.bulkinsert.shuffle.parallelism", "40")
   .option("hoodie.clustering.inline", "true")
   .option("hoodie.clustering.inline.max.commits", "1")
   .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "2147483648")
    .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
   .option("hoodie.clustering.plan.strategy.small.file.limit", "314572800")
   .option("hive_sync.metastore.uris", "thrift://hms:9083")
   .option(RECORDKEY_FIELD_OPT_KEY, "id")
   .option(PARTITIONPATH_FIELD_OPT_KEY, "") // has no partitions
   .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
   .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
   .option(HoodieWriteConfig.TABLE_NAME, "hudi_table_test")
   .option("hoodie.upsert.shuffle.parallelism", "2")
    .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .option(HIVE_URL_OPT_KEY, "thrift://hms:9083")
    .option(HIVE_USE_JDBC_OPT_KEY, "false")
    .option(HIVE_DATABASE_OPT_KEY, "test_db")
    .option(HIVE_TABLE_OPT_KEY, "hudi_table_test")
    .option(HIVE_PARTITION_FIELDS_OPT_KEY, "")
    .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.NonPartitionedExtractor")
    .option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator")
    .option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.BLOOM.name())
   .mode(SaveMode.Overwrite)
   .save("hdfs://location")
   ```
   
   ### flink config
   ```sql
   WITH(
         'compaction.schedule.enabled' = 'true'
        ,'compaction.async.enabled' = 'false'
        ,'compaction.tasks' = '4'
        ,'compaction.delta_commits' = '15'
        ,'hoodie.table.type' = 'COPY_ON_WRITE'
        ,'hoodie.parquet.max.file.size' = '268435456'
        ,'hoodie.datasource.write.recordkey.field' = 'id' 
        ,'hoodie.datasource.write.precombine.field' = 'update_time' 
        ,'hoodie.parquet.small.file.limit' = '104857600'
        ,'hoodie.parquet.compression.codec'= 'snappy'
        ,'connector' = 'hudi'
        ,'path' = 'hdfs://location' 
        ,'index.bootstrap.enabled' = 'true' 
        ,'index.state.ttl' = '0'
        ,'index.type' = 'BLOOM'
        ,'hoodie.index.type' = 'BLOOM'
        ,'hive_sync.partition_fields' = ''
        ,'hive_sync.metastore.uris' = 'thrift://hms:9083'
        ,'hive_sync.db' = 'test_db' 
        ,'hive_sync.table' = 'hudi_table_test' 
        ,'hive_sync.enable' = 'true'
        ,'hive_sync.use_jdbc' = 'false'
        ,'hive_sync.mode' = 'hms'
        ,'write.operation' = 'upsert'
        ,'write.tasks'='4'
        ,'write.index_bootstrap.tasks'='4' 
        ,'write.bucket_assign.tasks'='16'
        ,'write.rate.limit' = '64000'
        ,'write.precombine.field' = 'update_time'
        ,'hoodie.payload.ordering.field' = 'update_time'
        ,'write.payload.class' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
        ,'hoodie.datasource.write.partitionpath.field' = ''
        ,'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.CustomKeyGenerator'
        ,'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.NonPartitionedExtractor'
   )
   ```
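   For context on why duplicates are unexpected with this config: with `write.payload.class = DefaultHoodieRecordPayload` and `hoodie.payload.ordering.field = update_time`, an upsert should keep, per record key, only the row with the greater `update_time`, so two live rows for one key should only be possible if the index bootstrap routed the same key into two different file groups. A minimal sketch of that merge rule, in plain Scala with a hypothetical `Row` type (not a Hudi class):

   ```scala
   // Sketch of the per-key merge rule implied by the ordering-field config:
   // for each record key, the row with the greatest update_time wins.
   // `Row` is a hypothetical stand-in for a Hudi record.
   final case class Row(id: String, updateTime: Long, value: String)

   object PrecombineSketch {
     def mergeByKey(rows: Seq[Row]): Map[String, Row] =
       rows
         .groupBy(_.id)                                           // one group per record key
         .map { case (key, rs) => key -> rs.maxBy(_.updateTime) } // latest update_time wins
   }
   ```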
   
   ### flink example
   <img width="1071" alt="image" src="https://user-images.githubusercontent.com/20533543/169995136-a6b2f36e-6f6c-447e-824f-44d81de499ec.png">
   
   ### data example
   <img width="1763" alt="image" src="https://user-images.githubusercontent.com/20533543/169996004-fbda2daa-a363-4b26-829f-67cf99128ff9.png">
   
