nsivabalan commented on code in PR #12718:
URL: https://github.com/apache/hudi/pull/12718#discussion_r1932681358
##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -61,6 +61,20 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata
commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata:
" + commitMetadata.getExtraMetadata());
}
+ public static Checkpoint buildCheckpointFromGeneralSource(
+ String sourceClassName, int writeTableVersion, String
checkpointToResume) {
+ return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
sourceClassName)
+ ? new StreamerCheckpointV2(checkpointToResume) : new
StreamerCheckpointV1(checkpointToResume);
+ }
+
+ // Whenever we create checkpoint from streamer config checkpoint override,
we should use this function
+ // to build checkpoints.
+ public static Checkpoint buildCheckpointFromConfigOverride(
Review Comment:
same here
##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class providing methods to check if a string starts with specific
resume-related prefixes.
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+ public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+ public static final String REQUEST_TIME_PREFIX =
"resumeFromInstantRequestTime";
+ public static final String COMPLETION_TIME_PREFIX =
"resumeFromInstantCompletionTime";
+
+ /**
+ * For hoodie incremental source ingestion, if the target table is version 8
or higher, the checkpoint
+ * key set by streamer config can be in either of the following format:
+ * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+ * - resumeFromInstantCompletionTime:[checkpoint value based on completion
time]
+ *
+ * StreamerCheckpointV2FromCfgCkp class itself captured the fact that this
is version 8 and higher, plus
+ * the checkpoint source is from streamer config override.
+ *
+ * When the checkpoint is consumed by individual data sources, we need to
convert them to either vanilla
+ * checkpoint v1 (request time based) or checkpoint v2 (completion time
based).
+ */
+ public static Checkpoint
resolveToV1V2Checkpoint(StreamerCheckpointFromCfgCkp checkpoint) {
Review Comment:
Minor: consider renaming this to `resolveToActualCheckpointVersion`.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java:
##########
@@ -128,6 +139,17 @@ protected Option<Checkpoint>
translateCheckpoint(Option<Checkpoint> lastCheckpoi
throw new UnsupportedOperationException("Unsupported checkpoint type: " +
lastCheckpoint.get());
}
+ public void assertCheckpointVersion(Checkpoint checkpoint) {
+ if (checkpoint != null) {
+ if (shouldTargetCheckpointV2(writeTableVersion, getClass().getName()) &&
!(checkpoint instanceof StreamerCheckpointV2)) {
+ throw new IllegalStateException("Data source target checkpoint v2
(completion time based) should always return checkpoint v2. Setup " + props);
Review Comment:
`props` might contain 100+ entries.
Can you print the checkpoint instead?
We could also print the checkpoint to resume from.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieIncrSourceCheckpointValUtils {
+
+ @Test
+ public void testResolveToV1V2CheckpointWithRequestTime() {
+ StreamerCheckpointFromCfgCkp mockCheckpoint =
mock(StreamerCheckpointFromCfgCkp.class);
+
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantRequestTime:20240301");
+
+ Checkpoint result =
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint);
+
+ assertTrue(result instanceof StreamerCheckpointV1);
+ assertEquals("20240301", result.getCheckpointKey());
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithCompletionTime() {
+ StreamerCheckpointFromCfgCkp mockCheckpoint =
mock(StreamerCheckpointFromCfgCkp.class);
+
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantCompletionTime:20240302");
Review Comment:
Can you add a constant for "20240302" and reuse it?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -61,6 +61,20 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata
commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata:
" + commitMetadata.getExtraMetadata());
}
+ public static Checkpoint buildCheckpointFromGeneralSource(
Review Comment:
Let's be cautious about access modifiers.
Can you add the @VisibleForTesting annotation if needed,
but avoid making methods public by default?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointFromCfgCkp.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+/**
+ * A special checkpoint v2 class that indicates its checkpoint key comes from
streamer config checkpoint
+ * overrides.
+ *
+ * For hoodie incremental source, based on the content of the checkpoint
override value, it can indicate
+ * either request time based checkpoint or completion time based. So the class
serves as an indicator to
+ * data sources of interest that it needs to be further parsed and resolved to
either checkpoint v1 or v2.
+ *
+ * For all the other data sources, it behaves exactly the same as checkpoint
v2.
+ *
+ * To keep the checkpoint class design ignorant of which data source it
serves, the class only indicates where
+ * the checkpoint key comes from.
+ * */
+public class StreamerCheckpointFromCfgCkp extends StreamerCheckpointV2 {
Review Comment:
Minor: we could name this `UnresolvedStreamerCheckpointBasedOnCfg`.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java:
##########
@@ -207,7 +212,49 @@ public void testConvertCheckpointWithUseTransitionTime() {
"8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource,
false",
"8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource,
false"
})
- public void testTargetCheckpointV2(int version, String sourceClassName,
boolean expected) {
- assertEquals(expected, CheckpointUtils.shouldTargetCheckpointV2(version,
sourceClassName));
+ public void testTargetCheckpointV2(int version, String sourceClassName,
boolean isV2Checkpoint) {
+ assertEquals(isV2Checkpoint,
CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version,
"ignored") instanceof StreamerCheckpointV2);
+ }
+
+ @Test
+ public void testBuildCheckpointFromGeneralSource() {
+ // Test V2 checkpoint creation (newer table version + general source)
+ Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromGeneralSource(
+ GENERAL_SOURCE,
+ HoodieTableVersion.EIGHT.versionCode(),
+ CHECKPOINT_TO_RESUME
+ );
+ assertInstanceOf(StreamerCheckpointV2.class, checkpoint1);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
+
+ // Test V1 checkpoint creation (older table version)
+ Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromGeneralSource(
+ GENERAL_SOURCE,
+ HoodieTableVersion.SEVEN.versionCode(),
+ CHECKPOINT_TO_RESUME
+ );
+ assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
+ }
+
+ @Test
+ public void testBuildCheckpointFromConfigOverride() {
+ // Test checkpoint from config creation (newer table version + general
source)
+ Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromConfigOverride(
Review Comment:
Where do we have test coverage for HoodieIncrSource?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class providing methods to check if a string starts with specific
resume-related prefixes.
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+ public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+ public static final String REQUEST_TIME_PREFIX =
"resumeFromInstantRequestTime";
+ public static final String COMPLETION_TIME_PREFIX =
"resumeFromInstantCompletionTime";
+
+ /**
+ * For hoodie incremental source ingestion, if the target table is version 8
or higher, the checkpoint
+ * key set by streamer config can be in either of the following format:
+ * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+ * - resumeFromInstantCompletionTime:[checkpoint value based on completion
time]
+ *
+ * StreamerCheckpointV2FromCfgCkp class itself captured the fact that this
is version 8 and higher, plus
+ * the checkpoint source is from streamer config override.
+ *
+ * When the checkpoint is consumed by individual data sources, we need to
convert them to either vanilla
+ * checkpoint v1 (request time based) or checkpoint v2 (completion time
based).
+ */
+ public static Checkpoint
resolveToV1V2Checkpoint(StreamerCheckpointFromCfgCkp checkpoint) {
+ String[] parts = extractKeyValues(checkpoint);
+ switch (parts[0]) {
+ case REQUEST_TIME_PREFIX: {
+ return new StreamerCheckpointV1(checkpoint).setCheckpointKey(parts[1]);
+ }
+ case COMPLETION_TIME_PREFIX: {
+ return new StreamerCheckpointV2(checkpoint).setCheckpointKey(parts[1]);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown event ordering mode " +
parts[0]);
+ }
+ }
+
+ private static String [] extractKeyValues(StreamerCheckpointFromCfgCkp
checkpoint) {
+ String checkpointKey = checkpoint.getCheckpointKey();
+ String[] parts = checkpointKey.split(RESET_CHECKPOINT_V2_SEPARATOR);
+ if (parts.length != 2
+ || (
+ !parts[0].trim().equals(REQUEST_TIME_PREFIX)
+ && !parts[0].trim().equals(COMPLETION_TIME_PREFIX)
+ )) {
+ throw new IllegalArgumentException(
+ "Illegal checkpoint key override `" + checkpointKey + "`. Valid
format is either `resumeFromInstantRequestTime:<checkpoint value>` or "
+ + "`resumeFromInstantCompletionTime:<checkpoint value>`.");
+ }
+ parts[0] = parts[0].trim();
+ parts[1] = parts[1].trim();
+ return parts;
+ }
+
+ /**
+ * Immutable class to hold the parts of a resume string.
+ */
+ public static class CheckpointWithMode {
Review Comment:
Looks like this is never used. Can we remove it?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint>
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-
HoodieStreamer.Config streamerConfig,
- TypedProperties
props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
+ * conflicts:
+ * <ul>
+ * <li>commit metadata from the last completed instant, which can contain
what is the last checkpoint
+ * from the previous streamer ingestion.</li>
+ * <li>user checkpoint overrides specified in the writer config {@code
streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start
from the very beginning.</li>
+ * </ul>
+ * The 2 sources can have conflicts, and we need to decide which config
should prevail.
+ * <p>
+ * For the second phase of checkpoint resolution please refer
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and
child class overrides of this
+ * method.
+ */
+ public static Option<Checkpoint>
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+
HoodieStreamer.Config streamerConfig,
+
TypedProperties props,
+
HoodieTableMetaClient metaClient) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
+ assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+ // If we have both streamer config and commits specifying what checkpoint
to use, go with the
+ // checkpoint resolution logic to resolve conflicting configurations.
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(),
streamerConfig, props);
+ checkpoint =
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(),
streamerConfig, props);
}
+ // If there is only streamer config, extract the checkpoint directly.
+ checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props,
checkpoint);
+ return checkpoint;
+ }
+ @VisibleForTesting
+ static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+ if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+ || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+ HoodieTableVersion writeTableVersion =
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION));
+ HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+ if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config,
writeTableVersion)) {
Review Comment:
Can we confine this check to HoodieIncr sources only?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint>
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-
HoodieStreamer.Config streamerConfig,
- TypedProperties
props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
+ * conflicts:
+ * <ul>
+ * <li>commit metadata from the last completed instant, which can contain
what is the last checkpoint
+ * from the previous streamer ingestion.</li>
+ * <li>user checkpoint overrides specified in the writer config {@code
streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start
from the very beginning.</li>
+ * </ul>
+ * The 2 sources can have conflicts, and we need to decide which config
should prevail.
+ * <p>
+ * For the second phase of checkpoint resolution please refer
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and
child class overrides of this
+ * method.
+ */
+ public static Option<Checkpoint>
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
Review Comment:
Minor: rename to `resolveCheckpointToResumeFrom`.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint>
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-
HoodieStreamer.Config streamerConfig,
- TypedProperties
props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
+ * conflicts:
+ * <ul>
+ * <li>commit metadata from the last completed instant, which can contain
what is the last checkpoint
+ * from the previous streamer ingestion.</li>
+ * <li>user checkpoint overrides specified in the writer config {@code
streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start
from the very beginning.</li>
+ * </ul>
+ * The 2 sources can have conflicts, and we need to decide which config
should prevail.
+ * <p>
+ * For the second phase of checkpoint resolution please refer
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and
child class overrides of this
+ * method.
+ */
+ public static Option<Checkpoint>
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+
HoodieStreamer.Config streamerConfig,
+
TypedProperties props,
+
HoodieTableMetaClient metaClient) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
+ assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+ // If we have both streamer config and commits specifying what checkpoint
to use, go with the
+ // checkpoint resolution logic to resolve conflicting configurations.
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(),
streamerConfig, props);
+ checkpoint =
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(),
streamerConfig, props);
}
+ // If there is only streamer config, extract the checkpoint directly.
+ checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props,
checkpoint);
+ return checkpoint;
+ }
+ @VisibleForTesting
+ static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+ if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+ || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+ HoodieTableVersion writeTableVersion =
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION));
+ HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+ if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config,
writeTableVersion)) {
+ throw new HoodieUpgradeDowngradeException(
+ String.format("When upgrade/downgrade is happening, please avoid
setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+ + " Detected invalid streamer configuration:\n%s",
streamerConfig));
+ }
+ }
+ }
+
+ private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(
Review Comment:
To abbreviate "checkpoint", we can use "ckpt" instead of "ckp".
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -126,13 +167,26 @@ static Option<Checkpoint>
getCheckpointToResumeString(HoodieTimeline commitsTime
}
} else if (streamerConfig.checkpoint != null) {
//
getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will
never return a commit metadata w/o any checkpoint key set.
- resumeCheckpoint =
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
streamerConfig.sourceClassName)
- ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new
StreamerCheckpointV1(streamerConfig.checkpoint));
+ resumeCheckpoint =
Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName,
writeTableVersion, streamerConfig.checkpoint));
}
}
return resumeCheckpoint;
}
+ private static boolean shouldUseCkpFromPrevCommit(Checkpoint
checkpointFromCommit) {
+ return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey());
+ }
+
+ private static boolean
ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config
streamerConfig, Checkpoint checkpointFromCommit) {
Review Comment:
Same comment as above: use "ckpt" instead of "ckp".
##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class providing methods to check if a string starts with specific
resume-related prefixes.
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+ public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+ public static final String REQUEST_TIME_PREFIX =
"resumeFromInstantRequestTime";
+ public static final String COMPLETION_TIME_PREFIX =
"resumeFromInstantCompletionTime";
+
+ /**
+ * For hoodie incremental source ingestion, if the target table is version 8
or higher, the checkpoint
+ * key set by streamer config can be in either of the following format:
+ * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+ * - resumeFromInstantCompletionTime:[checkpoint value based on completion
time]
+ *
+ * StreamerCheckpointV2FromCfgCkp class itself captured the fact that this
is version 8 and higher, plus
+ * the checkpoint source is from streamer config override.
+ *
+ * When the checkpoint is consumed by individual data sources, we need to
convert them to either vanilla
+ * checkpoint v1 (request time based) or checkpoint v2 (completion time
based).
+ */
+ public static Checkpoint
resolveToV1V2Checkpoint(StreamerCheckpointFromCfgCkp checkpoint) {
Review Comment:
Do we have unit tests for this?
##########
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieIncrSourceCheckpointValUtils {
+
+ @Test
+ public void testResolveToV1V2CheckpointWithRequestTime() {
+ StreamerCheckpointFromCfgCkp mockCheckpoint =
mock(StreamerCheckpointFromCfgCkp.class);
+
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantRequestTime:20240301");
+
+ Checkpoint result =
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint);
+
+ assertTrue(result instanceof StreamerCheckpointV1);
+ assertEquals("20240301", result.getCheckpointKey());
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithCompletionTime() {
+ StreamerCheckpointFromCfgCkp mockCheckpoint =
mock(StreamerCheckpointFromCfgCkp.class);
+
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantCompletionTime:20240302");
+
+ Checkpoint result =
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint);
+
+ assertTrue(result instanceof StreamerCheckpointV2);
+ assertEquals("20240302", result.getCheckpointKey());
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithInvalidPrefix() {
+ StreamerCheckpointFromCfgCkp mockCheckpoint =
mock(StreamerCheckpointFromCfgCkp.class);
+
when(mockCheckpoint.getCheckpointKey()).thenReturn("invalidPrefix:20240303");
+
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () ->
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint)
+ );
+ assertTrue(exception.getMessage().contains("Illegal checkpoint key
override"));
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithMalformedInput() {
+ StreamerCheckpointFromCfgCkp mockCheckpoint =
mock(StreamerCheckpointFromCfgCkp.class);
+ when(mockCheckpoint.getCheckpointKey()).thenReturn("malformedInput");
+
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () ->
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint)
+ );
+ assertTrue(exception.getMessage().contains("Illegal checkpoint key
override"));
+ }
+
+ @Test
+ public void testCheckpointWithModeToString() {
+ HoodieIncrSourceCheckpointValUtils.CheckpointWithMode checkpointWithMode =
Review Comment:
We probably need to remove this test as well.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint>
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-
HoodieStreamer.Config streamerConfig,
- TypedProperties
props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
+ * conflicts:
+ * <ul>
+ * <li>commit metadata from the last completed instant, which can contain
what is the last checkpoint
+ * from the previous streamer ingestion.</li>
+ * <li>user checkpoint overrides specified in the writer config {@code
streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start
from the very beginning.</li>
+ * </ul>
+ * The 2 sources can have conflicts, and we need to decide which config
should prevail.
+ * <p>
+ * For the second phase of checkpoint resolution please refer
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and
child class overrides of this
+ * method.
+ */
+ public static Option<Checkpoint>
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+
HoodieStreamer.Config streamerConfig,
+
TypedProperties props,
+
HoodieTableMetaClient metaClient) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
+ assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+ // If we have both streamer config and commits specifying what checkpoint
to use, go with the
+ // checkpoint resolution logic to resolve conflicting configurations.
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(),
streamerConfig, props);
+ checkpoint =
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(),
streamerConfig, props);
}
+ // If there is only streamer config, extract the checkpoint directly.
+ checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props,
checkpoint);
+ return checkpoint;
+ }
+ @VisibleForTesting
+ static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
Review Comment:
Please add Javadoc for this method.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint>
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-
HoodieStreamer.Config streamerConfig,
- TypedProperties
props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
+ * conflicts:
+ * <ul>
+ * <li>commit metadata from the last completed instant, which can contain
what is the last checkpoint
+ * from the previous streamer ingestion.</li>
+ * <li>user checkpoint overrides specified in the writer config {@code
streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start
from the very beginning.</li>
+ * </ul>
+ * The 2 sources can have conflicts, and we need to decide which config
should prevail.
+ * <p>
+ * For the second phase of checkpoint resolution please refer
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and
child class overrides of this
+ * method.
+ */
+ public static Option<Checkpoint>
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+
HoodieStreamer.Config streamerConfig,
+
TypedProperties props,
+
HoodieTableMetaClient metaClient) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
+ assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+ // If we have both streamer config and commits specifying what checkpoint
to use, go with the
+ // checkpoint resolution logic to resolve conflicting configurations.
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(),
streamerConfig, props);
+ checkpoint =
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(),
streamerConfig, props);
}
+ // If there is only streamer config, extract the checkpoint directly.
+ checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props,
checkpoint);
+ return checkpoint;
+ }
+ @VisibleForTesting
+ static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+ if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+ || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+ HoodieTableVersion writeTableVersion =
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION));
+ HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+ if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config,
writeTableVersion)) {
+ throw new HoodieUpgradeDowngradeException(
+ String.format("When upgrade/downgrade is happening, please avoid
setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+ + " Detected invalid streamer configuration:\n%s",
streamerConfig));
Review Comment:
Do you know whether this logs each individual config entry here, or just the object's hash (i.e., whether `streamerConfig` overrides `toString()`)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]