This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 38c87b7ebe1 [HUDI-7004] Add support of snapshotLoadQuerySplitter in s3/gcs sources (#10152)
38c87b7ebe1 is described below
commit 38c87b7ebe148e8870db83be433376ad89b9c048
Author: harshal <[email protected]>
AuthorDate: Wed Nov 22 20:53:42 2023 +0530
[HUDI-7004] Add support of snapshotLoadQuerySplitter in s3/gcs sources (#10152)
---
.../apache/hudi/common/config/TypedProperties.java | 5 ++
.../sources/GcsEventsHoodieIncrSource.java | 7 +-
.../hudi/utilities/sources/HoodieIncrSource.java | 6 +-
.../sources/S3EventsHoodieIncrSource.java | 9 ++-
.../sources/SnapshotLoadQuerySplitter.java | 9 +++
.../utilities/sources/helpers/QueryRunner.java | 35 +++++----
.../sources/TestGcsEventsHoodieIncrSource.java | 85 ++++++++++++++++++++--
.../sources/TestS3EventsHoodieIncrSource.java | 78 ++++++++++++++++++--
8 files changed, 198 insertions(+), 36 deletions(-)
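
For context on what this change enables: the splitter is optional and is loaded reflectively from writer properties, falling back to a plain snapshot load when unset. A minimal wiring sketch, using only names that appear in the diff below (MySplitter is a hypothetical subclass; the literal property key behind SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME is not shown in this patch):

    // Sketch, not part of this commit.
    TypedProperties props = new TypedProperties();
    props.setProperty(
        SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME,
        MySplitter.class.getName()); // hypothetical custom splitter class
    // Unset or empty -> Option.empty(), i.e. the feature is disabled.
    Option<SnapshotLoadQuerySplitter> splitter = SnapshotLoadQuerySplitter.getInstance(props);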
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
index 3db8210cade..86b7f4cc457 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.common.config;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import java.io.Serializable;
@@ -78,6 +79,10 @@ public class TypedProperties extends Properties implements Serializable {
     return containsKey(property) ? getProperty(property) : defaultValue;
   }
+  public Option<String> getNonEmptyStringOpt(String property, String defaultValue) {
+    return Option.ofNullable(StringUtils.emptyToNull(getString(property, defaultValue)));
+  }
+
   public List<String> getStringList(String property, String delimiter, List<String> defaultVal) {
     if (!containsKey(property)) {
       return defaultVal;
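
A quick sketch of the new helper's intended semantics (assuming StringUtils.emptyToNull maps an empty string to null, as its name suggests):

    TypedProperties props = new TypedProperties();
    props.setProperty("some.key", "");
    props.getNonEmptyStringOpt("some.key", null);          // Option.empty(): empty values count as absent
    props.getNonEmptyStringOpt("missing.key", "fallback"); // Option.of("fallback"): default used when unset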
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index d09bad71916..a06130d3972 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -114,6 +114,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final CloudDataFetcher gcsObjectDataFetcher;
   private final QueryRunner queryRunner;
   private final Option<SchemaProvider> schemaProvider;
+  private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
   public static final String GCS_OBJECT_KEY = "name";
@@ -145,6 +146,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     this.gcsObjectDataFetcher = gcsObjectDataFetcher;
     this.queryRunner = queryRunner;
     this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
     LOG.info("srcPath: " + srcPath);
     LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -171,8 +173,9 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
       return Pair.of(Option.empty(), queryInfo.getStartInstant());
     }
-    Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
-    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF);
+    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
+    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
+    queryInfo = queryInfoDatasetPair.getLeft();
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 1d302fa106b..f87e5c231bf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -56,7 +55,6 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
@@ -150,9 +148,7 @@ public class HoodieIncrSource extends RowSource {
       }
     }
-    this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
-        .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
-            new Class<?>[] {TypedProperties.class}, props));
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 4b9be847c75..325e494e0ab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -80,6 +80,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
   private final Option<SchemaProvider> schemaProvider;
+  private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
+
   public static class Config {
     // control whether we do existence check for files before consuming them
     @Deprecated
@@ -138,6 +140,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
     this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
   @Override
@@ -158,9 +161,9 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
       LOG.warn("Already caught up. No new data to process");
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
-
-    Dataset<Row> source = queryRunner.run(queryInfo);
-    Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
+    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
+    queryInfo = queryInfoDatasetPair.getLeft();
+    Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
index 6a13607b1d5..ca299122ec7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
@@ -20,10 +20,13 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
+
 /**
  * Abstract splitter responsible for managing the snapshot load query operations.
 */
@@ -75,4 +78,10 @@ public abstract class SnapshotLoadQuerySplitter {
         .map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint))
         .orElse(queryInfo);
   }
+
+  public static Option<SnapshotLoadQuerySplitter> getInstance(TypedProperties props) {
+    return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)
+        .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
+            new Class<?>[] {TypedProperties.class}, props));
+  }
 }
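
For illustration, a minimal concrete splitter. The abstract hook is not shown in this patch, so the signature and the super(properties) constructor below are assumptions inferred from the getNextCheckpoint(...) body above (which maps an optional checkpoint string onto queryInfo.withUpdatedEndInstant) and from the (TypedProperties) constructor that ReflectionUtils.loadClass requires:

    // Hypothetical example, not part of this commit.
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class EarliestCommitSplitter extends SnapshotLoadQuerySplitter {
      public EarliestCommitSplitter(TypedProperties properties) {
        super(properties); // assumed; a (TypedProperties) ctor is required by the reflective loading above
      }

      @Override
      public Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr) {
        // Split after the earliest commit past the begin checkpoint, so each
        // snapshot pass ingests roughly one commit's worth of data.
        return Option.fromJavaOptional(
            df.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, beginCheckpointStr))
                .select(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
                .orderBy(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
                .limit(1)
                .collectAsList().stream().findFirst()
                .map(row -> row.getString(0)));
      }
    }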
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index ef903d7c647..2f0a8bf488e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -21,9 +21,12 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
+import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -62,16 +65,14 @@ public class QueryRunner {
    * @param queryInfo all meta info about the query to be executed.
    * @return the output of the query as Dataset < Row >.
    */
-  public Dataset<Row> run(QueryInfo queryInfo) {
-    Dataset<Row> dataset = null;
+  public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
     if (queryInfo.isIncremental()) {
-      dataset = runIncrementalQuery(queryInfo);
+      return runIncrementalQuery(queryInfo);
     } else if (queryInfo.isSnapshot()) {
-      dataset = runSnapshotQuery(queryInfo);
+      return runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption);
     } else {
       throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
     }
-    return dataset;
   }
   public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) {
@@ -82,26 +83,34 @@ public class QueryRunner {
     return dataset;
   }
-  public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
+  public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) {
     LOG.info("Running incremental query");
-    return sparkSession.read().format("org.apache.hudi")
+    return Pair.of(queryInfo, sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
         .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue()))
-        .load(sourcePath);
+        .load(sourcePath));
   }
-  public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
+  public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
     LOG.info("Running snapshot query");
-    return sparkSession.read().format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath)
+    Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath);
+    QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption
+        .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo))
+        .orElse(queryInfo);
+    return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo));
+  }
+
+  public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> snapshot, QueryInfo snapshotQueryInfo) {
+    return snapshot
         // add filtering so that only interested records are returned.
         .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            queryInfo.getStartInstant()))
+            snapshotQueryInfo.getStartInstant()))
         .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            queryInfo.getEndInstant()));
+            snapshotQueryInfo.getEndInstant()));
   }
 }
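
The resulting call pattern, sketched with names from the hunks above: incremental queries pass through unchanged, while snapshot queries may come back with a trimmed end instant that callers must adopt before computing checkpoints:

    Pair<QueryInfo, Dataset<Row>> pair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
    queryInfo = pair.getLeft();          // possibly narrowed query window
    Dataset<Row> rows = pair.getRight(); // already filtered to [startInstant, endInstant] for snapshots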
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 5c31f310800..bc2906d251f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -40,6 +40,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
@@ -56,6 +57,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -93,6 +96,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @Mock
   QueryRunner queryRunner;
+  @Mock
+  QueryInfo queryInfo;
   protected Option<SchemaProvider> schemaProvider;
   private HoodieTableMetaClient metaClient;
@@ -142,7 +147,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
@@ -160,7 +165,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+    setMockQueryRunner(inputDs);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
@@ -183,7 +189,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
@@ -205,7 +211,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
@@ -213,10 +219,68 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
+  @ParameterizedTest
+  @CsvSource({
+      "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
+      "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
+      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+  })
+  public void testSplitSnapshotLoad(String snapshotCheckPoint, String expected1, String expected2, String expected3, String expected4) throws IOException {
+
+    writeGcsMetadataRecords("1");
+    writeGcsMetadataRecords("2");
+    writeGcsMetadataRecords("3");
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
+    // 1. snapshot query, read all records
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, expected1, typedProperties);
+    // 2. incremental query, as commit is present in timeline
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(expected1), 10L, expected2, typedProperties);
+    // 3. snapshot query with source limit less than first commit size
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected3, typedProperties);
+    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to");
+    // 4. As the snapshot query will return 1, the same is returned as nextCheckpoint (dataset is empty due to ignore prefix).
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected4, typedProperties);
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs) {
+    setMockQueryRunner(inputDs, Option.empty());
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
+
+    when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
+      QueryInfo queryInfo = invocation.getArgument(0);
+      QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+              queryInfo.withUpdatedEndInstant(nextCheckPoint))
+          .orElse(queryInfo);
+      if (updatedQueryInfo.isSnapshot()) {
+        return Pair.of(updatedQueryInfo,
+            inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getStartInstant()))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getEndInstant())));
+      }
+      return Pair.of(updatedQueryInfo, inputDs);
+    });
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
-                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
-    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
-    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
+                             TypedProperties typedProperties) {
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
         spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
@@ -230,6 +294,13 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
+    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+    readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
+  }
+
   private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) {
     String partitionPath = bucketName;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 7d58d21d874..e0af8d73e26 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -40,6 +40,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
@@ -56,6 +57,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -88,6 +91,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   QueryRunner mockQueryRunner;
   @Mock
   CloudDataFetcher mockCloudDataFetcher;
+  @Mock
+  QueryInfo queryInfo;
   private JavaSparkContext jsc;
   private HoodieTableMetaClient metaClient;
@@ -248,7 +253,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
@@ -273,7 +278,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
@@ -301,7 +306,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
@@ -329,7 +334,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
@@ -361,7 +366,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
@@ -393,7 +398,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
@@ -407,6 +412,45 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
   }
+  @ParameterizedTest
+  @CsvSource({
+      "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
+      "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
+      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+  })
+  public void testSplitSnapshotLoad(String snapshotCheckPoint, String expected1, String expected2, String expected3, String expected4) throws IOException {
+
+    writeS3MetadataRecords("1");
+    writeS3MetadataRecords("2");
+    writeS3MetadataRecords("3");
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
+        .thenReturn(Option.empty());
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+    // 1. snapshot query, read all records
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, expected1, typedProperties);
+    // 2. incremental query, as commit is present in timeline
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(expected1), 10L, expected2, typedProperties);
+    // 3. snapshot query with source limit less than first commit size
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected3, typedProperties);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to");
+    // 4. As the snapshot query will return 1, the same is returned as nextCheckpoint (dataset is empty due to ignore prefix).
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected4, typedProperties);
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
@@ -422,6 +466,28 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
+  private void setMockQueryRunner(Dataset<Row> inputDs) {
+    setMockQueryRunner(inputDs, Option.empty());
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
+
+    when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
+      QueryInfo queryInfo = invocation.getArgument(0);
+      QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+              queryInfo.withUpdatedEndInstant(nextCheckPoint))
+          .orElse(queryInfo);
+      if (updatedQueryInfo.isSnapshot()) {
+        return Pair.of(updatedQueryInfo,
+            inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getStartInstant()))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getEndInstant())));
+      }
+      return Pair.of(updatedQueryInfo, inputDs);
+    });
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);