This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bac6966b5b9 [HUDI-6738] - Apply object filter before checkpoint 
batching in GcsEventsHoodieIncrSource (#9538)
bac6966b5b9 is described below

commit bac6966b5b9e151285b9c0810b0b770584b22850
Author: lokesh-lingarajan-0310 
<[email protected]>
AuthorDate: Mon Sep 11 10:26:24 2023 -0700

    [HUDI-6738] - Apply object filter before checkpoint batching in 
GcsEventsHoodieIncrSource (#9538)
    
    Apply filtering before we start checkpoint batching.
    This change list will bring GCS job similar to S3 job.
    
    ---------
    
    Co-authored-by: Lokesh Lingarajan 
<[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../sources/GcsEventsHoodieIncrSource.java         |   3 +-
 .../helpers/gcs/GcsObjectMetadataFetcher.java      |  17 ++-
 .../sources/TestGcsEventsHoodieIncrSource.java     | 169 ++++++---------------
 3 files changed, 63 insertions(+), 126 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 891881095fd..d09bad71916 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -172,10 +172,11 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
     }
 
     Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
+    Dataset<Row> filteredSourceData = 
gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF);
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based 
on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset 
=
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            cloudObjectMetadataDF, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
+            filteredSourceData, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
       LOG.info("Empty source, returning endpoint:" + 
queryInfo.getEndInstant());
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index 08116ac0fa5..c92901d14cf 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -78,19 +78,26 @@ public class GcsObjectMetadataFetcher implements 
Serializable {
    * @return A {@link List} of {@link CloudObjectMetadata} containing GCS info.
    */
   public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, 
Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) {
-    String filter = createFilter();
-    LOG.info("Adding filter string to Dataset: " + filter);
-
     SerializableConfiguration serializableHadoopConf = new 
SerializableConfiguration(jsc.hadoopConfiguration());
-
     return cloudObjectMetadataDF
-        .filter(filter)
         .select("bucket", "name", "size")
         .distinct()
         .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, 
serializableHadoopConf, checkIfExists), 
Encoders.kryo(CloudObjectMetadata.class))
         .collectAsList();
   }
 
+  /**
+   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS 
objects. Assumed to be a persisted form
+   *                              of a Cloud Storage Pubsub Notification event.
+   * @return Dataset<Row> after apply the filtering.
+   */
+  public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
+    String filter = createFilter();
+    LOG.info("Adding filter string to Dataset: " + filter);
+
+    return cloudObjectMetadataDF.filter(filter);
+  }
+
   /**
    * Add optional filters that narrow down the list of GCS objects to fetch.
    */
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index cc80123a19c..5c31f310800 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -39,7 +39,6 @@ import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
@@ -53,10 +52,6 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -78,9 +73,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -96,9 +88,6 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   @TempDir
   protected java.nio.file.Path tempDir;
 
-  @Mock
-  GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
-
   @Mock
   CloudDataFetcher gcsObjectDataFetcher;
 
@@ -135,10 +124,8 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
     Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 0, inserts.getKey());
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, inserts.getKey());
 
-    verify(gcsObjectMetadataFetcher, 
times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
-            anyBoolean());
     verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF(
         Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider));
   }
@@ -147,24 +134,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws 
IOException {
     String commitTimeForWrites = "2";
     String commitTimeForReads = "1";
-
     Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
-    List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
-        new CloudObjectMetadata("data-file-1.json", 1),
-        new CloudObjectMetadata("data-file-2.json", 1));
-    when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), 
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
-
-    List<Row> recs = Arrays.asList(
-        new GenericRow(new String[] {"1", "Hello 1"}),
-        new GenericRow(new String[] {"2", "Hello 2"}),
-        new GenericRow(new String[] {"3", "Hello 3"}),
-        new GenericRow(new String[] {"4", "Hello 4"})
-    );
-    StructType schema = new StructType(new StructField[] {
-        DataTypes.createStructField("id", DataTypes.StringType, true),
-        DataTypes.createStructField("text", DataTypes.StringType, true)
-    });
-    Dataset<Row> rows = spark().createDataFrame(recs, schema);
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
     // Add file paths and sizes to the list
     filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
@@ -172,16 +142,9 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any(),
-        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 4, "1#path/to/file1.json");
-
-    verify(gcsObjectMetadataFetcher, 
times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
-            anyBoolean());
-    verify(gcsObjectDataFetcher, times(1)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
   }
 
   @Test
@@ -190,23 +153,6 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     String commitTimeForReads = "1";
 
     Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
-    List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
-        new CloudObjectMetadata("data-file-1.json", 1),
-        new CloudObjectMetadata("data-file-2.json", 1));
-    when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), 
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
-
-    List<Row> recs = Arrays.asList(
-        new GenericRow(new String[] {"1", "Hello 1"}),
-        new GenericRow(new String[] {"2", "Hello 2"}),
-        new GenericRow(new String[] {"3", "Hello 3"}),
-        new GenericRow(new String[] {"4", "Hello 4"})
-    );
-    StructType schema = new StructType(new StructField[] {
-        DataTypes.createStructField("id", DataTypes.StringType, true),
-        DataTypes.createStructField("text", DataTypes.StringType, true)
-    });
-    Dataset<Row> rows = spark().createDataFrame(recs, schema);
-
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
     // Add file paths and sizes to the list
     filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
