qq726658006 opened a new issue, #18199:
URL: https://github.com/apache/hudi/issues/18199
### Describe the problem you faced

Hi,

Our organization is using Hudi 1.1.1 with a MOR table and is testing CDC reads from Flink. During the test we found that when the same record (same record key) is written twice, both the first and the second occurrence are emitted with rowkind "insert". We expected the first rowkind to be "insert" and the second one to be "update".

Could you please help us check whether there is an error in our configuration?

Thanks!
### To Reproduce

Write the same data set twice in a row without any rollback in between, then query the CDC read results of the second write (a minimal sketch of the write side follows below). Observed behavior per index configuration:

- Bucket index + no partition + metadata enabled: the second write is emitted entirely as inserts.
- Bloom index + partition + metadata enabled: roughly half of the second write is emitted as inserts.
- Global bloom index + partition + metadata enabled: the second write starts out as all updates, then switches back to emitting inserts.
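For reference, the write side of the repro looks roughly like the sketch below. This is not our production job: the table name, columns, local path, and the reduced option set are placeholders, and the full write options are listed under item 1. The same record is inserted twice with no rollback in between; the CDC read side is sketched after the read config under item 2.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcRepublishRepro {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hudi MOR table with CDC enabled (only an illustrative subset of the write config).
        tEnv.executeSql(
            "CREATE TABLE hudi_cdc_t (" +
            "  object_id STRING," +
            "  data_version BIGINT," +
            "  payload STRING," +
            "  PRIMARY KEY (object_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 'file:///tmp/hudi_cdc_t'," +
            "  'table.type' = 'MERGE_ON_READ'," +
            "  'cdc.enabled' = 'true'," +
            "  'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_BEFORE_AFTER'," +
            "  'index.type' = 'GLOBAL_BLOOM'" +
            ")");

        // "Republish": write the exact same record twice, no rollback in between.
        tEnv.executeSql("INSERT INTO hudi_cdc_t VALUES ('k1', 1, 'v')").await();
        tEnv.executeSql("INSERT INTO hudi_cdc_t VALUES ('k1', 1, 'v')").await();
    }
}
```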
1. Write Config:

```properties
hoodie.table.recordkey.fields: object_id
hoodie.table.base.file.format: PARQUET
hoodie.table.cdc.enabled: true
hoodie.table.cdc.supplemental.logging.mode: DATA_BEFORE_AFTER
hoodie.datasource.write.partitionpath.field: object_id_hash
hoodie.table.partition.fields: object_id_hash
write.precombine: true
hoodie.datasource.write.precombine.field: data_version
ordering.fields: data_version
write.operation: upsert
hoodie.datasource.write.operation: upsert
cdc.enabled: true
changelog.enabled: false
compaction.async.enabled: true
compaction.trigger.strategy: num_commits
compaction.delta_commits: 10
compaction.tasks: 8
hoodie.table.services.incremental.enabled: true
hoodie.datasource.write.recordkey.field: object_id
table.type: MERGE_ON_READ
write.rate.limit: 0
hoodie.index.type: GLOBAL_BLOOM
hoodie.bucket.index.hash.field: object_id
hoodie.index.bucket.engine: SIMPLE
hoodie.bucket.index.num.buckets: 64
hoodie.parquet.small.file.limit: 134217728 # 128MB
hoodie.parquet.max.file.size: 536870912 # 512MB
hoodie.file.group.reader.enabled: true
hoodie.memory.compaction.fraction: 0.2
hoodie.memory.compaction.max.size: 268435456
hoodie.memory.dfs.buffer.max.size: 12582912
hoodie.metadata.enable: true
hoodie.client.heartbeat.interval_in_ms: 300000
hoodie.client.heartbeat.tolerable.misses: 5
hoodie.clean.policy: KEEP_LATEST_BY_HOURS
hoodie.clean.async.enabled: true
hoodie.clean.automatic: true
hoodie.clean.failed.writes.policy: LAZY
hoodie.clean.hours.retained: 6
hoodie.embed.timeline.server: true
hoodie.filesystem.view.type: MEMORY
hoodie.metrics.on: true
hoodie.metrics.reporter.type: CLOUDWATCH
hoodie.metrics.cloudwatch.report.period.seconds: 300
hoodie.metrics.cloudwatch.metric.prefix: hudi
hoodie.metrics.cloudwatch.metric.namespace: HudiMetrics
hoodie.metrics.cloudwatch.max.datums.per.request: 20
fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider
fs.s3a.connection.maximum: 500
fs.s3a.threads.max: 100
fs.s3a.connection.establish.timeout: 120000
fs.s3a.connection.timeout: 600000
fs.s3a.socket.timeout: 600000
fs.s3a.attempts.maximum: 50
fs.s3a.retry.limit: 50
fs.s3a.retry.interval: 500
fs.s3a.fast.upload: true
fs.s3a.fast.upload.buffer: disk
fs.s3a.multipart.size: 268435456
fs.s3a.experimental.input.fadvise: sequential
read.cdc.from.changelog: true
hoodie.write.record.merge.mode: EVENT_TIME_ORDERING
archive.max_commits: 1000
index.bootstrap.enabled: true
hoodie.bloom.index.update.partition.path: false
hoodie.bloom.index.filter.type: SIMPLE
hoodie.index.bloom.num_entries: 40000000
hoodie.index.bloom.fpp: 0.001
hoodie.bloom.index.filter.dynamic.max.entries: 40000000
compaction.max_memory: 1024
state.backend: rocksdb
hoodie.record.merge.mode: EVENT_TIME_ORDERING
hoodie.table.ordering.fields: data_version
index.type: GLOBAL_BLOOM
hoodie.simple.index.update.partition.path: false
hoodie.metadata.index.bloom.filter.enable: true
hoodie.metadata.bloom.filter.dynamic.max.entries: 40000000
hoodie.metadata.bloom.filter.enable: true
hoodie.metadata.bloom.filter.fpp: 0.001
hoodie.metadata.bloom.filter.num.entries: 40000000
hoodie.metadata.bloom.filter.type: SIMPLE
hoodie.metadata.index.bloom.filter.file.group.count: 4
hoodie.bloom.index.use.metadata: true
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s
hoodie.write.concurrency.mode: single_writer
hoodie.write.lock.provider: org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.write.lock.inprocess.lock.timeout.ms: 600000
```
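For context, these options are handed to the Flink Hudi sink roughly as below (a minimal sketch, assuming the `HoodiePipeline` builder API from hudi-flink; the table path, column list, and upstream source are placeholders, not our actual job):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

public class CdcWriteJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The key/value pairs listed above go into this map (only a few shown here).
        Map<String, String> writeOptions = new HashMap<>();
        writeOptions.put("path", "s3a://bucket/warehouse/hudi_cdc_t"); // placeholder path
        writeOptions.put("table.type", "MERGE_ON_READ");
        writeOptions.put("cdc.enabled", "true");
        writeOptions.put("hoodie.index.type", "GLOBAL_BLOOM");
        writeOptions.put("hoodie.datasource.write.recordkey.field", "object_id");

        DataStream<RowData> cdcInput = buildCdcInput(env); // placeholder: upstream CDC stream

        HoodiePipeline.Builder builder = HoodiePipeline.builder("hudi_cdc_t")
            .column("object_id STRING")
            .column("data_version BIGINT")
            .column("payload STRING")
            .pk("object_id")
            .options(writeOptions);

        builder.sink(cdcInput, false); // bounded = false for a streaming job
        env.execute("hudi-cdc-write");
    }

    // Placeholder for the upstream source; not part of this report.
    private static DataStream<RowData> buildCdcInput(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("replace with the real upstream source");
    }
}
```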
2. Read Config:

```java
Map<String, String> hudiOptions = new HashMap<>();
hudiOptions.put(FlinkOptions.PATH.key(), hudiTableConfig.getTablePath());
hudiOptions.put(FlinkOptions.TABLE_TYPE.key(), "");
hudiOptions.put(FlinkOptions.RECORD_KEY_FIELD.key(), "object_id");
hudiOptions.put(FlinkOptions.ORDERING_FIELDS.key(), "data_version");
hudiOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
hudiOptions.put(FlinkOptions.READ_CDC_FROM_CHANGELOG.key(), "false");
hudiOptions.put(FlinkOptions.READ_COMMITS_LIMIT.key(), "1");
hudiOptions.put(FlinkOptions.READ_SPLITS_LIMIT.key(), "1");
hudiOptions.put(FlinkOptions.READ_TASKS.key(), "8");
hudiOptions.put(FlinkOptions.READ_START_COMMIT.key(), hudiTableConfig.getReadStartCommit());
hudiOptions.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), hudiTableConfig.getReadStreamingCheckInterval());
hudiOptions.put(FlinkOptions.CDC_ENABLED.key(), "true");
hudiOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "false");
hudiOptions.put(FlinkOptions.METADATA_ENABLED.key(), "false");
hudiOptions.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
hudiOptions.put("hoodie.datasource.query.incremental.format", "cdc");
hudiOptions.put(FlinkOptions.MERGE_TYPE.key(), "fast_merge");
hudiOptions.put(FlinkOptions.KEYGEN_TYPE.key(), "COMPLEX_AVRO");
hudiOptions.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hudiOptions.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hudiOptions.put("fs.s3.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
hudiOptions.put("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
hudiOptions.put("hoodie.common.diskmap.compression.enabled", "false");
```
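And the CDC source plus the rowkind check on the read side looks roughly like this (again a minimal sketch, assuming the `HoodiePipeline#source` API; `buildReadOptions()` stands for the map construction shown above and the column list is a placeholder):

```java
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

public class CdcReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, String> hudiOptions = buildReadOptions(); // the map built in the snippet above

        HoodiePipeline.Builder builder = HoodiePipeline.builder("hudi_cdc_t")
            .column("object_id STRING")
            .column("data_version BIGINT")
            .column("payload STRING")
            .pk("object_id")
            .options(hudiOptions);

        DataStream<RowData> cdcStream = builder.source(env);

        // This is where we observe the change flag: for the second, identical write we
        // expect RowKind.UPDATE_AFTER (+U) but keep seeing RowKind.INSERT (+I).
        cdcStream
            .map(row -> row.getRowKind().shortString() + " " + row.getString(0))
            .print();

        env.execute("hudi-cdc-read");
    }

    // Placeholder: builds the hudiOptions map exactly as in the read config above.
    private static Map<String, String> buildReadOptions() {
        throw new UnsupportedOperationException("see the read config snippet above");
    }
}
```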
### Expected behavior

When the same record is written twice, the first CDC rowkind should be "insert" and the second one should be "update". Currently both are emitted as "insert".
### Environment Description

* Hudi version: 1.1.1
* Spark version:
* Flink version: 1.20.0
* Hive version:
* Hadoop version: 3.3.6
* Storage (HDFS/S3/GCS..): S3
* Running on Docker? (yes/no): no
### Additional context
_No response_
### Stacktrace
```shell
```