hudi-agent commented on code in PR #18889:
URL: https://github.com/apache/hudi/pull/18889#discussion_r3336087177
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -472,8 +472,15 @@ private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
metaClient, metaClient.getActiveTimeline().lastInstant().get());
assertFalse(metadata.isEmpty());
Map<String, String> extraMetadata = metadata.get().getExtraMetadata();
- assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
- assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ // V1 vs V2 is driven by CheckpointUtils.shouldTargetCheckpointV2 — for v6 tables the contract
+ // is V1, so the assertion is conditional on the on-disk table version.
+ if (metaClient.getTableConfig().getTableVersion().versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
+ assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
Review Comment:
🤖 nit: the method is still called `assertUseV2Checkpoint` but the body now
conditionally checks either V1 or V2 depending on table version — could you
rename it to something like `assertCheckpointMatchesTableVersion` so future
callers aren't surprised?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
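A minimal, self-contained sketch of what the renamed helper could check. It assumes the hidden `else` branch asserts the V1 key, as the diff comment says the v6 contract is V1, and it keeps the "other key must be absent" check from the removed `assertFalse` line. `assertCheckpointMatchesTableVersion` is only the reviewer's suggested name, the key strings are placeholders rather than Hudi's real `STREAMER_CHECKPOINT_KEY_V1/V2` values, and the table version is passed as a plain `int` instead of a `HoodieTableMetaClient`.

```java
import java.util.Map;

public class CheckpointAssertSketch {
  // Placeholder key strings; Hudi's real STREAMER_CHECKPOINT_KEY_V1/V2 values may differ.
  static final String KEY_V1 = "checkpoint.key.v1";
  static final String KEY_V2 = "checkpoint.key.v2";
  // Stand-in for HoodieTableVersion.EIGHT.versionCode().
  static final int VERSION_EIGHT = 8;

  // Hypothetical renamed helper: the expected checkpoint key follows the table version,
  // and the key of the other version must be absent.
  static void assertCheckpointMatchesTableVersion(int tableVersionCode,
                                                  Map<String, String> extraMetadata) {
    boolean expectV2 = tableVersionCode >= VERSION_EIGHT;
    String expectedKey = expectV2 ? KEY_V2 : KEY_V1;
    String forbiddenKey = expectV2 ? KEY_V1 : KEY_V2;
    if (!extraMetadata.containsKey(expectedKey)) {
      throw new AssertionError("missing " + expectedKey + " for table version " + tableVersionCode);
    }
    if (extraMetadata.containsKey(forbiddenKey)) {
      throw new AssertionError("unexpected " + forbiddenKey + " for table version " + tableVersionCode);
    }
  }

  public static void main(String[] args) {
    assertCheckpointMatchesTableVersion(8, Map.of(KEY_V2, "ckp")); // v8 table: V2 key only
    assertCheckpointMatchesTableVersion(6, Map.of(KEY_V1, "ckp")); // v6 table: V1 key only
    System.out.println("ok");
  }
}
```

With a name like this, a caller reading only the call site can tell that the expected key depends on the table version rather than always being V2.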
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -947,10 +950,21 @@ Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, TypedProper
return Collections.emptyMap();
}
- // If we have a next checkpoint batch, use its metadata
+ // If we have a next checkpoint batch, use its metadata. Some Source implementations (e.g.
+ // DFSPathSelector-backed *DFSSource) unconditionally return a StreamerCheckpointV2 regardless
+ // of the target table version. The persisted checkpoint key must match the table version
+ // contract enforced by CheckpointUtils.shouldTargetCheckpointV2 (V2 keys are only valid for
+ // writeTableVersion >= 8 and outside the not-supported set). Normalize here so that a v6
+ // write does not get a V2 key stamped into its commit metadata.
if (inputBatch.getCheckpointForNextBatch() != null) {
- return inputBatch.getCheckpointForNextBatch()
- .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+ Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();
Review Comment:
🤖 nit: since you're already introducing a local `sourceCheckpoint`, could
you assign it before the `if` and null-check the variable instead? That avoids
calling `getCheckpointForNextBatch()` twice and reads more cleanly as
`Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch(); if
(sourceCheckpoint != null) { … }`.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
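A rough sketch of the suggested shape, combining the hoisted `sourceCheckpoint` local with the normalization the diff comment describes. `Checkpoint`, `InputBatch`, and `shouldTargetV2` are hypothetical stand-ins, not Hudi's classes. `shouldTargetV2` models only the `writeTableVersion >= 8` rule, not the not-supported set. The real call `getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)` also handles override checkpoints, which this sketch ignores. The hunk does not show what the method does when there is no next checkpoint, so the sketch simply returns an empty map. Re-keying a V2 checkpoint as V1 while keeping its string value is an assumption that only holds when the value means the same thing under both keys. The PR's actual conversion logic is not visible here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CheckpointNormalizeSketch {
  // Placeholder key strings; Hudi's real STREAMER_CHECKPOINT_KEY_V1/V2 values may differ.
  static final String KEY_V1 = "checkpoint.key.v1";
  static final String KEY_V2 = "checkpoint.key.v2";

  // Hypothetical minimal checkpoint: a V1/V2 flag plus an opaque checkpoint string.
  static final class Checkpoint {
    final boolean v2;
    final String value;

    Checkpoint(boolean v2, String value) {
      this.v2 = v2;
      this.value = value;
    }

    // Simplified analogue of getCheckpointCommitMetadata: one key, chosen by the flag.
    Map<String, String> toCommitMetadata() {
      Map<String, String> metadata = new HashMap<>();
      metadata.put(v2 ? KEY_V2 : KEY_V1, value);
      return metadata;
    }
  }

  // Hypothetical stand-in for InputBatch; only the next-batch checkpoint is modelled.
  static final class InputBatch {
    final Checkpoint checkpointForNextBatch;

    InputBatch(Checkpoint checkpointForNextBatch) {
      this.checkpointForNextBatch = checkpointForNextBatch;
    }

    Checkpoint getCheckpointForNextBatch() {
      return checkpointForNextBatch;
    }
  }

  // Stand-in for CheckpointUtils.shouldTargetCheckpointV2: only the ">= 8" rule is modelled.
  static boolean shouldTargetV2(int writeTableVersion) {
    return writeTableVersion >= 8;
  }

  static Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, int writeTableVersion) {
    // Read the checkpoint once into a local and null-check that local.
    Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();
    if (sourceCheckpoint == null) {
      return Collections.emptyMap();
    }
    boolean targetV2 = shouldTargetV2(writeTableVersion);
    // Re-key the checkpoint when the source's version disagrees with the table contract.
    // Assumes the checkpoint string means the same thing under both keys.
    Checkpoint normalized = sourceCheckpoint.v2 == targetV2
        ? sourceCheckpoint
        : new Checkpoint(targetV2, sourceCheckpoint.value);
    return normalized.toCommitMetadata();
  }

  public static void main(String[] args) {
    // A DFS-style source that always emits V2, written to a v6 and a v8 table.
    InputBatch batch = new InputBatch(new Checkpoint(true, "1700000000000"));
    System.out.println(extractCheckpointMetadata(batch, 6)); // {checkpoint.key.v1=1700000000000}
    System.out.println(extractCheckpointMetadata(batch, 8)); // {checkpoint.key.v2=1700000000000}
  }
}
```

Hoisting the local means the batch is asked for its checkpoint once, and the null check, the version decision, and the re-keying all read from the same value.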
--
This is an automated message from the Apache Git Service.