qq726658006 opened a new issue, #18199:
URL: https://github.com/apache/hudi/issues/18199

   ### Describe the problem you faced
   
   Hi,
   Our organization is running Hudi 1.1.1 and is evaluating CDC reads on a MOR table with Flink. During testing we found that when the same record (same record key) is written twice, the CDC read reports both writes with rowkind "insert". We expected the first write to show up as "insert" and the second as "update".
   Could you please help us check whether anything in our configuration is wrong?
   Thanks!
   
   ### To Reproduce
   
   We tested three setups, all with the metadata table enabled. In each case we wrote the same batch of records twice in a row, with no rollback in between, and then checked the rowkinds returned by the CDC read (a minimal repro sketch follows this list; our full write and read configs are further below):

   * Bucket index + non-partitioned table: the second write comes back entirely as inserts.
   * Bloom index + partitioned table: roughly half of the second write comes back as inserts.
   * Global bloom index + partitioned table: the second write starts out as all updates, but the output then switches back to inserts.
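   Minimal repro sketch (Flink SQL driven from Java via a `TableEnvironment`), trimmed down to the CDC-relevant options from the configs below; the table name, schema, local path, and sample values are placeholders:

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class HudiCdcRowKindRepro {
       public static void main(String[] args) throws Exception {
           TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

           // Hudi MOR table with CDC enabled; only the CDC-relevant options are kept here.
           tEnv.executeSql(
               "CREATE TABLE hudi_cdc_repro (\n"
             + "  object_id STRING,\n"
             + "  data_version BIGINT,\n"
             + "  object_id_hash STRING,\n"
             + "  PRIMARY KEY (object_id) NOT ENFORCED\n"
             + ") PARTITIONED BY (object_id_hash) WITH (\n"
             + "  'connector' = 'hudi',\n"
             + "  'path' = 'file:///tmp/hudi_cdc_repro',\n"
             + "  'table.type' = 'MERGE_ON_READ',\n"
             + "  'cdc.enabled' = 'true',\n"
             + "  'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_BEFORE_AFTER',\n"
             + "  'index.type' = 'GLOBAL_BLOOM',\n"
             + "  'ordering.fields' = 'data_version'\n"
             + ")");

           // Write the same record key twice, as two separate commits, with an increasing ordering field.
           tEnv.executeSql("INSERT INTO hudi_cdc_repro VALUES ('key-1', 1, '00')").await();
           tEnv.executeSql("INSERT INTO hudi_cdc_repro VALUES ('key-1', 2, '00')").await();

           // Streaming CDC read from the earliest commit. Expected: +I for the first commit and
           // +U for the second; observed: both commits come back as +I.
           tEnv.executeSql(
               "SELECT * FROM hudi_cdc_repro "
             + "/*+ OPTIONS('read.streaming.enabled'='true', 'read.start-commit'='earliest') */")
               .print(); // keeps running because the streaming read is unbounded
       }
   }
   ```

   Our full write and read configurations are below.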
   
   1. Write Config:
   ```
   hoodie.table.recordkey.fields: object_id
   hoodie.table.base.file.format: PARQUET
   hoodie.table.cdc.enabled: true
   hoodie.table.cdc.supplemental.logging.mode: DATA_BEFORE_AFTER
   hoodie.datasource.write.partitionpath.field: object_id_hash
   hoodie.table.partition.fields: object_id_hash
   
   write.precombine: true
   hoodie.datasource.write.precombine.field: data_version
   ordering.fields: data_version
   write.operation: upsert
   hoodie.datasource.write.operation: upsert
   cdc.enabled: true
   changelog.enabled: false
   
   compaction.async.enabled: true
   compaction.trigger.strategy: num_commits
   compaction.delta_commits: 10
   compaction.tasks: 8
   hoodie.table.services.incremental.enabled: true
   
   hoodie.datasource.write.recordkey.field: object_id
   table.type: MERGE_ON_READ
   
   write.rate.limit: 0
   hoodie.index.type: GLOBAL_BLOOM
   hoodie.bucket.index.hash.field: object_id
   hoodie.index.bucket.engine: SIMPLE
   hoodie.bucket.index.num.buckets: 64
   hoodie.parquet.small.file.limit: 134217728 # 128MB
   hoodie.parquet.max.file.size: 536870912 # 512MB
   
   hoodie.file.group.reader.enabled: true
   
   hoodie.memory.compaction.fraction: 0.2
   hoodie.memory.compaction.max.size: 268435456
   
   hoodie.memory.dfs.buffer.max.size: 12582912
   
   hoodie.metadata.enable: true
   
   hoodie.client.heartbeat.interval_in_ms: 300000
   hoodie.client.heartbeat.tolerable.misses: 5
   
   hoodie.clean.policy: KEEP_LATEST_BY_HOURS
   hoodie.clean.async.enabled: true
   hoodie.clean.automatic: true
   hoodie.clean.failed.writes.policy: LAZY
   hoodie.clean.hours.retained: 6
   hoodie.embed.timeline.server: true
   hoodie.filesystem.view.type: MEMORY
   hoodie.metrics.on: true
   hoodie.metrics.reporter.type: CLOUDWATCH
   hoodie.metrics.cloudwatch.report.period.seconds: 300
   hoodie.metrics.cloudwatch.metric.prefix: hudi
   hoodie.metrics.cloudwatch.metric.namespace: HudiMetrics
   hoodie.metrics.cloudwatch.max.datums.per.request: 20
   
   
   fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
   fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
   fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider
   fs.s3a.connection.maximum: 500
   fs.s3a.threads.max: 100
   fs.s3a.connection.establish.timeout: 120000
   fs.s3a.connection.timeout: 600000
   fs.s3a.socket.timeout: 600000
   fs.s3a.attempts.maximum: 50
   fs.s3a.retry.limit: 50
   fs.s3a.retry.interval: 500
   fs.s3a.fast.upload: true
   fs.s3a.fast.upload.buffer: disk
   fs.s3a.multipart.size: 268435456
   fs.s3a.experimental.input.fadvise: sequential
   read.cdc.from.changelog: true
   hoodie.write.record.merge.mode: EVENT_TIME_ORDERING
   archive.max_commits: 1000
   index.bootstrap.enabled: true
   hoodie.bloom.index.update.partition.path: false
   hoodie.bloom.index.filter.type: SIMPLE
   hoodie.index.bloom.num_entries: 40000000
   hoodie.index.bloom.fpp: 0.001
   hoodie.bloom.index.filter.dynamic.max.entries: 40000000
   compaction.max_memory: 1024
   state.backend: rocksdb
   hoodie.record.merge.mode: EVENT_TIME_ORDERING
   hoodie.table.ordering.fields: data_version
   index.type: GLOBAL_BLOOM
   hoodie.simple.index.update.partition.path: false
   hoodie.metadata.index.bloom.filter.enable: true
   hoodie.metadata.bloom.filter.dynamic.max.entries: 40000000
   hoodie.metadata.bloom.filter.enable: true
   hoodie.metadata.bloom.filter.fpp: 0.001
   hoodie.metadata.bloom.filter.num.entries: 40000000
   hoodie.metadata.bloom.filter.type: SIMPLE
   hoodie.metadata.index.bloom.filter.file.group.count: 4
   hoodie.bloom.index.use.metadata: true
   restart-strategy: fixed-delay
   restart-strategy.fixed-delay.attempts: 3
   restart-strategy.fixed-delay.delay: 10s
   hoodie.write.concurrency.mode: single_writer
   hoodie.write.lock.provider: org.apache.hudi.client.transaction.lock.InProcessLockProvider
   hoodie.write.lock.inprocess.lock.timeout.ms: 600000
   ```
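   For context, the write options above are handed to the Flink writer; the sketch below shows one way to wire such an options map through the `HoodiePipeline` API from the Hudi Flink integration. It is simplified from the actual job: the table name, schema, path, and the upstream source are placeholders, and only a subset of the options is repeated.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.RowData;
   import org.apache.hudi.util.HoodiePipeline;

   public class HudiCdcWriteSketch {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           // Upstream stream of RowData to upsert; construction omitted in this sketch.
           DataStream<RowData> input = buildInput(env); // placeholder helper

           // Subset of the write config above; the full map is passed the same way.
           Map<String, String> options = new HashMap<>();
           options.put("path", "s3a://<bucket>/<table-path>"); // placeholder
           options.put("table.type", "MERGE_ON_READ");
           options.put("cdc.enabled", "true");
           options.put("hoodie.table.cdc.supplemental.logging.mode", "DATA_BEFORE_AFTER");
           options.put("write.operation", "upsert");
           options.put("ordering.fields", "data_version");
           options.put("index.type", "GLOBAL_BLOOM");
           options.put("hoodie.metadata.enable", "true");

           HoodiePipeline.builder("my_table") // placeholder table name
               .column("object_id VARCHAR(64)")
               .column("data_version BIGINT")
               .column("object_id_hash VARCHAR(8)")
               .pk("object_id")
               .partition("object_id_hash")
               .options(options)
               .sink(input, false); // false = unbounded streaming input

           env.execute("hudi-cdc-write-sketch");
       }

       // Placeholder: in the real job this is the upstream ingest source.
       private static DataStream<RowData> buildInput(StreamExecutionEnvironment env) {
           throw new UnsupportedOperationException("source construction omitted in this sketch");
       }
   }
   ```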
   2. Read Config
   ```java
   Map<String, String> hudiOptions = new HashMap<>();

   // Table location, key and ordering fields
   hudiOptions.put(FlinkOptions.PATH.key(), hudiTableConfig.getTablePath());
   hudiOptions.put(FlinkOptions.TABLE_TYPE.key(), "");
   hudiOptions.put(FlinkOptions.RECORD_KEY_FIELD.key(), "object_id");
   hudiOptions.put(FlinkOptions.ORDERING_FIELDS.key(), "data_version");

   // Streaming incremental read in CDC format
   hudiOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
   hudiOptions.put(FlinkOptions.READ_CDC_FROM_CHANGELOG.key(), "false");
   hudiOptions.put(FlinkOptions.READ_COMMITS_LIMIT.key(), "1");
   hudiOptions.put(FlinkOptions.READ_SPLITS_LIMIT.key(), "1");
   hudiOptions.put(FlinkOptions.READ_TASKS.key(), "8");
   hudiOptions.put(FlinkOptions.READ_START_COMMIT.key(), hudiTableConfig.getReadStartCommit());
   hudiOptions.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), hudiTableConfig.getReadStreamingCheckInterval());
   hudiOptions.put(FlinkOptions.CDC_ENABLED.key(), "true");
   hudiOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "false");
   hudiOptions.put(FlinkOptions.METADATA_ENABLED.key(), "false");
   hudiOptions.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
   hudiOptions.put("hoodie.datasource.query.incremental.format", "cdc");
   hudiOptions.put(FlinkOptions.MERGE_TYPE.key(), "fast_merge");
   hudiOptions.put(FlinkOptions.KEYGEN_TYPE.key(), "COMPLEX_AVRO");

   // S3A filesystem settings
   hudiOptions.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
   hudiOptions.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
   hudiOptions.put("fs.s3.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
   hudiOptions.put("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");

   hudiOptions.put("hoodie.common.diskmap.compression.enabled", "false");
   ```
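   The read options map above is consumed along these lines (again a simplified sketch; schema, table name, and path are placeholders): the CDC source is built and the RowKind of each emitted record is inspected, which is where the second write of the same key shows up as INSERT instead of the expected UPDATE_AFTER.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.RowData;
   import org.apache.hudi.configuration.FlinkOptions;
   import org.apache.hudi.util.HoodiePipeline;

   public class HudiCdcReadSketch {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           // Stand-in for the full map built above; only a few options are repeated here.
           Map<String, String> hudiOptions = new HashMap<>();
           hudiOptions.put(FlinkOptions.PATH.key(), "s3a://<bucket>/<table-path>"); // placeholder
           hudiOptions.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
           hudiOptions.put("hoodie.datasource.query.incremental.format", "cdc");
           hudiOptions.put(FlinkOptions.CDC_ENABLED.key(), "true");
           hudiOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");

           DataStream<RowData> cdcRows = HoodiePipeline.builder("my_table") // placeholder table name
               .column("object_id VARCHAR(64)")
               .column("data_version BIGINT")
               .column("object_id_hash VARCHAR(8)")
               .pk("object_id")
               .partition("object_id_hash")
               .options(hudiOptions)
               .source(env);

           // Log the RowKind of every CDC record: the second write of the same key is
           // expected to arrive as UPDATE_AFTER, but INSERT is observed again.
           cdcRows.map(row -> row.getRowKind().toString())
                  .returns(Types.STRING)
                  .print();

           env.execute("hudi-cdc-read-sketch");
       }
   }
   ```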
   
   ### Expected behavior
   
   When the same record is written twice, the CDC read should report the first write with rowkind "insert" and the second write with rowkind "update". Currently both writes are reported as "insert".
   
   ### Environment Description
   
   * Hudi version: 1.1.1
   * Spark version:
   * Flink version: 1.20.0
   * Hive version:
   * Hadoop version: 3.3.6
   * Storage (HDFS/S3/GCS..): S3
   * Running on Docker? (yes/no): no
   
   
   ### Additional context
   
   _No response_
   
   ### Stacktrace
   
   ```shell
   
   ```