@@ -214,18 +160,33 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any(),
-        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
250L, 4, "1#path/to/file2.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
250L, 4, "1#path/to/file3.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
250L, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
250L, "1#path/to/file3.json");
+  }
+
+  @Test
+  public void largeBootstrapWithFilters() throws IOException {
+    String commitTimeForWrites = "2";
+    String commitTimeForReads = "1";
 
-    verify(gcsObjectMetadataFetcher, 
times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
-        anyBoolean());
-    verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
+    Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add file paths and sizes to the list
+    for (int i = 0; i <= 10000; i++) {
+      filePathSizeAndCommitTime.add(Triple.of("path/to/file" + i + ".parquet", 
100L, "1"));
+    }
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file10005.json", 100L, 
"1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file10006.json", 150L, 
"1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file10007.json", 200L, 
"1"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
250L, "1#path/to/file10006.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, 
Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
   }
 
   @Test
@@ -234,23 +195,6 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     String commitTimeForReads = "1";
 
     Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
-    List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
-        new CloudObjectMetadata("data-file-1.json", 1),
-        new CloudObjectMetadata("data-file-2.json", 1));
-    when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), 
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
-
-    List<Row> recs = Arrays.asList(
-        new GenericRow(new String[] {"1", "Hello 1"}),
-        new GenericRow(new String[] {"2", "Hello 2"}),
-        new GenericRow(new String[] {"3", "Hello 3"}),
-        new GenericRow(new String[] {"4", "Hello 4"})
-    );
-    StructType schema = new StructType(new StructField[] {
-        DataTypes.createStructField("id", DataTypes.StringType, true),
-        DataTypes.createStructField("text", DataTypes.StringType, true)
-    });
-    Dataset<Row> rows = spark().createDataFrame(recs, schema);
-
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
     // Add file paths and sizes to the list
     filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
@@ -261,31 +205,21 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any(),
-        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 4, "1#path/to/file1.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
100L, 4, "1#path/to/file2.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
1000L, 4, "2#path/to/file5.json");
-
-    verify(gcsObjectMetadataFetcher, 
times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
-        anyBoolean());
-    verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
-
-    schemaProvider = Option.empty();
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any(),
-        eq(schemaProvider))).thenReturn(Option.of(rows));
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 4, "1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
100L, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
1000L, "2#path/to/file5.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy,
-                             Option<String> checkpointToPull, long 
sourceLimit, int expectedCount, String expectedCheckpoint) {
+                             Option<String> checkpointToPull, long 
sourceLimit, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", 
"json");
 
     GcsEventsHoodieIncrSource incrSource = new 
GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), schemaProvider.orElse(null), gcsObjectMetadataFetcher, 
gcsObjectDataFetcher, queryRunner);
+        spark(), schemaProvider.orElse(null), new 
GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, 
queryRunner);
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
@@ -293,13 +227,6 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     String nextCheckPoint = dataAndCheckpoint.getRight();
 
     Assertions.assertNotNull(nextCheckPoint);
-
-    if (expectedCount == 0) {
-      assertFalse(datasetOpt.isPresent());
-    } else {
-      assertEquals(datasetOpt.get().count(), expectedCount);
-    }
-
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
 
@@ -341,11 +268,11 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
   private HoodieWriteConfig getWriteConfig() {
     return getConfigBuilder(basePath(), metaClient)
-        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 
5).build())
-            
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
-                    .withMaxNumDeltaCommitsBeforeCompaction(1).build())
-            .build();
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 
3).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
   }
 
   private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String 
commitTime) throws IOException {
@@ -370,22 +297,25 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
   private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy) {
     Properties properties = new Properties();
+    //String schemaFilePath = 
TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+    //properties.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    properties.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
     
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
-            missingCheckpointStrategy.name());
+        missingCheckpointStrategy.name());
     
properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", 
"json");
     return new TypedProperties(properties);
   }
 
   private HoodieWriteConfig.Builder getConfigBuilder(String basePath, 
HoodieTableMetaClient metaClient) {
     return HoodieWriteConfig.newBuilder()
-            .withPath(basePath)
-            .withSchema(GCS_METADATA_SCHEMA.toString())
-            .withParallelism(2, 2)
-            .withBulkInsertParallelism(2)
-            .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
-            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-            .forTable(metaClient.getTableConfig().getTableName());
+        .withPath(basePath)
+        .withSchema(GCS_METADATA_SCHEMA.toString())
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable(metaClient.getTableConfig().getTableName());
   }
 
   private String generateGCSEventMetadata(Long objectSize, String bucketName, 
String objectKey, String commitTime)
@@ -413,5 +343,4 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     Dataset<Row> inputDs = spark().read().json(testRdd);
     return inputDs;
   }
-
 }

Reply via email to