Davis-Zhang-Onehouse commented on code in PR #12718:
URL: https://github.com/apache/hudi/pull/12718#discussion_r1932737435
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint>
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-
HoodieStreamer.Config streamerConfig,
- TypedProperties
props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
+ * conflicts:
+ * <ul>
+ * <li>commit metadata from the last completed instant, which can contain
the last checkpoint
+ * from the previous streamer ingestion.</li>
+ * <li>user checkpoint overrides specified in the writer config {@code
streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start
from the very beginning.</li>
+ * </ul>
+ * The 2 sources can have conflicts, and we need to decide which config
should prevail.
+ * <p>
+ * For the second phase of checkpoint resolution, please refer to
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and
child class overrides of this
+ * method.
+ */
+ public static Option<Checkpoint>
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+
HoodieStreamer.Config streamerConfig,
+
TypedProperties props,
+
HoodieTableMetaClient metaClient) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
+ assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+ // If we have both streamer config and commits specifying what checkpoint
to use, go with the
+ // checkpoint resolution logic to resolve conflicting configurations.
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(),
streamerConfig, props);
+ checkpoint =
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(),
streamerConfig, props);
}
+ // If there is only streamer config, extract the checkpoint directly.
+ checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props,
checkpoint);
+ return checkpoint;
+ }
+ @VisibleForTesting
+ static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+ if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+ || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+ HoodieTableVersion writeTableVersion =
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION));
+ HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+ if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config,
writeTableVersion)) {
+ throw new HoodieUpgradeDowngradeException(
+ String.format("When upgrade/downgrade is happening, please avoid
setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+ + " Detected invalid streamer configuration:\n%s",
streamerConfig));
+ }
+ }
+ }
+
+ private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(
Review Comment:
I will leave it to a future follow-up.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -126,13 +167,26 @@ static Option<Checkpoint>
getCheckpointToResumeString(HoodieTimeline commitsTime
}
} else if (streamerConfig.checkpoint != null) {
//
getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will
never return a commit metadata w/o any checkpoint key set.
- resumeCheckpoint =
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
streamerConfig.sourceClassName)
- ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new
StreamerCheckpointV1(streamerConfig.checkpoint));
+ resumeCheckpoint =
Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName,
writeTableVersion, streamerConfig.checkpoint));
}
}
return resumeCheckpoint;
}
+ private static boolean shouldUseCkpFromPrevCommit(Checkpoint
checkpointFromCommit) {
+ return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey());
+ }
+
+ private static boolean
ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config
streamerConfig, Checkpoint checkpointFromCommit) {
Review Comment:
I will leave it to a future follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]