This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5b99ed406caac976d893c3fb0250163808c00cca Author: lokesh-lingarajan-0310 <[email protected]> AuthorDate: Mon Sep 11 10:26:24 2023 -0700 [HUDI-6738] - Apply object filter before checkpoint batching in GcsEventsHoodieIncrSource (#9538) Apply filtering before we start checkpoint batching. This change brings the GCS job in line with the S3 job. --------- Co-authored-by: Lokesh Lingarajan <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../sources/GcsEventsHoodieIncrSource.java | 3 +- .../helpers/gcs/GcsObjectMetadataFetcher.java | 17 ++- .../sources/TestGcsEventsHoodieIncrSource.java | 169 ++++++--------------- 3 files changed, 63 insertions(+), 126 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java index 891881095fd..d09bad71916 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java @@ -172,10 +172,11 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { } Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo); + Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF); LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - cloudObjectMetadataDF, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); + filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); if (!checkPointAndDataset.getRight().isPresent()) { LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant()); return Pair.of(Option.empty(), queryInfo.getEndInstant()); diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java index 08116ac0fa5..c92901d14cf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java @@ -78,19 +78,26 @@ public class GcsObjectMetadataFetcher implements Serializable { * @return A {@link List} of {@link CloudObjectMetadata} containing GCS info. */ public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) { - String filter = createFilter(); - LOG.info("Adding filter string to Dataset: " + filter); - SerializableConfiguration serializableHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration()); - return cloudObjectMetadataDF - .filter(filter) .select("bucket", "name", "size") .distinct() .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, serializableHadoopConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class)) .collectAsList(); } + /** + * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form + * of a Cloud Storage Pubsub Notification event. + * @return Dataset<Row> after apply the filtering. + */ + public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) { + String filter = createFilter(); + LOG.info("Adding filter string to Dataset: " + filter); + + return cloudObjectMetadataDF.filter(filter); + } + /** * Add optional filters that narrow down the list of GCS objects to fetch. 
*/ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index cc80123a19c..5c31f310800 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -39,7 +39,6 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; -import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher; @@ -53,10 +52,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRow; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -78,9 +73,6 @@ import java.util.stream.Collectors; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import 
static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -96,9 +88,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn @TempDir protected java.nio.file.Path tempDir; - @Mock - GcsObjectMetadataFetcher gcsObjectMetadataFetcher; - @Mock CloudDataFetcher gcsObjectDataFetcher; @@ -135,10 +124,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 0, inserts.getKey()); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, inserts.getKey()); - verify(gcsObjectMetadataFetcher, times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), - anyBoolean()); verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF( Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)); } @@ -147,24 +134,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOException { String commitTimeForWrites = "2"; String commitTimeForReads = "1"; - Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites); - List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList( - new CloudObjectMetadata("data-file-1.json", 1), - new CloudObjectMetadata("data-file-2.json", 1)); - when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList); - - List<Row> recs = Arrays.asList( - new GenericRow(new String[] {"1", "Hello 1"}), - new GenericRow(new String[] {"2", "Hello 2"}), - new GenericRow(new String[] {"3", "Hello 3"}), - new GenericRow(new String[] {"4", "Hello 4"}) - ); - StructType schema = new StructType(new StructField[] { - DataTypes.createStructField("id", DataTypes.StringType, true), - 
DataTypes.createStructField("text", DataTypes.StringType, true) - }); - Dataset<Row> rows = spark().createDataFrame(recs, schema); List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>(); // Add file paths and sizes to the list filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1")); @@ -172,16 +142,9 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1")); Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(), - eq(schemaProvider))).thenReturn(Option.of(rows)); when(queryRunner.run(Mockito.any())).thenReturn(inputDs); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json"); - - verify(gcsObjectMetadataFetcher, times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), - anyBoolean()); - verify(gcsObjectDataFetcher, times(1)).getCloudObjectDataDF(Mockito.any(), - eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider)); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); } @Test @@ -190,23 +153,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn String commitTimeForReads = "1"; Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites); - List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList( - new CloudObjectMetadata("data-file-1.json", 1), - new CloudObjectMetadata("data-file-2.json", 1)); - when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList); - - List<Row> recs = Arrays.asList( - new GenericRow(new String[] {"1", "Hello 1"}), - new GenericRow(new String[] {"2", "Hello 2"}), - new GenericRow(new String[] {"3", "Hello 3"}), - new GenericRow(new 
String[] {"4", "Hello 4"}) - ); - StructType schema = new StructType(new StructField[] { - DataTypes.createStructField("id", DataTypes.StringType, true), - DataTypes.createStructField("text", DataTypes.StringType, true) - }); - Dataset<Row> rows = spark().createDataFrame(recs, schema); - List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>(); // Add file paths and sizes to the list filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1")); @@ -214,18 +160,33 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1")); Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - - when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(), - eq(schemaProvider))).thenReturn(Option.of(rows)); when(queryRunner.run(Mockito.any())).thenReturn(inputDs); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, 4, "1#path/to/file2.json"); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, 4, "1#path/to/file3.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json"); + } + + @Test + public void largeBootstrapWithFilters() throws IOException { + String commitTimeForWrites = "2"; + String commitTimeForReads = "1"; - verify(gcsObjectMetadataFetcher, times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), - anyBoolean()); - verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(), - eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider)); + Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites); + List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>(); + // Add file paths and sizes to the list 
+ for (int i = 0; i <= 10000; i++) { + filePathSizeAndCommitTime.add(Triple.of("path/to/file" + i + ".parquet", 100L, "1")); + } + filePathSizeAndCommitTime.add(Triple.of("path/to/file10005.json", 100L, "1")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file10006.json", 150L, "1")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file10007.json", 200L, "1")); + + Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); + + when(queryRunner.run(Mockito.any())).thenReturn(inputDs); + + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json"); } @Test @@ -234,23 +195,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn String commitTimeForReads = "1"; Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites); - List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList( - new CloudObjectMetadata("data-file-1.json", 1), - new CloudObjectMetadata("data-file-2.json", 1)); - when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList); - - List<Row> recs = Arrays.asList( - new GenericRow(new String[] {"1", "Hello 1"}), - new GenericRow(new String[] {"2", "Hello 2"}), - new GenericRow(new String[] {"3", "Hello 3"}), - new GenericRow(new String[] {"4", "Hello 4"}) - ); - StructType schema = new StructType(new StructField[] { - DataTypes.createStructField("id", DataTypes.StringType, true), - DataTypes.createStructField("text", DataTypes.StringType, true) - }); - Dataset<Row> rows = spark().createDataFrame(recs, schema); - List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>(); // Add file paths and sizes to the list filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1")); @@ -261,31 +205,21 @@ public class 
TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(), - eq(schemaProvider))).thenReturn(Option.of(rows)); when(queryRunner.run(Mockito.any())).thenReturn(inputDs); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json"); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, 4, "1#path/to/file2.json"); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, 4, "2#path/to/file5.json"); - - verify(gcsObjectMetadataFetcher, times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), - anyBoolean()); - verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(), - eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider)); - - schemaProvider = Option.empty(); - when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(), - eq(schemaProvider))).thenReturn(Option.of(rows)); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json"); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); } private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, - Option<String> checkpointToPull, long sourceLimit, int expectedCount, String expectedCheckpoint) { + Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) { TypedProperties typedProperties = 
setProps(missingCheckpointStrategy); + typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json"); GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(), - spark(), schemaProvider.orElse(null), gcsObjectMetadataFetcher, gcsObjectDataFetcher, queryRunner); + spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner); Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit); @@ -293,13 +227,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn String nextCheckPoint = dataAndCheckpoint.getRight(); Assertions.assertNotNull(nextCheckPoint); - - if (expectedCount == 0) { - assertFalse(datasetOpt.isPresent()); - } else { - assertEquals(datasetOpt.get().count(), expectedCount); - } - Assertions.assertEquals(expectedCheckpoint, nextCheckPoint); } @@ -341,11 +268,11 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn private HoodieWriteConfig getWriteConfig() { return getConfigBuilder(basePath(), metaClient) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build()) - .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .build(); + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .build(); } private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String commitTime) throws IOException { @@ -370,22 +297,25 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn private TypedProperties 
setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) { Properties properties = new Properties(); + //String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath(); + //properties.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath); + properties.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath()); properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", - missingCheckpointStrategy.name()); + missingCheckpointStrategy.name()); properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json"); return new TypedProperties(properties); } private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) { return HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(GCS_METADATA_SCHEMA.toString()) - .withParallelism(2, 2) - .withBulkInsertParallelism(2) - .withFinalizeWriteParallelism(2).withDeleteParallelism(2) - .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) - .forTable(metaClient.getTableConfig().getTableName()); + .withPath(basePath) + .withSchema(GCS_METADATA_SCHEMA.toString()) + .withParallelism(2, 2) + .withBulkInsertParallelism(2) + .withFinalizeWriteParallelism(2).withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .forTable(metaClient.getTableConfig().getTableName()); } private String generateGCSEventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime) @@ -413,5 +343,4 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = spark().read().json(testRdd); return inputDs; } - }
