This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 38c87b7ebe1 [HUDI-7004] Add support of snapshotLoadQuerySplitter in s3/gcs sources (#10152) 38c87b7ebe1 is described below commit 38c87b7ebe148e8870db83be433376ad89b9c048 Author: harshal <harshal.j.pa...@gmail.com> AuthorDate: Wed Nov 22 20:53:42 2023 +0530 [HUDI-7004] Add support of snapshotLoadQuerySplitter in s3/gcs sources (#10152) --- .../apache/hudi/common/config/TypedProperties.java | 5 ++ .../sources/GcsEventsHoodieIncrSource.java | 7 +- .../hudi/utilities/sources/HoodieIncrSource.java | 6 +- .../sources/S3EventsHoodieIncrSource.java | 9 ++- .../sources/SnapshotLoadQuerySplitter.java | 9 +++ .../utilities/sources/helpers/QueryRunner.java | 35 +++++---- .../sources/TestGcsEventsHoodieIncrSource.java | 85 ++++++++++++++++++++-- .../sources/TestS3EventsHoodieIncrSource.java | 78 ++++++++++++++++++-- 8 files changed, 198 insertions(+), 36 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java index 3db8210cade..86b7f4cc457 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.config; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import java.io.Serializable; @@ -78,6 +79,10 @@ public class TypedProperties extends Properties implements Serializable { return containsKey(property) ? getProperty(property) : defaultValue; } + public Option<String> getNonEmptyStringOpt(String property, String defaultValue) { + return Option.ofNullable(StringUtils.emptyToNull(getString(property, defaultValue))); + } + public List<String> getStringList(String property, String delimiter, List<String> defaultVal) { if (!containsKey(property)) { return defaultVal; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java index d09bad71916..a06130d3972 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java @@ -114,6 +114,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { private final CloudDataFetcher gcsObjectDataFetcher; private final QueryRunner queryRunner; private final Option<SchemaProvider> schemaProvider; + private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter; public static final String GCS_OBJECT_KEY = "name"; @@ -145,6 +146,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { this.gcsObjectDataFetcher = gcsObjectDataFetcher; this.queryRunner = queryRunner; this.schemaProvider = Option.ofNullable(schemaProvider); + this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props); LOG.info("srcPath: " + srcPath); LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy); @@ -171,8 +173,9 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { return Pair.of(Option.empty(), queryInfo.getStartInstant()); } - Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo); - Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF); + Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter); + Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight()); + queryInfo = queryInfoDatasetPair.getLeft(); LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 1d302fa106b..f87e5c231bf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.config.HoodieIncrSourceConfig; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -56,7 +55,6 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger; -import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode; @@ -150,9 +148,7 @@ public class HoodieIncrSource extends RowSource { } } - this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)) - .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className, - new Class<?>[] {TypedProperties.class}, props)); + this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 4b9be847c75..325e494e0ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -80,6 +80,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { private final Option<SchemaProvider> schemaProvider; + private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter; + public static class Config { // control whether we do existence check for files before consuming them @Deprecated @@ -138,6 +140,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { this.queryRunner = queryRunner; this.cloudDataFetcher = cloudDataFetcher; this.schemaProvider = Option.ofNullable(schemaProvider); + this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props); } @Override @@ -158,9 +161,9 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { LOG.warn("Already caught up. No new data to process"); return Pair.of(Option.empty(), queryInfo.getEndInstant()); } - - Dataset<Row> source = queryRunner.run(queryInfo); - Dataset<Row> filteredSourceData = applyFilter(source, fileFormat); + Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter); + queryInfo = queryInfoDatasetPair.getLeft(); + Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat); LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset = diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java index 6a13607b1d5..ca299122ec7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java @@ -20,10 +20,13 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME; + /** * Abstract splitter responsible for managing the snapshot load query operations. */ @@ -75,4 +78,10 @@ public abstract class SnapshotLoadQuerySplitter { .map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint)) .orElse(queryInfo); } + + public static Option<SnapshotLoadQuerySplitter> getInstance(TypedProperties props) { + return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null) + .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className, + new Class<?>[] {TypedProperties.class}, props)); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index ef903d7c647..2f0a8bf488e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -21,9 +21,12 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.config.HoodieIncrSourceConfig; +import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -62,16 +65,14 @@ public class QueryRunner { * @param queryInfo all meta info about the query to be executed. * @return the output of the query as Dataset < Row >. */ - public Dataset<Row> run(QueryInfo queryInfo) { - Dataset<Row> dataset = null; + public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) { if (queryInfo.isIncremental()) { - dataset = runIncrementalQuery(queryInfo); + return runIncrementalQuery(queryInfo); } else if (queryInfo.isSnapshot()) { - dataset = runSnapshotQuery(queryInfo); + return runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption); } else { throw new HoodieException("Unknown query type " + queryInfo.getQueryType()); } - return dataset; } public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) { @@ -82,26 +83,34 @@ public class QueryRunner { return dataset; } - public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) { + public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) { LOG.info("Running incremental query"); - return sparkSession.read().format("org.apache.hudi") + return Pair.of(queryInfo, sparkSession.read().format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()) .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue())) - .load(sourcePath); + .load(sourcePath)); } - public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) { + public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) { LOG.info("Running snapshot query"); - return sparkSession.read().format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath) + Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath); + QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption + .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo)) + .orElse(queryInfo); + return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo)); + } + + public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> snapshot, QueryInfo snapshotQueryInfo) { + return snapshot // add filtering so that only interested records are returned. .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - queryInfo.getStartInstant())) + snapshotQueryInfo.getStartInstant())) .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - queryInfo.getEndInstant())); + snapshotQueryInfo.getEndInstant())); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 5c31f310800..bc2906d251f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -40,6 +40,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; +import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher; @@ -56,6 +57,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -93,6 +96,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn @Mock QueryRunner queryRunner; + @Mock + QueryInfo queryInfo; protected Option<SchemaProvider> schemaProvider; private HoodieTableMetaClient metaClient; @@ -142,7 +147,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1")); Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(queryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); } @@ -160,7 +165,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1")); Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(queryRunner.run(Mockito.any())).thenReturn(inputDs); + + setMockQueryRunner(inputDs); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json"); @@ -183,7 +189,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(queryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json"); @@ -205,7 +211,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(queryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json"); @@ -213,10 +219,68 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); } + @ParameterizedTest + @CsvSource({ + "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1", + "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2", + "3,3#path/to/file5.json,3,1#path/to/file1.json,3" + }) + public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException { + + writeGcsMetadataRecords("1"); + writeGcsMetadataRecords("2"); + writeGcsMetadataRecords("3"); + + List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>(); + // Add file paths and sizes to the list + filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1")); + filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2")); + filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3")); + + Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); + + setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint)); + TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); + typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip"); + //1. snapshot query, read all records + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties); + //2. incremental query, as commit is present in timeline + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties); + //3. snapshot query with source limit less than first commit size + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties); + typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to"); + //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix). + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties); + } + + private void setMockQueryRunner(Dataset<Row> inputDs) { + setMockQueryRunner(inputDs, Option.empty()); + } + + private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) { + + when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> { + QueryInfo queryInfo = invocation.getArgument(0); + QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> + queryInfo.withUpdatedEndInstant(nextCheckPoint)) + .orElse(queryInfo); + if (updatedQueryInfo.isSnapshot()) { + return Pair.of(updatedQueryInfo, + inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + updatedQueryInfo.getStartInstant())) + .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + updatedQueryInfo.getEndInstant()))); + } + return Pair.of(updatedQueryInfo, inputDs); + }); + } + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, - Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) { - TypedProperties typedProperties = setProps(missingCheckpointStrategy); - typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json"); + Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint, + TypedProperties typedProperties) { GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(), spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner); @@ -230,6 +294,13 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Assertions.assertEquals(expectedCheckpoint, nextCheckPoint); } + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, + Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) { + TypedProperties typedProperties = setProps(missingCheckpointStrategy); + typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json"); + readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties); + } + private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) { String partitionPath = bucketName; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 7d58d21d874..e0af8d73e26 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -40,6 +40,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; +import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon; @@ -56,6 +57,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -88,6 +91,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne QueryRunner mockQueryRunner; @Mock CloudDataFetcher mockCloudDataFetcher; + @Mock + QueryInfo queryInfo; private JavaSparkContext jsc; private HoodieTableMetaClient metaClient; @@ -248,7 +253,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); @@ -273,7 +278,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); @@ -301,7 +306,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); @@ -329,7 +334,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip"); @@ -361,7 +366,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); @@ -393,7 +398,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs); + setMockQueryRunner(inputDs); when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); @@ -407,6 +412,45 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties); } + @ParameterizedTest + @CsvSource({ + "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1", + "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2", + "3,3#path/to/file5.json,3,1#path/to/file1.json,3" + }) + public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException { + + writeS3MetadataRecords("1"); + writeS3MetadataRecords("2"); + writeS3MetadataRecords("3"); + + List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>(); + // Add file paths and sizes to the list + filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1")); + filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2")); + filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3")); + + Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); + + setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint)); + when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) + .thenReturn(Option.empty()); + TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); + typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip"); + //1. snapshot query, read all records + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties); + //2. incremental query, as commit is present in timeline + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties); + //3. snapshot query with source limit less than first commit size + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties); + typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to"); + //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix). + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties); + } + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint, TypedProperties typedProperties) { @@ -422,6 +466,28 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Assertions.assertEquals(expectedCheckpoint, nextCheckPoint); } + private void setMockQueryRunner(Dataset<Row> inputDs) { + setMockQueryRunner(inputDs, Option.empty()); + } + + private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) { + + when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> { + QueryInfo queryInfo = invocation.getArgument(0); + QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> + queryInfo.withUpdatedEndInstant(nextCheckPoint)) + .orElse(queryInfo); + if (updatedQueryInfo.isSnapshot()) { + return Pair.of(updatedQueryInfo, + inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + updatedQueryInfo.getStartInstant())) + .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + updatedQueryInfo.getEndInstant()))); + } + return Pair.of(updatedQueryInfo, inputDs); + }); + } + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) { TypedProperties typedProperties = setProps(missingCheckpointStrategy);