[
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan resolved HUDI-2947.
---------------------------------------
> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config
> from CLI in continuous mode
> ------------------------------------------------------------------------------------------------------
>
> Key: HUDI-2947
> URL: https://issues.apache.org/jira/browse/HUDI-2947
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Ethan Guo
> Assignee: sivabalan narayanan
> Priority: Critical
> Labels: pull-request-available, sev:critical
> Fix For: 0.10.1
>
>
> *Problem:*
> When the deltastreamer is started with a given checkpoint, e.g., `--checkpoint
> 0`, in continuous mode, the job may later pick up the wrong checkpoint. In
> this run, the wrong checkpoint shows up in the 20211206203551080 commit, the
> first commit after the replacecommit and clean: the resume checkpoint is
> reset to "0" instead of "5", the value expected after
> 20211206202728233.commit. More details below.
>
> The bug is due to the check here:
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null
>     && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
>         || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
>   resumeCheckpointStr = Option.of(cfg.checkpoint);
> }
> {code}
> In this case of resuming after a clustering commit, both "cfg.checkpoint !=
> null" and
> "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))"
> are true, since "--checkpoint 0" is configured and the last commit is a
> replacecommit without checkpoint keys. As a result, the resume checkpoint
> string is wrongly reset to the configured checkpoint, skipping the timeline
> walk-back logic that follows.
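A correct resolution would walk back past instants that carry no checkpoint metadata (such as the replacecommit from clustering) before deciding whether the CLI checkpoint still applies. Below is a minimal, self-contained sketch of that idea in plain Java; the class, method names, and the reset-key string are illustrative and simplified, not the actual Hudi patch or Hudi's internal types (commit metadata is modeled as a Map).

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CheckpointResolver {
    // Key names mirror the grep output above; the reset key string is an assumption.
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

    /**
     * Walks the commits from latest to earliest and returns the checkpoint to
     * resume from; falls back to the CLI checkpoint only if no commit has
     * recorded any checkpoint state at all.
     */
    static Optional<String> resolveResumeCheckpoint(List<Map<String, String>> commitsLatestFirst,
                                                    String cliCheckpoint) {
        for (Map<String, String> metadata : commitsLatestFirst) {
            String resetKey = metadata.get(CHECKPOINT_RESET_KEY);
            String checkpoint = metadata.get(CHECKPOINT_KEY);
            if (cliCheckpoint != null && cliCheckpoint.equals(resetKey)) {
                // The CLI checkpoint was already consumed by an earlier run:
                // resume from that commit's recorded checkpoint instead.
                return Optional.ofNullable(checkpoint);
            }
            if (checkpoint != null) {
                // A regular commit with checkpoint info: resume from it.
                return Optional.of(checkpoint);
            }
            // No checkpoint info (e.g. a replacecommit): keep walking back.
        }
        // Nothing recorded on the timeline yet: honor the CLI checkpoint.
        return Optional.ofNullable(cliCheckpoint);
    }
}
```

With the timeline above, walking back from the replacecommit reaches 20211206202728233.commit (checkpoint "4"), so the next commit would record "5" as expected.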
>
> Timeline:
>
> {code:java}
> 189069 Dec 6 12:19 20211206201238649.commit
> 0 Dec 6 12:12 20211206201238649.commit.requested
> 0 Dec 6 12:12 20211206201238649.inflight
> 189069 Dec 6 12:27 20211206201959151.commit
> 0 Dec 6 12:20 20211206201959151.commit.requested
> 0 Dec 6 12:20 20211206201959151.inflight
> 189069 Dec 6 12:34 20211206202728233.commit
> 0 Dec 6 12:27 20211206202728233.commit.requested
> 0 Dec 6 12:27 20211206202728233.inflight
> 36662 Dec 6 12:35 20211206203449899.replacecommit
> 0 Dec 6 12:35 20211206203449899.replacecommit.inflight
> 34656 Dec 6 12:35 20211206203449899.replacecommit.requested
> 28013 Dec 6 12:35 20211206203503574.clean
> 19024 Dec 6 12:35 20211206203503574.clean.inflight
> 19024 Dec 6 12:35 20211206203503574.clean.requested
> 189069 Dec 6 12:43 20211206203551080.commit
> 0 Dec 6 12:35 20211206203551080.commit.requested
> 0 Dec 6 12:35 20211206203551080.inflight
> 189069 Dec 6 12:50 20211206204311612.commit
> 0 Dec 6 12:43 20211206204311612.commit.requested
> 0 Dec 6 12:43 20211206204311612.inflight
> 0 Dec 6 12:50 20211206205044595.commit.requested
> 0 Dec 6 12:50 20211206205044595.inflight
> 128 Dec 6 12:56 archived
> 483 Dec 6 11:52 hoodie.properties
> {code}
>
> Checkpoints in commits:
>
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit: "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit: "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit: "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit: "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit: "deltastreamer.checkpoint.key" : "2" {code}
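These numbers line up with the guard quoted earlier. A minimal, self-contained model of that condition (plain Java with the Hudi identifiers replaced by simple parameters; not Hudi code) shows why a replacecommit with no checkpoint metadata makes the guard fire and re-apply the CLI checkpoint "0", so the following commit records "1" instead of "5":

```java
import java.util.Optional;

public class BuggyGuardDemo {
    /** The condition as described in the report, applied to the latest commit only. */
    static Optional<String> buggyResume(String cfgCheckpoint,
                                        String resetKeyOfLatestCommit,
                                        String checkpointOfLatestCommit) {
        boolean resetKeyMissing =
            resetKeyOfLatestCommit == null || resetKeyOfLatestCommit.isEmpty();
        if (cfgCheckpoint != null
            && (resetKeyMissing || !cfgCheckpoint.equals(resetKeyOfLatestCommit))) {
            // Replacecommit carries no checkpoint keys, so this branch is taken
            // and the resume checkpoint falls back to "--checkpoint 0".
            return Optional.of(cfgCheckpoint);
        }
        return Optional.ofNullable(checkpointOfLatestCommit);
    }
}
```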
>
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in continuous mode by providing both "--checkpoint 0"
> and "--continuous", with inline clustering and async clean enabled (some
> configs are masked).
>
> {code:java}
> spark-submit \
>   --master yarn \
>   --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
>   --conf spark.speculation=true \
>   --conf spark.speculation.multiplier=1.0 \
>   --conf spark.speculation.quantile=0.5 \
>   --packages org.apache.spark:spark-avro_2.12:3.2.0 \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
>   --props file:/home/hadoop/ethan/test.properties \
>   --source-class ... \
>   --source-ordering-field ts \
>   --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
>   --target-table test_table \
>   --table-type COPY_ON_WRITE \
>   --op BULK_INSERT \
>   --checkpoint 0 \
>   --continuous {code}
> test.properties:
>
> {code:java}
> hoodie.cleaner.commits.retained=4
> hoodie.keep.min.commits=5
> hoodie.keep.max.commits=7
> hoodie.clean.async=true
> hoodie.clustering.inline=true
> hoodie.clustering.async.max.commits=3
> hoodie.compact.inline.max.delta.commits=3
> hoodie.insert.shuffle.parallelism=10
> hoodie.upsert.shuffle.parallelism=10
> hoodie.bulk_insert.shuffle.parallelism=10
> hoodie.delete.shuffle.parallelism=10
> hoodie.bulkinsert.shuffle.parallelism=10
> hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=partition
> # turn off any small file handling, for ease of testing
> hoodie.parquet.small.file.limit=1
> benchmark.input.source.path=...{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)