This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0ae996b8116 [HUDI-8726] s3/gcs incremental source should stick to
checkpoint v1 (#12688)
0ae996b8116 is described below
commit 0ae996b811644338298b8fcde65d6a2c81418f97
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Tue Jan 28 15:12:31 2025 -0800
[HUDI-8726] s3/gcs incremental source should stick to checkpoint v1 (#12688)
- If USE_TRANSITION_TIME is configured, the checkpoint written by 0.x is
already using transition or completion time, so it should not go through the
checkpoint translation from requested to completion time. This needs to be fixed.
- Pure refactoring of the test classes - moving some method to the parent
class so the subsequent commits can use them. Extracting methods to a
new class so it can be consumed by later changes.
- Introduce dummy hoodie incremental source class and target for test
coverage
For testing ingestion flow e2e to ensure we consume the right checkpoint
version and write out the right checkpoint version, we split the
coverage into 2 parts:
- Test anything interacting with S3/GCS incremental source. This is done
by introducing a dummy class as an injected dependency so we can do
validations and trigger the ingestion code covering the e2e behavior
- Test S3/GCS incremental source itself; this is covered by the existing unit
tests against the class, which already exercise the relevant code paths.
- Introducing MockGcs source coverage just like MockS3 source.
---
.../hudi/testutils/HoodieClientTestUtils.java | 2 +-
.../SparkClientFunctionalTestHarness.java | 10 +
.../common/table/checkpoint/CheckpointUtils.java | 23 +-
.../table/checkpoint/TestCheckpointUtils.java | 47 ++-
.../common/testutils/HoodieCommonTestHarness.java | 7 +-
.../sql/hudi/streaming/HoodieStreamSourceV1.scala | 10 +-
.../sql/hudi/streaming/HoodieStreamSourceV2.scala | 2 +-
.../hudi/utilities/sources/HoodieIncrSource.java | 4 +-
.../org/apache/hudi/utilities/sources/Source.java | 2 +-
.../sources/helpers/IncrSourceHelper.java | 5 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 31 +-
.../streamer/StreamerCheckpointUtils.java | 30 +-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 4 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 20 +
.../utilities/sources/CheckpointValidator.java | 105 +++++
.../utilities/sources/DummyOperationExecutor.java | 95 +++++
.../sources/MockGcsEventsHoodieIncrSource.java | 82 ++++
.../sources/MockS3EventsHoodieIncrSource.java | 95 +++++
.../sources/S3EventsHoodieIncrSourceHarness.java | 308 +++++++++++++++
.../sources/TestGcsEventsHoodieIncrSource.java | 2 +-
.../sources/TestS3EventsHoodieIncrSource.java | 262 +------------
...stS3GcsEventsHoodieIncrSourceE2ECkpVersion.java | 430 +++++++++++++++++++++
.../hudi/utilities/streamer/TestStreamSync.java | 124 ++++--
.../streamer/TestStreamerCheckpointUtils.java | 366 ++++++++++++++++++
24 files changed, 1734 insertions(+), 332 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index a23bca1701b..c305ccd42f0 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -325,7 +325,7 @@ public class HoodieClientTestUtils {
return HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(spark.sessionState().newHadoopConf()), basePath);
}
- private static Option<HoodieCommitMetadata>
getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant
instant) {
+ public static Option<HoodieCommitMetadata>
getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant
instant) {
try {
HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(instant).get();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index e5f7406652a..a24710ce201 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -87,6 +89,7 @@ import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
import static org.apache.hudi.testutils.Assertions.assertFileSizesEqual;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -181,12 +184,19 @@ public class SparkClientFunctionalTestHarness implements
SparkProvider, HoodieMe
return getHoodieMetaClient(storageConf, basePath,
getPropertiesForKeyGen(true));
}
+ public HoodieTableMetaClient
getHoodieMetaClientWithTableVersion(StorageConfiguration<?> storageConf, String
basePath, String tableVersion) throws IOException {
+ Properties props = getPropertiesForKeyGen(true);
+ props.put(WRITE_TABLE_VERSION.key(), tableVersion);
+ return getHoodieMetaClient(storageConf, basePath, props);
+ }
+
@Override
public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?>
storageConf, String basePath, Properties props) throws IOException {
return HoodieTableMetaClient.newTableBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(COPY_ON_WRITE)
.setPayloadClass(HoodieAvroPayload.class)
+ .setTableVersion(ConfigUtils.getIntWithAltKeys(new
TypedProperties(props), WRITE_TABLE_VERSION))
.fromProperties(props)
.initTable(storageConf.newInstance(), basePath);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
index 8e5c2db3aff..2f49f3d2b1f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
@@ -24,19 +24,31 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
+import static
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
public class CheckpointUtils {
+ public static final Set<String> DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
+ "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource",
+ "org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource",
+ "org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource"
+ )));
public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
||
!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2)))
{
@@ -49,14 +61,15 @@ public class CheckpointUtils {
throw new HoodieException("Checkpoint is not found in the commit metadata:
" + commitMetadata.getExtraMetadata());
}
- public static boolean targetCheckpointV2(int writeTableVersion) {
- return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+ public static boolean shouldTargetCheckpointV2(int writeTableVersion, String
sourceClassName) {
+ return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
+ && !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
}
// TODO(yihua): for checkpoint translation, handle cases where the
checkpoint is not exactly the
// instant or completion time
public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
- Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+ Checkpoint checkpoint, HoodieTableMetaClient metaClient,
TimelineUtils.HollowCommitHandling hollowCommitHandlingMode) {
if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
}
@@ -65,7 +78,9 @@ public class CheckpointUtils {
}
if (checkpoint instanceof StreamerCheckpointV1) {
// V1 -> V2 translation
- // TODO(yihua): handle USE_TRANSITION_TIME in V1
+ if (hollowCommitHandlingMode.equals(USE_TRANSITION_TIME)) {
+ return new StreamerCheckpointV2(checkpoint);
+ }
// TODO(yihua): handle different ordering between requested and
completion time
// TODO(yihua): handle timeline history / archived timeline
String instantTime = checkpoint.getCheckpointKey();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
index 715d89c5078..450627e473b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@@ -30,9 +30,14 @@ import org.apache.hudi.exception.HoodieException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK;
+import static
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
+import static
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -95,7 +100,7 @@ public class TestCheckpointUtils {
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
- StreamerCheckpointV2 translatedCheckpoint =
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
+ StreamerCheckpointV2 translatedCheckpoint =
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient,
FAIL);
assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
@@ -126,7 +131,7 @@ public class TestCheckpointUtils {
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
Exception exception = assertThrows(UnsupportedOperationException.class,
- () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint,
metaClient));
+ () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint,
metaClient, BLOCK));
assertTrue(exception.getMessage().contains("Unable to find completion
time"));
}
@@ -155,7 +160,7 @@ public class TestCheckpointUtils {
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
Exception exception = assertThrows(UnsupportedOperationException.class,
- () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint,
metaClient));
+ () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint,
metaClient, FAIL));
assertTrue(exception.getMessage().contains("Unable to find completion
time"));
}
@@ -168,7 +173,41 @@ public class TestCheckpointUtils {
assertEquals(HoodieTimeline.INIT_INSTANT_TS,
translated.getCheckpointKey());
checkpoint = new StreamerCheckpointV2(instantTime);
- translated =
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
+ translated =
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient,
BLOCK);
assertEquals(HoodieTimeline.INIT_INSTANT_TS,
translated.getCheckpointKey());
}
+
+ @Test
+ public void testConvertCheckpointWithUseTransitionTime() {
+ String instantTime = "20231127010101";
+ String completionTime = "20231127020102";
+
+ // Mock active timeline
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
"commit", instantTime, completionTime,
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+ when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));
+
+ Checkpoint checkpoint = new StreamerCheckpointV1(completionTime);
+ StreamerCheckpointV2 translatedCheckpoint =
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient,
USE_TRANSITION_TIME);
+
+ assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ // version, sourceClassName, expectedResult
+ // Version >= 8 with allowed sources should return true
+ "8, org.apache.hudi.utilities.sources.TestSource, true",
+ "9, org.apache.hudi.utilities.sources.AnotherSource, true",
+ // Version < 8 should return false regardless of source
+ "7, org.apache.hudi.utilities.sources.TestSource, false",
+ "6, org.apache.hudi.utilities.sources.AnotherSource, false",
+ // Disallowed sources should return false even with version >= 8
+ "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
+ "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false",
+ "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource,
false",
+ "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource,
false"
+ })
+ public void testTargetCheckpointV2(int version, String sourceClassName,
boolean expected) {
+ assertEquals(expected, CheckpointUtils.shouldTargetCheckpointV2(version,
sourceClassName));
+ }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index b001dcb202d..eb7a04c83e2 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -382,7 +382,12 @@ public class HoodieCommonTestHarness {
}
}
- protected byte[] getCommitMetadata(String basePath, String partition, String
commitTs, int count, Map<String, String> extraMetadata)
+ public byte[] getCommitMetadata(String basePath, String partition, String
commitTs, int count, Map<String, String> extraMetadata)
+ throws IOException {
+ return getCommitMetadata(metaClient, basePath, partition, commitTs, count,
extraMetadata);
+ }
+
+ public static byte[] getCommitMetadata(HoodieTableMetaClient metaClient,
String basePath, String partition, String commitTs, int count, Map<String,
String> extraMetadata)
throws IOException {
HoodieCommitMetadata commit = new HoodieCommitMetadata();
for (int i = 1; i <= count; i++) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
index 82a7e804656..b59a247fe15 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
@@ -67,7 +67,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
* Users can set
[[DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT]] to
* [[HollowCommitHandling.USE_TRANSITION_TIME]] to avoid the blocking
behavior.
*/
- private val hollowCommitHandling: HollowCommitHandling =
+ private val hollowCommitHandlingMode: HollowCommitHandling =
parameters.get(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key)
.map(HollowCommitHandling.valueOf)
.getOrElse(HollowCommitHandling.BLOCK)
@@ -103,10 +103,10 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private def getLatestOffset: Option[HoodieSourceOffset] = {
metaClient.reloadActiveTimeline()
val filteredTimeline = handleHollowCommitIfNeeded(
- metaClient.getActiveTimeline.filterCompletedInstants(), metaClient,
hollowCommitHandling)
+ metaClient.getActiveTimeline.filterCompletedInstants(), metaClient,
hollowCommitHandlingMode)
filteredTimeline match {
case activeInstants if !activeInstants.empty() =>
- val timestamp = if (hollowCommitHandling == USE_TRANSITION_TIME) {
+ val timestamp = if (hollowCommitHandlingMode == USE_TRANSITION_TIME) {
activeInstants.getLatestCompletionTime.get()
} else {
activeInstants.lastInstant().get().requestedTime()
@@ -155,7 +155,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
DataSourceReadOptions.QUERY_TYPE.key ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
DataSourceReadOptions.START_COMMIT.key ->
startCommitTime(startOffset),
DataSourceReadOptions.END_COMMIT.key -> endOffset.offsetCommitTime,
- INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key ->
hollowCommitHandling.name
+ INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key ->
hollowCommitHandlingMode.name
)
val rdd = tableType match {
@@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
CheckpointUtils.convertToCheckpointV2ForCommitTime(
- new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
+ new StreamerCheckpointV1(commitTime), metaClient,
hollowCommitHandlingMode).getCheckpointKey
} else {
commitTime
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index f825aa7e186..66d0882e9e2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -163,7 +163,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
}
private def translateCheckpoint(commitTime: String): String = {
- if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ if
(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion.versionCode(),
getClass.getName)) {
commitTime
} else {
CheckpointUtils.convertToCheckpointV1ForCommitTime(
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 244bcbb63e3..10a02b6511a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -190,7 +190,7 @@ public class HoodieIncrSource extends RowSource {
@Override
public Pair<Option<Dataset<Row>>, Checkpoint>
fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
- if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
+ if (CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
getClass().getName())) {
return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
} else {
return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
@@ -215,7 +215,7 @@ public class HoodieIncrSource extends RowSource {
IncrementalQueryAnalyzer analyzer =
IncrSourceHelper.getIncrementalQueryAnalyzer(
sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy,
getIntWithAltKeys(props,
HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH),
- getLatestSourceProfile());
+ getLatestSourceProfile(), getHollowCommitHandleMode(props));
QueryContext queryContext = analyzer.analyze();
Option<InstantRange> instantRange = queryContext.getInstantRange();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 754c9e9fe60..779981fbbef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -104,7 +104,7 @@ public abstract class Source<T> implements
SourceCommitCallback, Serializable {
if (lastCheckpoint.isEmpty()) {
return Option.empty();
}
- if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
+ if (CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
getClass().getName())) {
// V2 -> V2
if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
return lastCheckpoint;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index d9fa890886b..9b75549c8be 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -29,6 +29,7 @@ import
org.apache.hudi.common.table.log.InstantRange.RangeType;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -184,7 +185,7 @@ public class IncrSourceHelper {
Option<Checkpoint> lastCheckpoint,
MissingCheckpointStrategy missingCheckpointStrategy,
int numInstantsFromConfig,
- Option<SourceProfile<Integer>> latestSourceProfile) {
+ Option<SourceProfile<Integer>> latestSourceProfile,
TimelineUtils.HollowCommitHandling handlingMode) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
.setBasePath(srcPath)
@@ -197,7 +198,7 @@ public class IncrSourceHelper {
if (lastCheckpoint.isPresent() &&
!lastCheckpoint.get().getCheckpointKey().isEmpty()) {
// Translate checkpoint
StreamerCheckpointV2 lastStreamerCheckpointV2 =
CheckpointUtils.convertToCheckpointV2ForCommitTime(
- lastCheckpoint.get(), metaClient);
+ lastCheckpoint.get(), metaClient, handlingMode);
startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey();
rangeType = RangeType.OPEN_CLOSED;
} else if (missingCheckpointStrategy != null) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index bb408753374..f13e4c00bba 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -139,6 +140,7 @@ import static
org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static
org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.shouldTargetCheckpointV2;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
@@ -808,14 +810,7 @@ public class StreamSync implements Serializable, Closeable
{
}
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || cfg.commitOnErrors) {
- Map<String, String> checkpointCommitMetadata =
- !getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)
- ? inputBatch.getCheckpointForNextBatch() != null
- ?
inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata(
- cfg.checkpoint, cfg.ignoreCheckpoint)
- : new StreamerCheckpointV2((String)
null).getCheckpointCommitMetadata(
- cfg.checkpoint, cfg.ignoreCheckpoint)
- : Collections.emptyMap();
+ Map<String, String> checkpointCommitMetadata =
extractCheckpointMetadata(inputBatch, props,
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
if (hasErrors) {
LOG.warn("Some records failed to be merged but forcing commit since
commitOnErrors set. Errors/Total="
@@ -879,6 +874,26 @@ public class StreamSync implements Serializable, Closeable
{
return Pair.of(scheduledCompactionInstant, writeStatusRDD);
}
+ Map<String, String> extractCheckpointMetadata(InputBatch inputBatch,
TypedProperties props, int versionCode, HoodieStreamer.Config cfg) {
+ // If checkpoint force skip is enabled, return empty map
+ if (getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)) {
+ return Collections.emptyMap();
+ }
+
+ // If we have a next checkpoint batch, use its metadata
+ if (inputBatch.getCheckpointForNextBatch() != null) {
+ return inputBatch.getCheckpointForNextBatch()
+ .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+ }
+
+ // Otherwise create new checkpoint based on version
+ Checkpoint checkpoint = shouldTargetCheckpointV2(versionCode,
cfg.sourceClassName)
+ ? new StreamerCheckpointV2((String) null)
+ : new StreamerCheckpointV1((String) null);
+
+ return checkpoint.getCheckpointCommitMetadata(cfg.checkpoint,
cfg.ignoreCheckpoint);
+ }
+
/**
* Try to start a new commit.
* <p>
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index c4af4385a33..c2e310ec81c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -58,13 +58,13 @@ public class StreamerCheckpointUtils {
TypedProperties
props) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt,
streamerConfig, props);
+ checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(),
streamerConfig, props);
}
LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION);
- checkpoint =
Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
+ checkpoint =
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new
StreamerCheckpointV1(streamerConfig.checkpoint));
}
return checkpoint;
@@ -73,28 +73,30 @@ public class StreamerCheckpointUtils {
/**
* Process previous commit metadata and checkpoint configs set by user to
determine the checkpoint to resume from.
*
- * @param commitsTimelineOpt commits timeline of interest, including .commit
and .deltacommit.
+ * @param commitsTimeline commits timeline of interest, including .commit
and .deltacommit.
*
* @return the checkpoint to resume from if applicable.
* @throws IOException
*/
@VisibleForTesting
- static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline>
commitsTimelineOpt,
+ static Option<Checkpoint> getCheckpointToResumeString(HoodieTimeline
commitsTimeline,
HoodieStreamer.Config
streamerConfig,
TypedProperties props)
throws IOException {
Option<Checkpoint> resumeCheckpoint = Option.empty();
- // try get checkpoint from commits(including commit and deltacommit)
- // in COW migrating to MOR case, the first batch of the deltastreamer will
lost the checkpoint from COW table, cause the dataloss
- HoodieTimeline deltaCommitTimeline =
commitsTimelineOpt.get().filter(instant ->
instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
// has deltacommit and this is a MOR table, then we should get checkpoint
from .deltacommit
// if changing from mor to cow, before changing we must do a full
compaction, so we can only consider .commit in such case
- if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name())
&& !deltaCommitTimeline.empty()) {
- commitsTimelineOpt = Option.of(deltaCommitTimeline);
+ if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name()))
{
+ // try get checkpoint from commits(including commit and deltacommit)
+ // in COW migrating to MOR case, the first batch of the deltastreamer
will lost the checkpoint from COW table, cause the dataloss
+ HoodieTimeline deltaCommitTimeline = commitsTimeline.filter(instant ->
instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+ if (!deltaCommitTimeline.empty()) {
+ commitsTimeline = deltaCommitTimeline;
+ }
}
- Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
+ Option<HoodieInstant> lastCommit = commitsTimeline.lastInstant();
if (lastCommit.isPresent()) {
// if previous commit metadata did not have the checkpoint key, try
traversing previous commits until we find one.
- Option<HoodieCommitMetadata> commitMetadataOption =
getLatestCommitMetadataWithValidCheckpointInfo(commitsTimelineOpt.get());
+ Option<HoodieCommitMetadata> commitMetadataOption =
getLatestCommitMetadataWithValidCheckpointInfo(commitsTimeline);
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION);
if (commitMetadataOption.isPresent()) {
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
@@ -106,7 +108,7 @@ public class StreamerCheckpointUtils {
resumeCheckpoint = Option.empty();
} else if (streamerConfig.checkpoint != null &&
(StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey())
||
!streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey())))
{
- resumeCheckpoint =
Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
+ resumeCheckpoint =
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new
StreamerCheckpointV1(streamerConfig.checkpoint));
} else if
(!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) {
//if previous checkpoint is an empty string, skip resume use
Option.empty()
@@ -116,7 +118,7 @@ public class StreamerCheckpointUtils {
throw new HoodieStreamerException(
"Unable to find previous checkpoint. Please double check if this
table "
+ "was indeed built via delta streamer. Last Commit :" +
lastCommit + ", Instants :"
- + commitsTimelineOpt.get().getInstants());
+ + commitsTimeline.getInstants());
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY)))
{
@@ -124,7 +126,7 @@ public class StreamerCheckpointUtils {
}
} else if (streamerConfig.checkpoint != null) {
//
getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will
never return a commit metadata w/o any checkpoint key set.
- resumeCheckpoint =
Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
+ resumeCheckpoint =
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new
StreamerCheckpointV1(streamerConfig.checkpoint));
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 989be986b40..b6ed6139547 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -563,7 +563,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
.count());
}
- static class TestHelpers {
+ public static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath,
WriteOperationType op) {
return makeConfig(basePath, op,
Collections.singletonList(TestHoodieDeltaStreamer.DropAllTransformer.class.getName()));
@@ -590,7 +590,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName,
tableType, "timestamp", null);
}
- static HoodieDeltaStreamer.Config makeConfig(String basePath,
WriteOperationType op, String sourceClassName,
+ public static HoodieDeltaStreamer.Config makeConfig(String basePath,
WriteOperationType op, String sourceClassName,
List<String>
transformerClassNames, String propsFilename, boolean enableHiveSync, boolean
useSchemaProviderClass,
int sourceLimit, boolean
updatePayloadClass, String payloadClassName, String tableType, String
sourceOrderingField,
String checkpoint) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index cf504ef5fd6..4e8e1342820 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -83,6 +83,7 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
@@ -167,6 +168,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
import static org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient;
@@ -422,6 +425,17 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
expectedKeyGeneratorClassName,
metaClient.getTableConfig().getKeyGeneratorClassName());
Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
assertEquals(1000, res.count());
+ assertUseV2Checkpoint(metaClient);
+ }
+
+ private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
+ metaClient.reloadActiveTimeline();
+ Option<HoodieCommitMetadata> metadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ metaClient, metaClient.getActiveTimeline().lastInstant().get());
+ assertFalse(metadata.isEmpty());
+ Map<String, String> extraMetadata = metadata.get().getExtraMetadata();
+ assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
+ assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
}
@Test
@@ -568,6 +582,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -579,6 +595,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" +
basePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -644,6 +661,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
+ assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -673,6 +691,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
+ assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -1384,6 +1403,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
entry, metaClient, WriteOperationType.BULK_INSERT));
}
}
+ assertUseV2Checkpoint(createMetaClient(jsc, tableBasePath));
} finally {
deltaStreamer.shutdownGracefully();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java
new file mode 100644
index 00000000000..88a24cf4085
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+
+import java.util.function.BiFunction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Helper class to validate checkpoint options in test scenarios.
+ * Used by MockS3EventsHoodieIncrSource to validate checkpoint behavior.
+ */
+public class CheckpointValidator {
+ public static final String VAL_INPUT_CKP = "valInputCkp";
+
+ /**
+ * Validation keys for checkpoint testing.
+ */
+ public static final String VAL_CKP_KEY_EQ_VAL = "VAL_CKP_KEY_EQ_VAL_KEY";
+ public static final String VAL_CKP_RESET_KEY_EQUALS = "VAL_CKP_RESET_KEY";
+ public static final String VAL_CKP_RESET_KEY_IS_NULL =
"VAL_CKP_RESET_KEY_IS_NULL";
+ public static final String VAL_CKP_IGNORE_KEY_EQUALS = "VAL_CKP_IGNORE_KEY";
+ public static final String VAL_CKP_IGNORE_KEY_IS_NULL =
"VAL_CKP_IGNORE_KEY_IS_NULL";
+ public static final String VAL_NON_EMPTY_CKP_ALL_MEMBERS =
"VAL_NON_EMPTY_CKP_ALL_MEMBERS";
+ public static final String VAL_EMPTY_CKP_KEY = "VAL_EMPTY_CKP_KEY";
+ public static final String VAL_NO_OP = "VAL_NO_OP";
+
+ /*
+ * Checkpoint validation method that assert value equals to expected ones.
+ * */
+ private static final BiFunction<Option<Checkpoint>, TypedProperties, Void>
VAL_NON_EMPTY_CKP_WITH_FIXED_VALUE = (ckpOpt, props) -> {
+ assertFalse(ckpOpt.isEmpty());
+ if (props.containsKey(VAL_CKP_KEY_EQ_VAL)) {
+ assertEquals(props.getString(VAL_CKP_KEY_EQ_VAL),
ckpOpt.get().getCheckpointKey());
+ }
+
+ if (props.containsKey(VAL_CKP_RESET_KEY_EQUALS)) {
+ assertEquals(ckpOpt.get().getCheckpointResetKey(),
props.getString(VAL_CKP_RESET_KEY_EQUALS));
+ }
+ if (props.containsKey(VAL_CKP_RESET_KEY_IS_NULL)) {
+ assertNull(ckpOpt.get().getCheckpointResetKey());
+ }
+
+ if (props.containsKey(VAL_CKP_IGNORE_KEY_EQUALS)) {
+ assertEquals(ckpOpt.get().getCheckpointIgnoreKey(),
props.getString(VAL_CKP_IGNORE_KEY_EQUALS));
+ }
+ if (props.containsKey(VAL_CKP_IGNORE_KEY_IS_NULL)) {
+ assertNull(ckpOpt.get().getCheckpointIgnoreKey());
+ }
+ return null;
+ };
+
+ private static final BiFunction<Option<Checkpoint>, TypedProperties, Void>
VAL_EMPTY_CKP = (ckpOpt, props) -> {
+ assertTrue(ckpOpt.isEmpty());
+ return null;
+ };
+
+ /**
+ * Validates the checkpoint option based on validation type specified in
props.
+ *
+ * @param lastCheckpoint Option containing the last checkpoint to validate
+ * @param props TypedProperties containing validation configuration
+ * @throws IllegalArgumentException if validation fails
+ */
+ public static void validateCheckpointOption(Option<Checkpoint>
lastCheckpoint, TypedProperties props) {
+ String valType = props.getString(VAL_INPUT_CKP, VAL_NO_OP);
+
+ switch (valType) {
+ case VAL_NO_OP:
+ break;
+ case VAL_NON_EMPTY_CKP_ALL_MEMBERS:
+ VAL_NON_EMPTY_CKP_WITH_FIXED_VALUE.apply(lastCheckpoint, props);
+ break;
+ case VAL_EMPTY_CKP_KEY:
+ VAL_EMPTY_CKP.apply(lastCheckpoint, props);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported validation type: " +
valType);
+ }
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DummyOperationExecutor.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DummyOperationExecutor.java
new file mode 100644
index 00000000000..054fb95acc9
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DummyOperationExecutor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * Helper class to execute dummy operations for testing
S3EventsHoodieIncrSource.
+ * Provides different scenarios of empty row sets with various checkpoint
configurations.
+ */
+public class DummyOperationExecutor {
+
+ public static final String OP_FETCH_NEXT_BATCH = "mockTestFetchNextBatchOp";
+
+ /**
+ * Operation keys for different test scenarios returning empty row sets with
different checkpoint configurations.
+ */
+ public static final String OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY =
"OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY";
+ public static final String OP_EMPTY_ROW_SET_NULL_CKP_KEY =
"OP_EMPTY_ROW_SET_NULL_CKP";
+
+ /**
+ * Custom checkpoint values used in testing.
+ */
+ public static final String CUSTOM_CHECKPOINT1 = "custom-checkpoint1";
+ public static final String RETURN_CHECKPOINT_KEY = "RETURN_CHECKPOINT_KEY";
+
+ @FunctionalInterface
+ private interface OperationFunction {
+ Pair<Option<Dataset<Row>>, Checkpoint> apply(Option<Checkpoint>
checkpoint, Long limit, TypedProperties props);
+ }
+
+ private static final OperationFunction EMPTY_ROW_SET_NONE_NULL_CKP =
+ (checkpoint, limit, props) -> {
+ Option<Dataset<Row>> empty = Option.empty();
+ String returnCheckpoint = props.getString(RETURN_CHECKPOINT_KEY,
CUSTOM_CHECKPOINT1);
+ return Pair.of(
+ empty,
+ new StreamerCheckpointV1(returnCheckpoint));
+ };
+
+ private static final OperationFunction EMPTY_ROW_SET_NULL_CKP =
+ (checkpoint, limit, props) -> {
+ Option<Dataset<Row>> empty = Option.empty();
+ return Pair.of(
+ empty,
+ null
+ );
+ };
+
+ /**
+ * Executes the dummy operation based on the operation type in props.
+ *
+ * @param lastCheckpoint Option containing the last checkpoint
+ * @param sourceLimit maximum number of records to fetch
+ * @param props TypedProperties containing the operation type
+ * @return Pair containing Option<Dataset<Row>> and Checkpoint
+ * @throws IllegalArgumentException if operation type is not supported
+ */
+ public static Pair<Option<Dataset<Row>>, Checkpoint> executeDummyOperation(
+ Option<Checkpoint> lastCheckpoint, long sourceLimit, TypedProperties
props) {
+ String opType = props.getString(OP_FETCH_NEXT_BATCH,
OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ switch (opType) {
+ case OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY:
+ return EMPTY_ROW_SET_NONE_NULL_CKP.apply(lastCheckpoint, sourceLimit,
props);
+ case OP_EMPTY_ROW_SET_NULL_CKP_KEY:
+ return EMPTY_ROW_SET_NULL_CKP.apply(lastCheckpoint, sourceLimit,
props);
+ default:
+ throw new IllegalArgumentException("Unsupported operation type: " +
opType);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockGcsEventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockGcsEventsHoodieIncrSource.java
new file mode 100644
index 00000000000..d0e8db7e738
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockGcsEventsHoodieIncrSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A mock implementation of GcsEventsHoodieIncrSource used for testing
StreamSync functionality.
+ * This class simulates different checkpoint and data fetch scenarios to test
the checkpoint handling
+ * and data ingestion behavior of the StreamSync class.
+ */
+public class MockGcsEventsHoodieIncrSource extends GcsEventsHoodieIncrSource {
+
+ public MockGcsEventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext jsc,
+ SparkSession spark,
+ SchemaProvider schemaProvider,
+ HoodieIngestionMetrics metrics) {
+ super(props, jsc, spark,
+ new CloudDataFetcher(props, jsc, spark, metrics),
+ new QueryRunner(spark, props),
+ new DefaultStreamContext(schemaProvider, Option.empty())
+ );
+ }
+
+ public MockGcsEventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext jsc,
+ SparkSession spark,
+ HoodieIngestionMetrics metrics,
+ StreamContext streamContext) {
+ super(props, jsc, spark,
+ new CloudDataFetcher(props, jsc, spark, metrics),
+ new QueryRunner(spark, props),
+ streamContext
+ );
+ }
+
+ /**
+ * Overrides the fetchNextBatch method to simulate different test scenarios
based on configuration.
+ *
+ * @param lastCheckpoint Option containing the last checkpoint
+ * @param sourceLimit maximum number of records to fetch
+ * @return Pair containing Option<Dataset<Row>> and Checkpoint
+ */
+ @Override
+ public Pair<Option<Dataset<Row>>, Checkpoint>
fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
+ CheckpointValidator.validateCheckpointOption(lastCheckpoint, props);
+ return DummyOperationExecutor.executeDummyOperation(lastCheckpoint,
sourceLimit, props);
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java
new file mode 100644
index 00000000000..da1795bed94
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A mock implementation of S3EventsHoodieIncrSource used for testing
StreamSync functionality.
+ * This class simulates different checkpoint and data fetch scenarios to test
the checkpoint handling
+ * and data ingestion behavior of the StreamSync class.
+ */
+public class MockS3EventsHoodieIncrSource extends S3EventsHoodieIncrSource {
+
+ /**
+ * Constructs a new MockS3EventsHoodieIncrSource with the specified
parameters.
+ *
+ * @param props TypedProperties containing configuration properties
+ * @param sparkContext JavaSparkContext instance
+ * @param sparkSession SparkSession instance
+ * @param schemaProvider SchemaProvider for the source
+ * @param metrics HoodieIngestionMetrics instance
+ */
+ public MockS3EventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider,
+ HoodieIngestionMetrics metrics) {
+ this(props, sparkContext, sparkSession, new QueryRunner(sparkSession,
props),
+ new CloudDataFetcher(props, sparkContext, sparkSession, metrics), new
DefaultStreamContext(schemaProvider, Option.empty()));
+ }
+
+ public MockS3EventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ HoodieIngestionMetrics metrics,
+ StreamContext streamContext) {
+ this(props, sparkContext, sparkSession, new QueryRunner(sparkSession,
props),
+ new CloudDataFetcher(props, sparkContext, sparkSession, metrics),
streamContext);
+ }
+
+ MockS3EventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ QueryRunner queryRunner,
+ CloudDataFetcher cloudDataFetcher,
+ StreamContext streamContext) {
+ super(props, sparkContext, sparkSession, queryRunner, cloudDataFetcher,
streamContext);
+ }
+
+ /**
+ * Overrides the fetchNextBatch method to simulate different test scenarios
based on configuration.
+ *
+ * @param lastCheckpoint Option containing the last checkpoint
+ * @param sourceLimit maximum number of records to fetch
+ * @return Pair containing Option<Dataset<Row>> and Checkpoint
+ */
+ @Override
+ public Pair<Option<Dataset<Row>>, Checkpoint>
fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
+ CheckpointValidator.validateCheckpointOption(lastCheckpoint, props);
+ return DummyOperationExecutor.executeDummyOperation(lastCheckpoint,
sourceLimit, props);
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
new file mode 100644
index 00000000000..03df6201ba3
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import
org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class S3EventsHoodieIncrSourceHarness extends
SparkClientFunctionalTestHarness {
+ protected static final Schema S3_METADATA_SCHEMA =
SchemaTestUtil.getSchemaFromResource(
+ S3EventsHoodieIncrSourceHarness.class,
"/streamer-config/s3-metadata.avsc", true);
+
+ protected ObjectMapper mapper = new ObjectMapper();
+
+ protected static final String MY_BUCKET = "some-bucket";
+ protected static final String IGNORE_FILE_EXTENSION = ".ignore";
+
+ protected Option<SchemaProvider> schemaProvider;
+ @Mock
+ QueryRunner mockQueryRunner;
+ @Mock
+ CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon;
+ @Mock
+ SourceProfileSupplier sourceProfileSupplier;
+ @Mock
+ QueryInfo queryInfo;
+ @Mock
+ HoodieIngestionMetrics metrics;
+ protected JavaSparkContext jsc;
+ protected HoodieTableMetaClient metaClient;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+ String schemaFilePath =
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+ TypedProperties props = new TypedProperties();
+ props.put("hoodie.streamer.schemaprovider.source.schema.file",
schemaFilePath);
+ props.put("hoodie.streamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
+ this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
+ }
+
+ protected List<String> getSampleS3ObjectKeys(List<Triple<String, Long,
String>> filePathSizeAndCommitTime) {
+ return filePathSizeAndCommitTime.stream().map(f -> {
+ try {
+ return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(),
f.getRight());
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ protected Dataset<Row> generateDataset(List<Triple<String, Long, String>>
filePathSizeAndCommitTime) {
+ JavaRDD<String> testRdd =
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
+ Dataset<Row> inputDs = spark().read().json(testRdd);
+ return inputDs;
+ }
+
+ /**
+ * Generates simple Json structure like below
+ * <p>
+ * s3 : {
+ * object : {
+ * size:
+ * key:
+ * }
+ * bucket: {
+ * name:
+ * }
+ */
+ protected String generateS3EventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
+ throws JsonProcessingException {
+ Map<String, Object> objectMetadata = new HashMap<>();
+ objectMetadata.put("size", objectSize);
+ objectMetadata.put("key", objectKey);
+ Map<String, String> bucketMetadata = new HashMap<>();
+ bucketMetadata.put("name", bucketName);
+ Map<String, Object> s3Metadata = new HashMap<>();
+ s3Metadata.put("object", objectMetadata);
+ s3Metadata.put("bucket", bucketMetadata);
+ Map<String, Object> eventMetadata = new HashMap<>();
+ eventMetadata.put("s3", s3Metadata);
+ eventMetadata.put("_hoodie_commit_time", commitTime);
+ return mapper.writeValueAsString(eventMetadata);
+ }
+
+ protected HoodieRecord generateS3EventMetadata(String commitTime, String
bucketName, String objectKey, Long objectSize) {
+ String partitionPath = bucketName;
+ Schema schema = S3_METADATA_SCHEMA;
+ GenericRecord rec = new GenericData.Record(schema);
+ Schema.Field s3Field = schema.getField("s3");
+ Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the
record schema is the second type
+ // Create a generic record for the "s3" field
+ GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+ Schema.Field s3BucketField = s3Schema.getField("bucket");
+ Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming
the record schema is the second type
+ GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+ s3BucketRec.put("name", bucketName);
+
+
+ Schema.Field s3ObjectField = s3Schema.getField("object");
+ Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming
the record schema is the second type
+ GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+ s3ObjectRec.put("key", objectKey);
+ s3ObjectRec.put("size", objectSize);
+
+ s3Record.put("bucket", s3BucketRec);
+ s3Record.put("object", s3ObjectRec);
+ rec.put("s3", s3Record);
+ rec.put("_hoodie_commit_time", commitTime);
+
+ HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+ return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath),
payload);
+ }
+
+ protected TypedProperties
setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
+ Properties properties = new Properties();
+ properties.setProperty("hoodie.streamer.source.hoodieincr.path",
basePath());
+
properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
+ missingCheckpointStrategy.name());
+ properties.setProperty("hoodie.streamer.source.hoodieincr.file.format",
"json");
+ return new TypedProperties(properties);
+ }
+
+ protected HoodieWriteConfig.Builder getConfigBuilder(String basePath,
HoodieTableMetaClient metaClient) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(S3_METADATA_SCHEMA.toString())
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+ .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .forTable(metaClient.getTableConfig().getTableName());
+ }
+
+ protected HoodieWriteConfig getWriteConfig() {
+ return getConfigBuilder(basePath(), metaClient)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .build();
+ }
+
+ protected Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String
commitTime) throws IOException {
+ HoodieWriteConfig writeConfig = getWriteConfig();
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+
+ writeClient.startCommitWithTime(commitTime);
+ List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+ generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json",
1L)
+ );
+ JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+ List<WriteStatus> statuses = result.collect();
+ assertNoWriteErrors(statuses);
+
+ return Pair.of(commitTime, s3MetadataRecords);
+ }
+ }
+
+ protected void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
+ Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint,
+ TypedProperties typedProperties) {
+ S3EventsHoodieIncrSource incrSource = new
S3EventsHoodieIncrSource(typedProperties, jsc(),
+ spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(),
spark(), metrics, mockCloudObjectsSelectorCommon),
+ new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
+
+ Pair<Option<Dataset<Row>>, Checkpoint> dataAndCheckpoint =
incrSource.fetchNextBatch(
+ checkpointToPull.isPresent() ? Option.of(new
StreamerCheckpointV1(checkpointToPull.get())) : Option.empty(), sourceLimit);
+
+ Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
+ Checkpoint nextCheckPoint = dataAndCheckpoint.getRight();
+
+ Assertions.assertNotNull(nextCheckPoint);
+ Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint),
nextCheckPoint);
+ }
+
+ protected void setMockQueryRunner(Dataset<Row> inputDs) {
+ setMockQueryRunner(inputDs, Option.empty());
+ }
+
+ protected void setMockQueryRunner(Dataset<Row> inputDs, Option<String>
nextCheckPointOpt) {
+
+ when(mockQueryRunner.run(Mockito.any(QueryInfo.class),
Mockito.any())).thenAnswer(invocation -> {
+ QueryInfo queryInfo = invocation.getArgument(0);
+ QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+ queryInfo.withUpdatedEndInstant(nextCheckPoint))
+ .orElse(queryInfo);
+ if (updatedQueryInfo.isSnapshot()) {
+ return Pair.of(updatedQueryInfo,
+ inputDs.filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ updatedQueryInfo.getStartInstant()))
+ .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ updatedQueryInfo.getEndInstant())));
+ }
+ return Pair.of(updatedQueryInfo, inputDs);
+ });
+ }
+
+ protected void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
+ Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint) {
+ TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+
+ readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit,
expectedCheckpoint, typedProperties);
+ }
+
+ static class TestSourceProfile implements SourceProfile<Long> {
+ private final long maxSourceBytes;
+ private final int sourcePartitions;
+ private final long bytesPerPartition;
+
+ public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long
bytesPerPartition) {
+ this.maxSourceBytes = maxSourceBytes;
+ this.sourcePartitions = sourcePartitions;
+ this.bytesPerPartition = bytesPerPartition;
+ }
+
+ @Override
+ public long getMaxSourceBytes() {
+ return maxSourceBytes;
+ }
+
+ @Override
+ public int getSourcePartitions() {
+ return sourcePartitions;
+ }
+
+ @Override
+ public Long getSourceSpecificContext() {
+ return bytesPerPartition;
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 8b5ab0bddc9..e66018f9365 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -44,12 +44,12 @@ import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import
org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource.TestSourceProfile;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import
org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness.TestSourceProfile;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index e07ff5022de..4218aaca26d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -18,49 +18,18 @@
package org.apache.hudi.utilities.sources;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
-import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.config.HoodieArchivalConfig;
-import org.apache.hudi.config.HoodieCleanConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
-import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
-import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
-import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
-import org.apache.hudi.utilities.sources.helpers.QueryInfo;
-import org.apache.hudi.utilities.sources.helpers.QueryRunner;
-import
org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -71,20 +40,14 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -93,158 +56,12 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarness {
- private static final Schema S3_METADATA_SCHEMA =
SchemaTestUtil.getSchemaFromResource(
- TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc",
true);
-
- private ObjectMapper mapper = new ObjectMapper();
-
- private static final String MY_BUCKET = "some-bucket";
- private static final String IGNORE_FILE_EXTENSION = ".ignore";
-
- private Option<SchemaProvider> schemaProvider;
- @Mock
- QueryRunner mockQueryRunner;
- @Mock
- CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon;
- @Mock
- SourceProfileSupplier sourceProfileSupplier;
- @Mock
- QueryInfo queryInfo;
- @Mock
- HoodieIngestionMetrics metrics;
- private JavaSparkContext jsc;
- private HoodieTableMetaClient metaClient;
+public class TestS3EventsHoodieIncrSource extends
S3EventsHoodieIncrSourceHarness {
@BeforeEach
public void setUp() throws IOException {
- jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+ super.setUp();
metaClient = getHoodieMetaClient(storageConf(), basePath());
- String schemaFilePath =
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
- TypedProperties props = new TypedProperties();
- props.put("hoodie.streamer.schemaprovider.source.schema.file",
schemaFilePath);
- props.put("hoodie.streamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
- this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
- }
-
- private List<String> getSampleS3ObjectKeys(List<Triple<String, Long,
String>> filePathSizeAndCommitTime) {
- return filePathSizeAndCommitTime.stream().map(f -> {
- try {
- return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(),
f.getRight());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- }
-
- private Dataset<Row> generateDataset(List<Triple<String, Long, String>>
filePathSizeAndCommitTime) {
- JavaRDD<String> testRdd =
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
- Dataset<Row> inputDs = spark().read().json(testRdd);
- return inputDs;
- }
-
- /**
- * Generates simple Json structure like below
- * <p>
- * s3 : {
- * object : {
- * size:
- * key:
- * }
- * bucket: {
- * name:
- * }
- */
- private String generateS3EventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
- throws JsonProcessingException {
- Map<String, Object> objectMetadata = new HashMap<>();
- objectMetadata.put("size", objectSize);
- objectMetadata.put("key", objectKey);
- Map<String, String> bucketMetadata = new HashMap<>();
- bucketMetadata.put("name", bucketName);
- Map<String, Object> s3Metadata = new HashMap<>();
- s3Metadata.put("object", objectMetadata);
- s3Metadata.put("bucket", bucketMetadata);
- Map<String, Object> eventMetadata = new HashMap<>();
- eventMetadata.put("s3", s3Metadata);
- eventMetadata.put("_hoodie_commit_time", commitTime);
- return mapper.writeValueAsString(eventMetadata);
- }
-
- private HoodieRecord generateS3EventMetadata(String commitTime, String
bucketName, String objectKey, Long objectSize) {
- String partitionPath = bucketName;
- Schema schema = S3_METADATA_SCHEMA;
- GenericRecord rec = new GenericData.Record(schema);
- Schema.Field s3Field = schema.getField("s3");
- Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the
record schema is the second type
- // Create a generic record for the "s3" field
- GenericRecord s3Record = new GenericData.Record(s3Schema);
-
- Schema.Field s3BucketField = s3Schema.getField("bucket");
- Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming
the record schema is the second type
- GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
- s3BucketRec.put("name", bucketName);
-
-
- Schema.Field s3ObjectField = s3Schema.getField("object");
- Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming
the record schema is the second type
- GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
- s3ObjectRec.put("key", objectKey);
- s3ObjectRec.put("size", objectSize);
-
- s3Record.put("bucket", s3BucketRec);
- s3Record.put("object", s3ObjectRec);
- rec.put("s3", s3Record);
- rec.put("_hoodie_commit_time", commitTime);
-
- HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
- return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath),
payload);
- }
-
- private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy) {
- Properties properties = new Properties();
- properties.setProperty("hoodie.streamer.source.hoodieincr.path",
basePath());
-
properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
- missingCheckpointStrategy.name());
- properties.setProperty("hoodie.streamer.source.hoodieincr.file.format",
"json");
- return new TypedProperties(properties);
- }
-
- private HoodieWriteConfig.Builder getConfigBuilder(String basePath,
HoodieTableMetaClient metaClient) {
- return HoodieWriteConfig.newBuilder()
- .withPath(basePath)
- .withSchema(S3_METADATA_SCHEMA.toString())
- .withParallelism(2, 2)
- .withBulkInsertParallelism(2)
- .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
- .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
- .forTable(metaClient.getTableConfig().getTableName());
- }
-
- private HoodieWriteConfig getWriteConfig() {
- return getConfigBuilder(basePath(), metaClient)
-
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
-
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(1).build())
- .build();
- }
-
- private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String
commitTime) throws IOException {
- HoodieWriteConfig writeConfig = getWriteConfig();
- try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
-
- writeClient.startCommitWithTime(commitTime);
- List<HoodieRecord> s3MetadataRecords = Arrays.asList(
- generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json",
1L)
- );
- JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
-
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
-
- return Pair.of(commitTime, s3MetadataRecords);
- }
}
@Test
@@ -553,77 +370,4 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
}
-
- private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
- Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint,
- TypedProperties typedProperties) {
- S3EventsHoodieIncrSource incrSource = new
S3EventsHoodieIncrSource(typedProperties, jsc(),
- spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(),
spark(), metrics, mockCloudObjectsSelectorCommon),
- new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
-
- Pair<Option<Dataset<Row>>, Checkpoint> dataAndCheckpoint =
incrSource.fetchNextBatch(
- checkpointToPull.isPresent() ? Option.of(new
StreamerCheckpointV1(checkpointToPull.get())) : Option.empty(), sourceLimit);
-
- Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
- Checkpoint nextCheckPoint = dataAndCheckpoint.getRight();
-
- Assertions.assertNotNull(nextCheckPoint);
- Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint),
nextCheckPoint);
- }
-
- private void setMockQueryRunner(Dataset<Row> inputDs) {
- setMockQueryRunner(inputDs, Option.empty());
- }
-
- private void setMockQueryRunner(Dataset<Row> inputDs, Option<String>
nextCheckPointOpt) {
-
- when(mockQueryRunner.run(Mockito.any(QueryInfo.class),
Mockito.any())).thenAnswer(invocation -> {
- QueryInfo queryInfo = invocation.getArgument(0);
- QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
- queryInfo.withUpdatedEndInstant(nextCheckPoint))
- .orElse(queryInfo);
- if (updatedQueryInfo.isSnapshot()) {
- return Pair.of(updatedQueryInfo,
- inputDs.filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- updatedQueryInfo.getStartInstant()))
- .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- updatedQueryInfo.getEndInstant())));
- }
- return Pair.of(updatedQueryInfo, inputDs);
- });
- }
-
- private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
- Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint) {
- TypedProperties typedProperties = setProps(missingCheckpointStrategy);
-
- readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit,
expectedCheckpoint, typedProperties);
- }
-
- static class TestSourceProfile implements SourceProfile<Long> {
- private final long maxSourceBytes;
- private final int sourcePartitions;
- private final long bytesPerPartition;
-
- public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long
bytesPerPartition) {
- this.maxSourceBytes = maxSourceBytes;
- this.sourcePartitions = sourcePartitions;
- this.bytesPerPartition = bytesPerPartition;
- }
-
- @Override
- public long getMaxSourceBytes() {
- return maxSourceBytes;
- }
-
- @Override
- public int getSourcePartitions() {
- return sourcePartitions;
- }
-
- @Override
- public Long getSourceSpecificContext() {
- return bytesPerPartition;
- }
- }
}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java
new file mode 100644
index 00000000000..7c461fa8450
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource;
+import org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource;
+import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
+import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
+import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP;
+import static
org.apache.hudi.utilities.sources.CheckpointValidator.VAL_CKP_IGNORE_KEY_IS_NULL;
+import static
org.apache.hudi.utilities.sources.CheckpointValidator.VAL_CKP_KEY_EQ_VAL;
+import static
org.apache.hudi.utilities.sources.CheckpointValidator.VAL_CKP_RESET_KEY_IS_NULL;
+import static
org.apache.hudi.utilities.sources.CheckpointValidator.VAL_EMPTY_CKP_KEY;
+import static
org.apache.hudi.utilities.sources.CheckpointValidator.VAL_INPUT_CKP;
+import static
org.apache.hudi.utilities.sources.CheckpointValidator.VAL_NON_EMPTY_CKP_ALL_MEMBERS;
+import static
org.apache.hudi.utilities.sources.DummyOperationExecutor.OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY;
+import static
org.apache.hudi.utilities.sources.DummyOperationExecutor.OP_EMPTY_ROW_SET_NULL_CKP_KEY;
+import static
org.apache.hudi.utilities.sources.DummyOperationExecutor.OP_FETCH_NEXT_BATCH;
+import static
org.apache.hudi.utilities.sources.DummyOperationExecutor.RETURN_CHECKPOINT_KEY;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+@ExtendWith(MockitoExtension.class)
+public class TestS3GcsEventsHoodieIncrSourceE2ECkpVersion extends
S3EventsHoodieIncrSourceHarness {
+
+ private String toggleVersion(String version) {
+ return "8".equals(version) ? "6" : "8";
+ }
+
+ private HoodieDeltaStreamer.Config createConfig(String basePath, String
sourceCheckpoint) {
+ return createConfig(basePath, sourceCheckpoint,
MockS3EventsHoodieIncrSource.class.getName());
+ }
+
+ private HoodieDeltaStreamer.Config createConfig(String basePath, String
sourceCheckpoint, String sourceClass) {
+ HoodieDeltaStreamer.Config cfg =
HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
+ basePath,
+ WriteOperationType.INSERT,
+ sourceClass,
+ Collections.emptyList(),
+ sourceCheckpoint != null ?
DFSPropertiesConfiguration.DEFAULT_PATH.toString() : null,
+ false,
+ false,
+ 100000,
+ false,
+ null,
+ null,
+ "timestamp",
+ sourceCheckpoint);
+ cfg.propsFilePath = DFSPropertiesConfiguration.DEFAULT_PATH.toString();
+ return cfg;
+ }
+
+ private TypedProperties setupBaseProperties(String tableVersion) {
+ TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
+ props.put(WRITE_TABLE_VERSION.key(), tableVersion);
+ return props;
+ }
+
+ public void verifyLastInstantCommitMetadata(Map<String, String>
expectedMetadata) {
+ metaClient.reloadActiveTimeline();
+ Option<HoodieCommitMetadata> metadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ metaClient, metaClient.getActiveTimeline().lastInstant().get());
+ assertFalse(metadata.isEmpty());
+ assertEquals(metadata.get().getExtraMetadata(), expectedMetadata);
+ }
+
+
+ /**
+ * Tests the end-to-end sync behavior with multiple sync iterations.
+ *
+ * Test flow:
+ * 1. First sync:
+ * - Starts with no checkpoint
+ * - Ingests data until checkpoint "10"
+ * - Verifies commit metadata contains only checkpoint="10"
+ *
+ * 2. Second sync:
+ * - Uses different table version
+ * - Continues from checkpoint "10" to "20"
+ * - Verifies commit metadata contains checkpoint="20"
+ *
+ * 3. Third sync:
+ * - Upgrades to table version 8
+ * - Continues from checkpoint "20" to "30"
+ * - Verifies commit metadata still uses checkpoint V1 format
+ * - Verifies final checkpoint="30"
+ */
+
+ @ParameterizedTest
+ @CsvSource({
+ "6, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource",
+ "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource",
+ "6, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource",
+ "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource"
+ })
+ public void testSyncE2ENoPrevCkpThenSyncMultipleTimes(String tableVersion,
String sourceClass) throws Exception {
+ // First start with no previous checkpoint and ingest till ckp "10" with the
given table version.
+ // Disable auto upgrade and MDT as we want to keep things as it is.
+ metaClient = getHoodieMetaClientWithTableVersion(storageConf(),
basePath(), tableVersion);
+ TypedProperties props = setupBaseProperties(tableVersion);
+ // Round 1: ingestion starts from the beginning and goes to checkpoint 10. No
checkpoint override.
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ props.put(RETURN_CHECKPOINT_KEY, "10");
+ // Validating the source input ckp is empty when doing the sync.
+ props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY);
+ props.put("hoodie.metadata.enable", "false");
+ props.put("hoodie.write.auto.upgrade", "false");
+
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(createConfig(basePath(),
null, sourceClass), jsc, Option.of(props));
+ ds.sync();
+
+ Map<String, String> expectedMetadata = new HashMap<>();
+ // Confirm the resulting checkpoint is 10 and no other metadata.
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "10");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ + // Then resume from ckp "10" and ingest till ckp "20" with the toggled table version.
+ + // Disable auto upgrade and MDT as we want to keep things as it is.
+ props = setupBaseProperties(toggleVersion(tableVersion));
+ props.put("hoodie.metadata.enable", "false");
+ // Dummy behavior injection to return ckp "20".
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ props.put(RETURN_CHECKPOINT_KEY, "20");
+ props.put("hoodie.write.auto.upgrade", "false");
+ // Validate the given checkpoint is ckp "10" when doing the sync.
+ props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS);
+ props.put(VAL_CKP_KEY_EQ_VAL, "10");
+ props.put(VAL_CKP_RESET_KEY_IS_NULL, "IGNORED");
+ props.put(VAL_CKP_IGNORE_KEY_IS_NULL, "IGNORED");
+
+ ds = new HoodieDeltaStreamer(createConfig(basePath(), null, sourceClass),
jsc, Option.of(props));
+ ds.sync();
+
+ // We do not allow ingesting a table at version 8 with a version 6 delta
streamer, but a version 6 table
+ // can be ingested with a version 8 delta streamer.
+ expectedMetadata = new HashMap<>();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "20");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ // In the third round, keep MDT and auto upgrade disabled, use table version 8
+ props = setupBaseProperties("8");
+ props.put("hoodie.metadata.enable", "false");
+ // Dummy behavior injection to return ckp "30".
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ props.put(RETURN_CHECKPOINT_KEY, "30");
+ props.put("hoodie.write.auto.upgrade", "false");
+ // Validate the given checkpoint is ckp "20" when doing the sync.
+ props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS);
+ props.put(VAL_CKP_KEY_EQ_VAL, "20");
+ props.put(VAL_CKP_RESET_KEY_IS_NULL, "IGNORED");
+ props.put(VAL_CKP_IGNORE_KEY_IS_NULL, "IGNORED");
+
+ ds = new HoodieDeltaStreamer(createConfig(basePath(), null, sourceClass),
jsc, Option.of(props));
+ ds.sync();
+
+ // After upgrading, we still use checkpoint V1 since this is s3/Gcs
incremental source.
+ expectedMetadata = new HashMap<>();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "30");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+ }
+
+ /**
+ * Tests sync behavior with checkpoint override configurations.
+ *
+ * Test flow:
+ * 1. Initial sync with override:
+ * - Starts with no previous commits but config forces start at "10"
+ * - Ingests until checkpoint "30"
+ * - Verifies metadata contains checkpoint="30" and reset_key="10"
+ *
+ * 2. Second sync with same override:
+ * - Continues from "30" to "40"
+ * - Verifies reset_key="10" is maintained
+ *
+ * 3. Third sync without override:
+ * - Continues from "40" to "50"
+ * - Verifies reset_key is removed from metadata
+ *
+ * 4. Fourth sync with ignore checkpoint:
+ * - Sets ignore_checkpoint="50"
+ * - Ingests to "60"
+ * - Verifies metadata contains ignore_key="50"
+ *
+ * 5. Final syncs:
+ * - Tests that ignore_key persists with same config
+ * - Verifies ignore_key is removed when config is lifted
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"6", "8"})
+ public void testSyncE2ENoPrevCkpWithCkpOverride(String tableVersion) throws Exception {
+ // The streamer started clean with no previous commit metadata, now streamer config forcefully
+ // set it to start at ckp "10" and in that iteration the streamer stops at ckp "30".
+ metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), tableVersion);
+ TypedProperties props = setupBaseProperties(tableVersion);
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS);
+ props.put(RETURN_CHECKPOINT_KEY, "30"); // The data source said it stops at "30".
+ props.put(VAL_CKP_KEY_EQ_VAL, "10"); // Ensure the data source is notified we should start at "10"
+
+ HoodieDeltaStreamer.Config cfg = createConfig(basePath(), "10");
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ Map<String, String> expectedMetadata = new HashMap<>();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "30");
+ expectedMetadata.put(STREAMER_CHECKPOINT_RESET_KEY_V1, "10");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ // Set dummy data source behavior
+ props.put(RETURN_CHECKPOINT_KEY, "40"); // The data source said it stops at "40".
+ props.put(VAL_CKP_KEY_EQ_VAL, "30"); // Ensure the data source is notified we should resume from "30".
+ // With the same config that contains the checkpoint override, it does not affect ds to proceed normally.
+ ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ expectedMetadata.clear();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "40");
+ expectedMetadata.put(STREAMER_CHECKPOINT_RESET_KEY_V1, "10");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ // Later if we lift the override config from props, the reset key info is also gone in the resulting
+ // commit metadata.
+ props.put(VAL_CKP_KEY_EQ_VAL, "40"); // Ensure the data source is notified we should start at "40".
+ props.put(RETURN_CHECKPOINT_KEY, "50");
+ ds = new HoodieDeltaStreamer(createConfig(basePath(), null), jsc, Option.of(props));
+ ds.sync();
+
+ expectedMetadata.clear();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "50"); // no more reset key.
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ // Check ignore key config validation.
+ // Since the last checkpoint "50" is ignored, the data source should see an
+ // empty checkpoint instead of the previously committed one.
+ props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY);
+ props.put(RETURN_CHECKPOINT_KEY, "60");
+ cfg = createConfig(basePath(), null);
+ cfg.ignoreCheckpoint = "50"; // Set the ckp to be the same as where we stopped at in the last iteration.
+ ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ expectedMetadata.clear();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "60");
+ expectedMetadata.put(CHECKPOINT_IGNORE_KEY, "50");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ // In the next iteration, using the same config with ignore key setting does not lead to checkpoint reset.
+ props = setupBaseProperties(tableVersion);
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ props.put(RETURN_CHECKPOINT_KEY, "70"); // Stop at 70 after ingestion.
+ props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS);
+ props.put(VAL_CKP_KEY_EQ_VAL, "60"); // Ensure the data source is notified we should start at "60".
+ ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ expectedMetadata.clear();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "70");
+ expectedMetadata.put(CHECKPOINT_IGNORE_KEY, "50");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+
+ // Once we lift the ignore key in the given streamer config, the resulting commit metadata also won't
+ // contain that info.
+ cfg.ignoreCheckpoint = null;
+ props = setupBaseProperties(tableVersion);
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY);
+ props.put(RETURN_CHECKPOINT_KEY, "80"); // Stop at 80 after ingestion.
+ props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS);
+ props.put(VAL_CKP_KEY_EQ_VAL, "70"); // Ensure the data source is notified we should start at "70".
+ ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ expectedMetadata.clear();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, "80");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+ }
+
+ /**
+ * Tests sync behavior when source returns null checkpoint and no previous checkpoint exists.
+ *
+ * Expected behavior:
+ * - Sync completes successfully
+ * - Commit metadata contains only schema="" with no checkpoint information
+ * - Requires allowCommitOnNoCheckpointChange=true to permit commit
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"6", "8"})
+ public void testSyncE2ENoNextCkpNoPrevCkp(String tableVersion) throws Exception {
+ metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), tableVersion);
+ TypedProperties props = setupBaseProperties(tableVersion);
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NULL_CKP_KEY); // Dummy source returns a null checkpoint.
+ props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY); // Validate the source is handed an empty checkpoint.
+
+ HoodieDeltaStreamer.Config cfg = createConfig(basePath(), null);
+ cfg.allowCommitOnNoCheckpointChange = true; // Without this, a round with no checkpoint change would not commit.
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ // Only the schema entry is expected; no checkpoint keys should be written.
+ Map<String, String> expectedMetadata = new HashMap<>();
+ expectedMetadata.put("schema", "");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+ }
+
+ /**
+ * Tests sync behavior when source returns null checkpoint but has previous checkpoint.
+ *
+ * Expected behavior:
+ * - Sync completes with previous checkpoint "previousCkp"
+ * - Commit metadata contains:
+ * - schema=""
+ * - checkpoint_reset_key="previousCkp"
+ * - Requires allowCommitOnNoCheckpointChange=true to permit commit
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"6", "8"})
+ public void testSyncE2ENoNextCkpHasPrevCkp(String tableVersion) throws Exception {
+ metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), tableVersion);
+ String previousCkp = "previousCkp";
+ TypedProperties props = setupBaseProperties(tableVersion);
+ // Validate the source sees the overridden previous checkpoint with all members populated.
+ props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS);
+ props.put(VAL_CKP_KEY_EQ_VAL, previousCkp);
+ props.put(VAL_CKP_RESET_KEY_IS_NULL, "");
+ props.put(VAL_CKP_IGNORE_KEY_IS_NULL, "");
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NULL_CKP_KEY); // Dummy source returns a null checkpoint.
+
+ HoodieDeltaStreamer.Config cfg = createConfig(basePath(), previousCkp);
+ cfg.allowCommitOnNoCheckpointChange = true; // Permit the commit even though the checkpoint did not advance.
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props));
+ ds.sync();
+
+ // Only the reset key carries over; no new checkpoint key is produced.
+ Map<String, String> expectedMetadata = new HashMap<>();
+ expectedMetadata.put("schema", "");
+ expectedMetadata.put(STREAMER_CHECKPOINT_RESET_KEY_V1, previousCkp);
+ verifyLastInstantCommitMetadata(expectedMetadata);
+ }
+
+ /**
+ * Tests sync behavior with force skip checkpoint enabled.
+ *
+ * Expected behavior:
+ * - Sync completes without processing any checkpoints
+ * - Commit metadata contains only schema=""
+ * - No checkpoint information is stored regardless of source checkpoint
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"6", "8"})
+ public void testSyncE2EForceSkip(String tableVersion) throws Exception {
+ metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), tableVersion);
+ TypedProperties props = setupBaseProperties(tableVersion);
+ props.put(CHECKPOINT_FORCE_SKIP.key(), "true"); // Force-skip all checkpoint handling.
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP_KEY); // Source still returns a checkpoint...
+ props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY); // ...but it must never be propagated.
+
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(createConfig(basePath(), null), jsc, Option.of(props));
+ ds.sync();
+
+ // Only the schema entry is expected; force-skip suppresses all checkpoint metadata.
+ Map<String, String> expectedMetadata = new HashMap<>();
+ expectedMetadata.put("schema", "");
+ verifyLastInstantCommitMetadata(expectedMetadata);
+ }
+
+ /**
+ * Tests checkpoint version selection for S3 and GCS sources.
+ *
+ * Expected behavior:
+ * - Both S3EventsHoodieIncrSource and GcsEventsHoodieIncrSource must use checkpoint V1
+ * - This remains true for both table version 6 and 8
+ * - shouldTargetCheckpointV2() returns false in all these cases
+ *
+ * This ensures these sources maintain backward compatibility with checkpoint V1 format
+ */
+ @Test
+ public void testTargetCheckpointV2ForS3Gcs() {
+ // To ensure we properly track sources that must use checkpoint V1.
+ // Even on table version 8 (which otherwise targets V2), these sources stick to V1.
+ assertFalse(CheckpointUtils.shouldTargetCheckpointV2(8, S3EventsHoodieIncrSource.class.getName()));
+ assertFalse(CheckpointUtils.shouldTargetCheckpointV2(6, S3EventsHoodieIncrSource.class.getName()));
+ assertFalse(CheckpointUtils.shouldTargetCheckpointV2(8, GcsEventsHoodieIncrSource.class.getName()));
+ assertFalse(CheckpointUtils.shouldTargetCheckpointV2(6, GcsEventsHoodieIncrSource.class.getName()));
+ }
+}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index fc66cb6d2bd..832c655904b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -25,12 +25,14 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.transform.Transformer;
@@ -51,10 +53,14 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
+import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP;
import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
import static
org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_RESET_KEY;
import static
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
@@ -72,7 +78,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class TestStreamSync {
+public class TestStreamSync extends SparkClientFunctionalTestHarness {
@ParameterizedTest
@MethodSource("testCasesFetchNextBatchFromSource")
@@ -149,32 +155,6 @@ public class TestStreamSync {
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
}
- @ParameterizedTest
- @MethodSource("getCheckpointToResumeCases")
- void testGetCheckpointToResume(HoodieStreamer.Config cfg,
HoodieCommitMetadata commitMetadata, Option<String> expectedResumeCheckpoint)
throws IOException {
- // TODO(yihua): rewrite this tests
- /*
- HoodieSparkEngineContext hoodieSparkEngineContext =
mock(HoodieSparkEngineContext.class);
- HoodieStorage storage = new HoodieHadoopStorage(mock(FileSystem.class));
- TypedProperties props = new TypedProperties();
- SparkSession sparkSesion = mock(SparkSession.class);
- Configuration configuration = mock(Configuration.class);
- HoodieTimeline commitsTimeline = mock(HoodieTimeline.class);
- HoodieInstant hoodieInstant = mock(HoodieInstant.class);
-
- when(commitsTimeline.filter(any())).thenReturn(commitsTimeline);
- when(commitsTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant));
-
- StreamSync streamSync = new StreamSync(cfg, sparkSession, props,
hoodieSparkEngineContext,
- storage, configuration, client -> true, null, Option.empty(), null,
Option.empty(), true);
- StreamSync spy = spy(streamSync);
-
//doReturn(Option.of(commitMetadata)).when(spy).getLatestCommitMetadataWithValidCheckpointInfo(any());
-
- Option<Checkpoint> resumeCheckpoint =
CheckpointUtils.getCheckpointToResumeFrom(Option.of(commitsTimeline), cfg,
props);
- assertEquals(expectedResumeCheckpoint, resumeCheckpoint);
- */
- }
-
@ParameterizedTest
@MethodSource("getMultiTableStreamerCases")
void testCloneConfigsFromMultiTableStreamer(HoodieMultiTableStreamer.Config
cfg) throws IOException {
@@ -319,4 +299,94 @@ public class TestStreamSync {
// then
verify(tableBuilder, times(1)).setTableVersion(HoodieTableVersion.SIX);
}
+
+ // Builds a StreamSync with all collaborators mocked, carrying a fixed
+ // checkpoint/ignoreCheckpoint config used by the extractCheckpointMetadata tests.
+ private StreamSync setupStreamSync() {
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ cfg.checkpoint = "test-checkpoint";
+ cfg.ignoreCheckpoint = "test-ignore";
+ cfg.sourceClassName = "test-source";
+
+ TypedProperties props = new TypedProperties();
+ SchemaProvider schemaProvider = mock(SchemaProvider.class);
+
+ return new StreamSync(cfg, mock(SparkSession.class), props,
+ mock(HoodieSparkEngineContext.class), mock(HoodieStorage.class),
+ mock(Configuration.class), client -> true, schemaProvider,
+ Option.empty(), mock(SourceFormatAdapter.class), Option.empty(), false);
+ }
+
+ // CHECKPOINT_FORCE_SKIP=true must suppress all checkpoint metadata extraction.
+ @Test
+ public void testExtractCheckpointMetadata_WhenForceSkipIsTrue() {
+ StreamSync streamSync = setupStreamSync();
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ TypedProperties props = new TypedProperties();
+ props.put(CHECKPOINT_FORCE_SKIP.key(), "true");
+
+ Map<String, String> result = streamSync.extractCheckpointMetadata(
+ mock(InputBatch.class), props, HoodieTableVersion.ZERO.versionCode(), cfg);
+
+ assertTrue(result.isEmpty(), "Should return empty map when CHECKPOINT_FORCE_SKIP is true");
+ }
+
+ // When the batch carries a checkpoint, its commit metadata is returned verbatim.
+ @Test
+ public void testExtractCheckpointMetadata_WhenCheckpointExists() {
+ StreamSync streamSync = setupStreamSync();
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ TypedProperties props = new TypedProperties();
+ props.put(CHECKPOINT_FORCE_SKIP.key(), "false");
+
+ InputBatch inputBatch = mock(InputBatch.class);
+ Checkpoint checkpoint = mock(Checkpoint.class);
+ Map<String, String> expectedMetadata = new HashMap<>();
+ expectedMetadata.put("test", "value");
+
+ // The checkpoint mock delegates directly to the stubbed metadata map.
+ when(inputBatch.getCheckpointForNextBatch()).thenReturn(checkpoint);
+ when(checkpoint.getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint))
+ .thenReturn(expectedMetadata);
+
+ Map<String, String> result = streamSync.extractCheckpointMetadata(
+ inputBatch, props, HoodieTableVersion.ZERO.versionCode(), cfg);
+
+ assertEquals(expectedMetadata, result, "Should return checkpoint metadata when checkpoint exists");
+ }
+
+ // Null next-batch checkpoint on a version-8 table: defaults are written with V2 keys.
+ @Test
+ public void testExtractCheckpointMetadata_WhenCheckpointIsNullV2() {
+ StreamSync streamSync = setupStreamSync();
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ cfg.checkpoint = "test-checkpoint";
+ cfg.ignoreCheckpoint = "test-ignore";
+ TypedProperties props = new TypedProperties();
+
+ InputBatch inputBatch = mock(InputBatch.class);
+ when(inputBatch.getCheckpointForNextBatch()).thenReturn(null);
+
+ Map<String, String> result = streamSync.extractCheckpointMetadata(
+ inputBatch, props, HoodieTableVersion.EIGHT.versionCode(), cfg);
+
+ // The configured overrides survive as reset/ignore keys in V2 format.
+ Map<String, String> expected = new HashMap<>();
+ expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore");
+ expected.put(STREAMER_CHECKPOINT_RESET_KEY_V2, "test-checkpoint");
+ assertEquals(expected, result, "Should return default metadata when checkpoint is null");
+ }
+
+ // Null next-batch checkpoint on a version-6 table: defaults are written with legacy V1 keys.
+ @Test
+ public void testExtractCheckpointMetadata_WhenCheckpointIsNullV1() {
+ StreamSync streamSync = setupStreamSync();
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ cfg.checkpoint = "test-checkpoint";
+ cfg.ignoreCheckpoint = "test-ignore";
+ TypedProperties props = new TypedProperties();
+
+ InputBatch inputBatch = mock(InputBatch.class);
+ when(inputBatch.getCheckpointForNextBatch()).thenReturn(null);
+
+ Map<String, String> result = streamSync.extractCheckpointMetadata(
+ inputBatch, props, HoodieTableVersion.SIX.versionCode(), cfg);
+
+ // The configured overrides survive as reset/ignore keys in V1 format.
+ Map<String, String> expected = new HashMap<>();
+ expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore");
+ expected.put(CHECKPOINT_RESET_KEY, "test-checkpoint");
+ assertEquals(expected, result, "Should return default metadata when checkpoint is null");
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
new file mode 100644
index 00000000000..113f8e3f8d8
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.exception.HoodieStreamerException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
+import static
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+public class TestStreamerCheckpointUtils extends
SparkClientFunctionalTestHarness {
+ private TypedProperties props; // Per-test writer properties (table version etc.).
+ private HoodieStreamer.Config streamerConfig; // Per-test streamer config (checkpoint overrides etc.).
+ protected HoodieTableMetaClient metaClient; // Meta client over a fresh COW table per test.
+
+ // Initializes a clean COPY_ON_WRITE table and fresh props/config before each test.
+ @BeforeEach
+ public void setUp() throws IOException {
+ metaClient = HoodieTestUtils.init(basePath(), HoodieTableType.COPY_ON_WRITE);
+ props = new TypedProperties();
+ streamerConfig = new HoodieStreamer.Config();
+ streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ }
+
+ // With no commits on the timeline and no checkpoint override, there is nothing to resume from.
+ @Test
+ public void testEmptyTimelineCase() throws IOException {
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.isEmpty());
+ }
+
+ // A configured ignoreCheckpoint that does not match the (empty) committed ignore key
+ // causes the committed checkpoint to be discarded.
+ @Test
+ public void testIgnoreCheckpointCaseEmptyIgnoreKey() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(CHECKPOINT_KEY, "ckp_key");
+ extraMetadata.put(CHECKPOINT_IGNORE_KEY, "");
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.ignoreCheckpoint = "ignore_checkpoint_1";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.isEmpty());
+ }
+
+ // A configured ignoreCheckpoint that differs from the committed ignore key also
+ // causes the committed checkpoint to be discarded.
+ @Test
+ public void testIgnoreCheckpointCaseIgnoreKeyMismatch() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(CHECKPOINT_KEY, "ckp_key");
+ extraMetadata.put(CHECKPOINT_IGNORE_KEY, "ignore_checkpoint_2");
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.ignoreCheckpoint = "ignore_checkpoint_1";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.isEmpty());
+ }
+
+ // An empty committed checkpoint with a stale reset key and no config override
+ // leaves nothing to resume from, which must surface as a HoodieStreamerException.
+ @Test
+ public void testThrowExceptionCase() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(CHECKPOINT_KEY, "");
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "old-reset-key");
+ createCommit(commitTime, extraMetadata);
+
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ HoodieStreamerException exception = assertThrows(HoodieStreamerException.class, () -> {
+ StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ });
+ assertTrue(exception.getMessage().contains("Unable to find previous checkpoint"));
+ }
+
+ // A config checkpoint override that differs from the committed reset key wins.
+ // NOTE(review): despite the "V2" in the method name, a StreamerCheckpointV1 is
+ // asserted here — presumably the override is materialized as V1 before any
+ // translation; confirm this matches the intended checkpoint-version policy.
+ @Test
+ public void testNewCheckpointV2WithResetKeyCase() throws IOException {
+ String commitTime = "0000000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(CHECKPOINT_KEY, "");
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "old-reset-key");
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.checkpoint = "earliest";
+ streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("earliest", checkpoint.get().getCheckpointKey());
+ }
+
+ // On a version-1 table, a config checkpoint override produces a V1 checkpoint.
+ @Test
+ public void testNewCheckpointV1WithResetKeyCase() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "old-reset-key");
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.checkpoint = "earliest";
+ streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "1");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("earliest", checkpoint.get().getCheckpointKey());
+ }
+
+ // With no override and empty reset/ignore keys, the committed checkpoint is reused as-is.
+ @Test
+ public void testReuseCheckpointCase() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(CHECKPOINT_KEY, "earliest-0-100");
+ extraMetadata.put(CHECKPOINT_IGNORE_KEY, "");
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "");
+ createCommit(commitTime, extraMetadata);
+
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertEquals("earliest-0-100", checkpoint.get().getCheckpointKey());
+ }
+
+ // A commit without checkpoint metadata plus a config override on a version-2
+ // table yields a fresh V2 checkpoint.
+ // Fix: the @Test annotation was missing, so JUnit 5 silently skipped this test.
+ @Test
+ public void testNewCheckpointV2NoMetadataCase() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.checkpoint = "earliest";
+ streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV2);
+ assertEquals("earliest", checkpoint.get().getCheckpointKey());
+ }
+
+ // A commit without checkpoint metadata plus a config override on a version-1
+ // table yields a fresh V1 checkpoint.
+ @Test
+ public void testNewCheckpointV1NoMetadataCase() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.checkpoint = "earliest";
+ streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "1");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("earliest", checkpoint.get().getCheckpointKey());
+ }
+
+ // Creates and completes a COMMIT instant at commitTime carrying the given extra
+ // metadata, then reloads the timeline so tests observe the new instant.
+ private void createCommit(String commitTime, Map<String, String> extraMetadata) throws IOException {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.INFLIGHT,
+ HoodieTimeline.COMMIT_ACTION, commitTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+ timeline.createNewInstant(instant);
+ timeline.saveAsComplete(instant,
+ Option.of(HoodieCommonTestHarness.getCommitMetadata(metaClient, basePath(), "partition1", commitTime, 2, extraMetadata)));
+ metaClient.reloadActiveTimeline();
+ }
+
+ // An empty committed ignore key combined with a configured ignoreCheckpoint
+ // discards the committed checkpoint entirely.
+ @Test
+ public void testIgnoreCheckpointNullKeyCase() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ // Set empty ignore key
+ extraMetadata.put(CHECKPOINT_KEY, "some-checkpoint");
+ extraMetadata.put(CHECKPOINT_IGNORE_KEY, "");
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.ignoreCheckpoint = "ignore_checkpoint_1";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.isEmpty());
+ }
+
+ // An empty committed reset key does not block a config checkpoint override from taking effect.
+ @Test
+ public void testNewCheckpointWithEmptyResetKey() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "old-checkpoint");
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, ""); // Empty reset key
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.checkpoint = "new-checkpoint";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("new-checkpoint", checkpoint.get().getCheckpointKey());
+ }
+
+ // A committed reset key that differs from the configured checkpoint means the
+ // override has not been applied yet, so the config checkpoint wins.
+ @Test
+ public void testNewCheckpointWithDifferentResetKey() throws IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "old-checkpoint");
+ extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "different-reset-key");
+ createCommit(commitTime, extraMetadata);
+
+ streamerConfig.checkpoint = "new-checkpoint";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("new-checkpoint", checkpoint.get().getCheckpointKey());
+ }
+
+ // For MOR tables the latest deltacommit's checkpoint takes precedence over
+ // an earlier commit's checkpoint.
+ @Test
+ public void testMergeOnReadWithDeltaCommits() throws IOException {
+ // Setup MOR table
+ metaClient = HoodieTestUtils.init(basePath(), HoodieTableType.MERGE_ON_READ);
+ streamerConfig.tableType = HoodieTableType.MERGE_ON_READ.name();
+
+ // Create a commit and deltacommit
+ String commitTime = "20240120000000";
+ String deltaCommitTime = "20240120000001";
+
+ // Create commit
+ Map<String, String> commitMetadata = new HashMap<>();
+ commitMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "commit-cp");
+ createCommit(commitTime, commitMetadata);
+
+ // Create deltacommit
+ Map<String, String> deltaCommitMetadata = new HashMap<>();
+ deltaCommitMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "deltacommit-cp");
+ createDeltaCommit(deltaCommitTime, deltaCommitMetadata);
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+
+ // Should use deltacommit checkpoint
+ assertEquals("deltacommit-cp", checkpoint.get().getCheckpointKey());
+ }
+
+ // For MOR tables with no deltacommits, resume falls back to the commit's checkpoint.
+ @Test
+ public void testMergeOnReadWithoutDeltaCommits() throws IOException {
+ // Setup MOR table
+ metaClient = HoodieTestUtils.init(basePath(), HoodieTableType.MERGE_ON_READ);
+ streamerConfig.tableType = HoodieTableType.MERGE_ON_READ.name();
+
+ // Create only commit
+ String commitTime = "20240120000000";
+ Map<String, String> commitMetadata = new HashMap<>();
+ commitMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "commit-cp");
+ createCommit(commitTime, commitMetadata);
+
+ Option<Checkpoint> checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(
+ metaClient.getActiveTimeline(), streamerConfig, props);
+
+ // Should use commit checkpoint
+ assertEquals("commit-cp", checkpoint.get().getCheckpointKey());
+ }
+
+ // Creates and completes a DELTA_COMMIT instant at deltaCommitTime carrying the
+ // given extra metadata, then reloads the timeline. Mirrors createCommit().
+ private void createDeltaCommit(String deltaCommitTime, Map<String, String> extraMetadata) throws IOException {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.INFLIGHT,
+ HoodieTimeline.DELTA_COMMIT_ACTION, deltaCommitTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+ timeline.createNewInstant(instant);
+ timeline.saveAsComplete(instant,
+ Option.of(HoodieCommonTestHarness.getCommitMetadata(metaClient, basePath(), "partition1", deltaCommitTime, 2, extraMetadata)));
+ metaClient.reloadActiveTimeline();
+ }
+
+ @Test
+ public void testCreateNewCheckpointV2WithNullTimeline() throws IOException {
+ streamerConfig.checkpoint = "test-cp";
+ streamerConfig.sourceClassName =
"org.apache.hudi.utilities.sources.KafkaSource";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2");
+
+ Option<Checkpoint> checkpoint =
StreamerCheckpointUtils.getCheckpointToResumeFrom(
+ Option.empty(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("test-cp", checkpoint.get().getCheckpointKey());
+ }
+
+ @Test
+ public void testCreateNewCheckpointV1WithNullTimeline() throws IOException {
+ streamerConfig.checkpoint = "test-cp";
+ props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "1");
+
+ Option<Checkpoint> checkpoint =
StreamerCheckpointUtils.getCheckpointToResumeFrom(
+ Option.empty(), streamerConfig, props);
+ assertTrue(checkpoint.get() instanceof StreamerCheckpointV1);
+ assertEquals("test-cp", checkpoint.get().getCheckpointKey());
+ }
+
+ @Test
+ public void testEmptyTimelineAndNullCheckpoint() throws IOException {
+ streamerConfig.checkpoint = null;
+ Option<Checkpoint> checkpoint =
StreamerCheckpointUtils.getCheckpointToResumeFrom(
+ Option.empty(), streamerConfig, props);
+ assertTrue(checkpoint.isEmpty());
+ }
+
+ @Test
+ public void testTimelineWithCheckpointOverridesConfigCheckpoint() throws
IOException {
+ String commitTime = "20240120000000";
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(HoodieStreamer.CHECKPOINT_KEY, "commit-cp");
+ createCommit(commitTime, metadata);
+
+ streamerConfig.checkpoint = "config-cp";
+
+ Option<Checkpoint> checkpoint =
StreamerCheckpointUtils.getCheckpointToResumeFrom(
+ Option.of(metaClient.getActiveTimeline()), streamerConfig, props);
+ assertEquals("config-cp", checkpoint.get().getCheckpointKey());
+ }
+}