ccchenhe commented on issue #4881:
URL: https://github.com/apache/hudi/issues/4881#issuecomment-1135621886
> @ccchenhe said that COW may lose the index data, let's see if we can have more details :)
### summary
My Flink application consumes from the same Kafka source and sinks into multiple Hudi tables.
We load the historical data with Spark and then submit the Flink application. After the first
global index bootstrap, I found duplicated data: the same record key has 2 rows,
distributed across different files.
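
To make the symptom concrete, here is a minimal sketch of how such duplicates can be listed. It assumes the rows have already been collected as `(recordKey, fileName)` pairs, for example by selecting Hudi's `_hoodie_record_key` and `_hoodie_file_name` metadata columns from the table. The object and method names are hypothetical and are not part of Hudi.

```scala
// Hypothetical helper (not a Hudi API): finds record keys stored in more than one file.
object DuplicateKeyCheck {
  // rows: (recordKey, fileName) pairs, e.g. taken from the
  // _hoodie_record_key and _hoodie_file_name metadata columns.
  def keysInMultipleFiles(rows: Seq[(String, String)]): Map[String, Set[String]] =
    rows
      .groupBy { case (key, _) => key }                         // group rows by record key
      .map { case (key, pairs) => key -> pairs.map(_._2).toSet } // distinct files per key
      .filter { case (_, files) => files.size > 1 }              // keep keys seen in 2+ files
}
```

A key that appears twice within the same file is not reported by this sketch; it only flags keys spread over different files, which is the case described above.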
### env
flink: 1.13.12
hudi: 0.10.0
hadoop: 3.2.1 (cdh)
### spark config
```scala
.write
.format("org.apache.hudi")
.options(hoodieConfig)
.option("write.precombine.field", "update_time")
.option("hoodie.bucket.index.num.buckets", "16")
.option("hoodie.bucket.index.hash.field", "id")
.option(PRECOMBINE_FIELD_OPT_KEY, "update_time")
.option("hive_sync.mode", "hms")
.option("hoodie.bulkinsert.shuffle.parallelism", "40")
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", "2147483648")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "314572800")
.option("hive_sync.metastore.uris", "thrift://hms:9083")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.option(PARTITIONPATH_FIELD_OPT_KEY, "") // has no partitions
.option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
.option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
.option(HoodieWriteConfig.TABLE_NAME, "hudi_table_test")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
.option(HIVE_URL_OPT_KEY, "thrift://hms:9083")
.option(HIVE_USE_JDBC_OPT_KEY, "false")
.option(HIVE_DATABASE_OPT_KEY, "test_db")
.option(HIVE_TABLE_OPT_KEY, "hudi_table_test")
.option(HIVE_PARTITION_FIELDS_OPT_KEY, "")
.option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.NonPartitionedExtractor")
.option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator")
.option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
.option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.BLOOM.name())
.mode(SaveMode.Overwrite)
.save("hdfs://location")
```
### flink config
```sql
WITH(
'compaction.schedule.enabled' = 'true'
,'compaction.async.enabled' = 'false'
,'compaction.tasks' = '4'
,'compaction.delta_commits' = '15'
,'hoodie.table.type' = 'COPY_ON_WRITE'
,'hoodie.parquet.max.file.size' = '268435456'
,'hoodie.datasource.write.recordkey.field' = 'id'
,'hoodie.datasource.write.precombine.field' = 'update_time'
,'hoodie.parquet.small.file.limit' = '104857600'
,'hoodie.parquet.compression.codec'= 'snappy'
,'connector' = 'hudi'
,'path' = 'hdfs://location'
,'index.bootstrap.enabled' = 'true'
,'index.state.ttl' = '0'
,'index.type' = 'BLOOM'
,'hoodie.index.type' = 'BLOOM'
,'hive_sync.partition_fields' = ''
,'hive_sync.metastore.uris' = 'thrift://hms:9083'
,'hive_sync.db' = 'test_db'
,'hive_sync.table' = 'hudi_table_test'
,'hive_sync.enable' = 'true'
,'hive_sync.use_jdbc' = 'false'
,'hive_sync.mode' = 'hms'
,'write.operation' = 'upsert'
,'write.tasks'='4'
,'write.index_bootstrap.tasks'='4'
,'write.bucket_assign.tasks'='16'
,'write.rate.limit' = '64000'
,'write.precombine.field' = 'update_time'
,'hoodie.payload.ordering.field' = 'update_time'
,'write.payload.class' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
,'hoodie.datasource.write.partitionpath.field' = ''
,'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.CustomKeyGenerator'
,'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.NonPartitionedExtractor'
)
```
### flink example
<img width="1071" alt="image" src="https://user-images.githubusercontent.com/20533543/169995136-a6b2f36e-6f6c-447e-824f-44d81de499ec.png">
### data example
<img width="1763" alt="image" src="https://user-images.githubusercontent.com/20533543/169996004-fbda2daa-a363-4b26-829f-67cf99128ff9.png">