This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bac6966b5b9 [HUDI-6738] - Apply object filter before checkpoint
batching in GcsEventsHoodieIncrSource (#9538)
bac6966b5b9 is described below
commit bac6966b5b9e151285b9c0810b0b770584b22850
Author: lokesh-lingarajan-0310
<[email protected]>
AuthorDate: Mon Sep 11 10:26:24 2023 -0700
[HUDI-6738] - Apply object filter before checkpoint batching in
GcsEventsHoodieIncrSource (#9538)
Apply filtering before we start checkpoint batching.
This change brings the GCS job in line with the S3 job.
---------
Co-authored-by: Lokesh Lingarajan
<[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../sources/GcsEventsHoodieIncrSource.java | 3 +-
.../helpers/gcs/GcsObjectMetadataFetcher.java | 17 ++-
.../sources/TestGcsEventsHoodieIncrSource.java | 169 ++++++---------------
3 files changed, 63 insertions(+), 126 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 891881095fd..d09bad71916 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -172,10 +172,11 @@ public class GcsEventsHoodieIncrSource extends
HoodieIncrSource {
}
Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
+ Dataset<Row> filteredSourceData =
gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF);
LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset
=
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
- cloudObjectMetadataDF, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
+ filteredSourceData, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
if (!checkPointAndDataset.getRight().isPresent()) {
LOG.info("Empty source, returning endpoint:" +
queryInfo.getEndInstant());
return Pair.of(Option.empty(), queryInfo.getEndInstant());
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index 08116ac0fa5..c92901d14cf 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -78,19 +78,26 @@ public class GcsObjectMetadataFetcher implements
Serializable {
* @return A {@link List} of {@link CloudObjectMetadata} containing GCS info.
*/
public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc,
Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) {
- String filter = createFilter();
- LOG.info("Adding filter string to Dataset: " + filter);
-
SerializableConfiguration serializableHadoopConf = new
SerializableConfiguration(jsc.hadoopConfiguration());
-
return cloudObjectMetadataDF
- .filter(filter)
.select("bucket", "name", "size")
.distinct()
.mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX,
serializableHadoopConf, checkIfExists),
Encoders.kryo(CloudObjectMetadata.class))
.collectAsList();
}
+ /**
+ * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS
objects. Assumed to be a persisted form
+ * of a Cloud Storage Pubsub Notification event.
+ * @return Dataset<Row> after applying the filter.
+ */
+ public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
+ String filter = createFilter();
+ LOG.info("Adding filter string to Dataset: " + filter);
+
+ return cloudObjectMetadataDF.filter(filter);
+ }
+
/**
* Add optional filters that narrow down the list of GCS objects to fetch.
*/
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index cc80123a19c..5c31f310800 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -39,7 +39,6 @@ import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
@@ -53,10 +52,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -78,9 +73,6 @@ import java.util.stream.Collectors;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -96,9 +88,6 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
@TempDir
protected java.nio.file.Path tempDir;
- @Mock
- GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
-
@Mock
CloudDataFetcher gcsObjectDataFetcher;
@@ -135,10 +124,8 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 0, inserts.getKey());
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, inserts.getKey());
- verify(gcsObjectMetadataFetcher,
times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
- anyBoolean());
verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF(
Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider));
}
@@ -147,24 +134,7 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws
IOException {
String commitTimeForWrites = "2";
String commitTimeForReads = "1";
-
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
- List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
- new CloudObjectMetadata("data-file-1.json", 1),
- new CloudObjectMetadata("data-file-2.json", 1));
- when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(),
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
-
- List<Row> recs = Arrays.asList(
- new GenericRow(new String[] {"1", "Hello 1"}),
- new GenericRow(new String[] {"2", "Hello 2"}),
- new GenericRow(new String[] {"3", "Hello 3"}),
- new GenericRow(new String[] {"4", "Hello 4"})
- );
- StructType schema = new StructType(new StructField[] {
- DataTypes.createStructField("id", DataTypes.StringType, true),
- DataTypes.createStructField("text", DataTypes.StringType, true)
- });
- Dataset<Row> rows = spark().createDataFrame(recs, schema);
List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
// Add file paths and sizes to the list
filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
@@ -172,16 +142,9 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
- when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any(),
- eq(schemaProvider))).thenReturn(Option.of(rows));
when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
-
- verify(gcsObjectMetadataFetcher,
times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
- anyBoolean());
- verify(gcsObjectDataFetcher, times(1)).getCloudObjectDataDF(Mockito.any(),
- eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
}
@Test
@@ -190,23 +153,6 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
String commitTimeForReads = "1";
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
- List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
- new CloudObjectMetadata("data-file-1.json", 1),
- new CloudObjectMetadata("data-file-2.json", 1));
- when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(),
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
-
- List<Row> recs = Arrays.asList(
- new GenericRow(new String[] {"1", "Hello 1"}),
- new GenericRow(new String[] {"2", "Hello 2"}),
- new GenericRow(new String[] {"3", "Hello 3"}),
- new GenericRow(new String[] {"4", "Hello 4"})
- );
- StructType schema = new StructType(new StructField[] {
- DataTypes.createStructField("id", DataTypes.StringType, true),
- DataTypes.createStructField("text", DataTypes.StringType, true)
- });
- Dataset<Row> rows = spark().createDataFrame(recs, schema);
-
List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
// Add file paths and sizes to the list
filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
@@ -214,18 +160,33 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-
- when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any(),
- eq(schemaProvider))).thenReturn(Option.of(rows));
when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, 4, "1#path/to/file2.json");
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
250L, 4, "1#path/to/file3.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
250L, "1#path/to/file3.json");
+ }
+
+ @Test
+ public void largeBootstrapWithFilters() throws IOException {
+ String commitTimeForWrites = "2";
+ String commitTimeForReads = "1";
- verify(gcsObjectMetadataFetcher,
times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
- anyBoolean());
- verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(),
- eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
+ Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+ // Add file paths and sizes to the list
+ for (int i = 0; i <= 10000; i++) {
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file" + i + ".parquet",
100L, "1"));
+ }
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file10005.json", 100L,
"1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file10006.json", 150L,
"1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file10007.json", 200L,
"1"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, "1#path/to/file10006.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT,
Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
}
@Test
@@ -234,23 +195,6 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
String commitTimeForReads = "1";
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
- List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
- new CloudObjectMetadata("data-file-1.json", 1),
- new CloudObjectMetadata("data-file-2.json", 1));
- when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(),
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
-
- List<Row> recs = Arrays.asList(
- new GenericRow(new String[] {"1", "Hello 1"}),
- new GenericRow(new String[] {"2", "Hello 2"}),
- new GenericRow(new String[] {"3", "Hello 3"}),
- new GenericRow(new String[] {"4", "Hello 4"})
- );
- StructType schema = new StructType(new StructField[] {
- DataTypes.createStructField("id", DataTypes.StringType, true),
- DataTypes.createStructField("text", DataTypes.StringType, true)
- });
- Dataset<Row> rows = spark().createDataFrame(recs, schema);
-
List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
// Add file paths and sizes to the list
filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
@@ -261,31 +205,21 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
- when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any(),
- eq(schemaProvider))).thenReturn(Option.of(rows));
when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
100L, 4, "1#path/to/file2.json");
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
1000L, 4, "2#path/to/file5.json");
-
- verify(gcsObjectMetadataFetcher,
times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
- anyBoolean());
- verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
- eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
-
- schemaProvider = Option.empty();
- when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any(),
- eq(schemaProvider))).thenReturn(Option.of(rows));
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
100L, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
1000L, "2#path/to/file5.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
}
private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
- Option<String> checkpointToPull, long
sourceLimit, int expectedCount, String expectedCheckpoint) {
+ Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint) {
TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+ typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format",
"json");
GcsEventsHoodieIncrSource incrSource = new
GcsEventsHoodieIncrSource(typedProperties, jsc(),
- spark(), schemaProvider.orElse(null), gcsObjectMetadataFetcher,
gcsObjectDataFetcher, queryRunner);
+ spark(), schemaProvider.orElse(null), new
GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher,
queryRunner);
Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
@@ -293,13 +227,6 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
String nextCheckPoint = dataAndCheckpoint.getRight();
Assertions.assertNotNull(nextCheckPoint);
-
- if (expectedCount == 0) {
- assertFalse(datasetOpt.isPresent());
- } else {
- assertEquals(datasetOpt.get().count(), expectedCount);
- }
-
Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
}
@@ -341,11 +268,11 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
private HoodieWriteConfig getWriteConfig() {
return getConfigBuilder(basePath(), metaClient)
-
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4,
5).build())
-
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(1).build())
- .build();
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .build();
}
private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String
commitTime) throws IOException {
@@ -370,22 +297,25 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy) {
Properties properties = new Properties();
+ //String schemaFilePath =
TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+ //properties.put("hoodie.deltastreamer.schemaprovider.source.schema.file",
schemaFilePath);
+ properties.put("hoodie.deltastreamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
basePath());
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
- missingCheckpointStrategy.name());
+ missingCheckpointStrategy.name());
properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format",
"json");
return new TypedProperties(properties);
}
private HoodieWriteConfig.Builder getConfigBuilder(String basePath,
HoodieTableMetaClient metaClient) {
return HoodieWriteConfig.newBuilder()
- .withPath(basePath)
- .withSchema(GCS_METADATA_SCHEMA.toString())
- .withParallelism(2, 2)
- .withBulkInsertParallelism(2)
- .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
- .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
- .forTable(metaClient.getTableConfig().getTableName());
+ .withPath(basePath)
+ .withSchema(GCS_METADATA_SCHEMA.toString())
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+ .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .forTable(metaClient.getTableConfig().getTableName());
}
private String generateGCSEventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
@@ -413,5 +343,4 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
Dataset<Row> inputDs = spark().read().json(testRdd);
return inputDs;
}
-
}