[
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-2947:
----------------------------
Description:
*Problem:*
When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 0`,
in the continuous mode, the deltastreamer job may pick up the wrong checkpoint
later on. The wrong checkpoint (for 20211206203551080 commit) happens after
the replacecommit and clean, which is reset to "0", instead of "5" after
20211206202728233.commit. More details below.
The bug is due to the check here:
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
{code:java}
if (cfg.checkpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
||
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
} {code}
In this case of resuming after a clustering commit, "cfg.checkpoint != null"
and
"StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))"
are both true as "--checkpoint 0" is configured and last commit is
replacecommit without checkpoint keys. This leads to the resume checkpoint
string being reset to the configured checkpoint, skipping the timeline
walk-back logic below, which is wrong.
Timeline:
{code:java}
189069 Dec 6 12:19 20211206201238649.commit
0 Dec 6 12:12 20211206201238649.commit.requested
0 Dec 6 12:12 20211206201238649.inflight
189069 Dec 6 12:27 20211206201959151.commit
0 Dec 6 12:20 20211206201959151.commit.requested
0 Dec 6 12:20 20211206201959151.inflight
189069 Dec 6 12:34 20211206202728233.commit
0 Dec 6 12:27 20211206202728233.commit.requested
0 Dec 6 12:27 20211206202728233.inflight
36662 Dec 6 12:35 20211206203449899.replacecommit
0 Dec 6 12:35 20211206203449899.replacecommit.inflight
34656 Dec 6 12:35 20211206203449899.replacecommit.requested
28013 Dec 6 12:35 20211206203503574.clean
19024 Dec 6 12:35 20211206203503574.clean.inflight
19024 Dec 6 12:35 20211206203503574.clean.requested
189069 Dec 6 12:43 20211206203551080.commit
0 Dec 6 12:35 20211206203551080.commit.requested
0 Dec 6 12:35 20211206203551080.inflight
189069 Dec 6 12:50 20211206204311612.commit
0 Dec 6 12:43 20211206204311612.commit.requested
0 Dec 6 12:43 20211206204311612.inflight
0 Dec 6 12:50 20211206205044595.commit.requested
0 Dec 6 12:50 20211206205044595.inflight
128 Dec 6 12:56 archived
483 Dec 6 11:52 hoodie.properties
{code}
Checkpoints in commits:
{code:java}
grep "deltastreamer.checkpoint.key" *
20211206201238649.commit: "deltastreamer.checkpoint.key" : "2"
20211206201959151.commit: "deltastreamer.checkpoint.key" : "3"
20211206202728233.commit: "deltastreamer.checkpoint.key" : "4"
20211206203551080.commit: "deltastreamer.checkpoint.key" : "1"
20211206204311612.commit: "deltastreamer.checkpoint.key" : "2" {code}
*Steps to reproduce:*
Run HoodieDeltaStreamer in the continuous mode, by providing both `--checkpoint
0` and `--continuous`, with inline clustering and sync clean enabled (some
configs are masked).
{code:java}
spark-submit \
--master yarn \
--driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
\
--conf spark.speculation=true \
--conf spark.speculation.multiplier=1.0 \
--conf spark.speculation.quantile=0.5 \
--packages org.apache.spark:spark-avro_2.12:3.2.0 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
--props file:/home/hadoop/ethan/test.properties \
--source-class ... \
--source-ordering-field ts \
--target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
--target-table test_table \
--table-type COPY_ON_WRITE \
--op BULK_INSERT \
--checkpoint 0 \
--continuous {code}
test.properties:
{code:java}
hoodie.cleaner.commits.retained=4
hoodie.keep.min.commits=5
hoodie.keep.max.commits=7
hoodie.clean.async=true
hoodie.clustering.inline=true
hoodie.clustering.async.max.commits=3
hoodie.compact.inline.max.delta.commits=3
hoodie.insert.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulk_insert.shuffle.parallelism=10
hoodie.delete.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=partition
# turn off any small file handling, for ease of testing
hoodie.parquet.small.file.limit=1
benchmark.input.source.path=...{code}
> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config
> from CLI in continuous mode
> ------------------------------------------------------------------------------------------------------
>
> Key: HUDI-2947
> URL: https://issues.apache.org/jira/browse/HUDI-2947
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Ethan Guo
> Priority: Major
> Fix For: 0.11.0
>
>
> *Problem:*
> When deltastreamer is started with a given checkpoint, e.g., `--checkpoint
> 0`, in the continuous mode, the deltastreamer job may pick up the wrong
> checkpoint later on. The wrong checkpoint (for 20211206203551080 commit)
> happens after the replacecommit and clean, which is reset to "0", instead of
> "5" after 20211206202728233.commit. More details below.
>
> The bug is due to the check here:
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null &&
> (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
> ||
> !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
> resumeCheckpointStr = Option.of(cfg.checkpoint);
> } {code}
> In this case of resuming after a clustering commit, "cfg.checkpoint != null"
> and
> "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))"
> are both true as "--checkpoint 0" is configured and last commit is
> replacecommit without checkpoint keys. This leads to the resume checkpoint
> string being reset to the configured checkpoint, skipping the timeline
> walk-back logic below, which is wrong.
>
> Timeline:
>
> {code:java}
> 189069 Dec 6 12:19 20211206201238649.commit
> 0 Dec 6 12:12 20211206201238649.commit.requested
> 0 Dec 6 12:12 20211206201238649.inflight
> 189069 Dec 6 12:27 20211206201959151.commit
> 0 Dec 6 12:20 20211206201959151.commit.requested
> 0 Dec 6 12:20 20211206201959151.inflight
> 189069 Dec 6 12:34 20211206202728233.commit
> 0 Dec 6 12:27 20211206202728233.commit.requested
> 0 Dec 6 12:27 20211206202728233.inflight
> 36662 Dec 6 12:35 20211206203449899.replacecommit
> 0 Dec 6 12:35 20211206203449899.replacecommit.inflight
> 34656 Dec 6 12:35 20211206203449899.replacecommit.requested
> 28013 Dec 6 12:35 20211206203503574.clean
> 19024 Dec 6 12:35 20211206203503574.clean.inflight
> 19024 Dec 6 12:35 20211206203503574.clean.requested
> 189069 Dec 6 12:43 20211206203551080.commit
> 0 Dec 6 12:35 20211206203551080.commit.requested
> 0 Dec 6 12:35 20211206203551080.inflight
> 189069 Dec 6 12:50 20211206204311612.commit
> 0 Dec 6 12:43 20211206204311612.commit.requested
> 0 Dec 6 12:43 20211206204311612.inflight
> 0 Dec 6 12:50 20211206205044595.commit.requested
> 0 Dec 6 12:50 20211206205044595.inflight
> 128 Dec 6 12:56 archived
> 483 Dec 6 11:52 hoodie.properties
> {code}
>
> Checkpoints in commits:
>
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit: "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit: "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit: "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit: "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit: "deltastreamer.checkpoint.key" : "2" {code}
>
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in the continuous mode, by providing both
> `--checkpoint 0` and `--continuous`, with inline clustering and sync clean
> enabled (some configs are masked).
>
> {code:java}
> spark-submit \
> --master yarn \
> --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores
> 4 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf
> spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> \
> --conf spark.speculation=true \
> --conf spark.speculation.multiplier=1.0 \
> --conf spark.speculation.quantile=0.5 \
> --packages org.apache.spark:spark-avro_2.12:3.2.0 \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
> --props file:/home/hadoop/ethan/test.properties \
> --source-class ... \
> --source-ordering-field ts \
> --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
> --target-table test_table \
> --table-type COPY_ON_WRITE \
> --op BULK_INSERT \
> --checkpoint 0 \
> --continuous {code}
> test.properties:
>
>
> {code:java}
> hoodie.cleaner.commits.retained=4
> hoodie.keep.min.commits=5
> hoodie.keep.max.commits=7
> hoodie.clean.async=true
> hoodie.clustering.inline=true
> hoodie.clustering.async.max.commits=3
> hoodie.compact.inline.max.delta.commits=3
> hoodie.insert.shuffle.parallelism=10
> hoodie.upsert.shuffle.parallelism=10
> hoodie.bulk_insert.shuffle.parallelism=10
> hoodie.delete.shuffle.parallelism=10
> hoodie.bulkinsert.shuffle.parallelism=10
> hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=partition
> # turn off any small file handling, for ease of testing
> hoodie.parquet.small.file.limit=1
> benchmark.input.source.path=...{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)