This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a09c77061e [HUDI-7416] Remove duplicate code for getFileFormat and Refactor filter methods for S3/GCS sources (#10701)
8a09c77061e is described below

commit 8a09c77061e0ba720c53435e07b68a6bc129e876
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Feb 20 11:34:12 2024 +0530

    [HUDI-7416] Remove duplicate code for getFileFormat and Refactor filter methods for S3/GCS sources (#10701)
---
 .../sources/GcsEventsHoodieIncrSource.java         | 11 +---
 .../sources/S3EventsHoodieIncrSource.java          | 58 +++++++++-------------
 .../sources/helpers/CloudDataFetcher.java          | 27 +++++++---
 .../helpers/gcs/GcsObjectMetadataFetcher.java      | 49 ++++++++----------
 .../sources/TestGcsEventsHoodieIncrSource.java     |  5 +-
 5 files changed, 68 insertions(+), 82 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index a06130d3972..208aaaf3b5b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -48,11 +48,9 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -126,8 +124,8 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
                                    SchemaProvider schemaProvider) {
 
     this(props, jsc, spark, schemaProvider,
-        new GcsObjectMetadataFetcher(props, getSourceFileFormat(props)),
-        new CloudDataFetcher(props, getStringWithAltKeys(props, DATAFILE_FORMAT, true)),
+        new GcsObjectMetadataFetcher(props),
+        new CloudDataFetcher(props),
         new QueryRunner(spark, props)
     );
   }
@@ -196,9 +194,4 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(fileDataRows, queryInfo.getEndInstant());
   }
-
-  private static String getSourceFileFormat(TypedProperties props) {
-    return getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true);
-  }
-
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 4cbec4d2212..c4ab7339fbb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -52,11 +51,9 @@ import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
@@ -72,11 +69,9 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMiss
 public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
-  private static final String EMPTY_STRING = "";
   private final String srcPath;
   private final int numInstantsPerFetch;
   private final boolean checkIfFileExists;
-  private final String fileFormat;
   private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
   private final QueryRunner queryRunner;
   private final CloudDataFetcher cloudDataFetcher;
@@ -123,7 +118,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
       SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     this(props, sparkContext, sparkSession, schemaProvider, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, getStringWithAltKeys(props, CloudSourceConfig.DATAFILE_FORMAT, true)));
+        new CloudDataFetcher(props));
   }
 
   public S3EventsHoodieIncrSource(
@@ -138,13 +133,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
     this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
     this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
-
-    // This is to ensure backward compatibility where we were using the
-    // config SOURCE_FILE_FORMAT for file format in previous versions.
-    this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
-        ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
-        : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
-
     this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
@@ -152,6 +140,27 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
+  public static String generateFilter(TypedProperties props) {
+    String fileFormat = CloudDataFetcher.getFileFormat(props);
+    String filter = S3_OBJECT_SIZE + " > 0";
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
+    }
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
+    }
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
+    }
+    // Match files with a given extension, or use the fileFormat as the fallback incase the config is not set.
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
+    } else {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
+    }
+    }
+    return filter;
+  }
+
   @Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
     CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
@@ -172,7 +181,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     }
     Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
     queryInfo = queryInfoDatasetPair.getLeft();
-    Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);
+    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(generateFilter(props));
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
@@ -199,25 +208,4 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
   }
-
-  Dataset<Row> applyFilter(Dataset<Row> source, String fileFormat) {
-    String filter = S3_OBJECT_SIZE + " > 0";
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // Match files with a given extension, or use the fileFormat as the fallback incase the config is not set.
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
-    } else {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
-    }
-    }
-
-    return source.filter(filter);
-  }
 }
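
For illustration, a minimal sketch of how the now-static S3 filter builder can be exercised on its own (the property values and the shape of the resulting filter string are assumptions for this example, not part of the commit):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.CloudSourceConfig;
    import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
    import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;

    public class S3FilterSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Hypothetical values, for illustration only.
        props.setProperty(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(), "data/");
        props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
        // With no CLOUD_DATAFILE_EXTENSION set, the resolved file format ("json")
        // supplies the trailing "like" clause; the output is roughly:
        //   <size column> > 0 and <key column> like 'data/%' and <key column> like '%json%'
        System.out.println(S3EventsHoodieIncrSource.generateFilter(props));
      }
    }
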
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 9595ec1a9e6..ed1a49e33e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -20,17 +20,21 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.List;
 
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
+import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
 
 /**
@@ -39,21 +43,28 @@ import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorComm
  */
 public class CloudDataFetcher implements Serializable {
 
-  private final String fileFormat;
-  private TypedProperties props;
+  private static final String EMPTY_STRING = "";
+
+  private final TypedProperties props;
 
   private static final Logger LOG = LoggerFactory.getLogger(CloudDataFetcher.class);
 
   private static final long serialVersionUID = 1L;
 
-  public CloudDataFetcher(TypedProperties props, String fileFormat) {
-    this.fileFormat = fileFormat;
+  public CloudDataFetcher(TypedProperties props) {
     this.props = props;
   }
 
+  public static String getFileFormat(TypedProperties props) {
+    // This is to ensure backward compatibility where we were using the
+    // config SOURCE_FILE_FORMAT for file format in previous versions.
+    return StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+        ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
+        : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
+  }
+
   public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
                                                    TypedProperties props, Option<SchemaProvider> schemaProviderOption) {
-    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, schemaProviderOption);
+    return loadAsDataset(spark, cloudObjectMetadata, props, getFileFormat(props), schemaProviderOption);
   }
-
 }
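
For reference, a small sketch (hypothetical values) of the backward-compatible resolution that the consolidated getFileFormat performs, with DATAFILE_FORMAT taking precedence over the legacy SOURCE_FILE_FORMAT:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.CloudSourceConfig;
    import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
    import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;

    public class FileFormatSketch {
      public static void main(String[] args) {
        // Legacy pipelines that only set SOURCE_FILE_FORMAT keep working.
        TypedProperties legacy = new TypedProperties();
        legacy.setProperty(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.key(), "parquet");
        System.out.println(CloudDataFetcher.getFileFormat(legacy));  // parquet

        // When DATAFILE_FORMAT is set, it wins over SOURCE_FILE_FORMAT.
        TypedProperties current = new TypedProperties();
        current.setProperty(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.key(), "parquet");
        current.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
        System.out.println(CloudDataFetcher.getFileFormat(current)); // json
      }
    }
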
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index c92901d14cf..44480d91f65 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -51,10 +52,6 @@ import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorComm
  */
 public class GcsObjectMetadataFetcher implements Serializable {
 
-  /**
-   * The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given.
-   */
-  private final String fileFormat;
   private final TypedProperties props;
 
   private static final String GCS_PREFIX = "gs://";
@@ -62,13 +59,8 @@ public class GcsObjectMetadataFetcher implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(GcsObjectMetadataFetcher.class);
 
-  /**
-   * @param fileFormat The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION}
-   *                   is not given.
-   */
-  public GcsObjectMetadataFetcher(TypedProperties props, String fileFormat) {
+  public GcsObjectMetadataFetcher(TypedProperties props) {
     this.props = props;
-    this.fileFormat = fileFormat;
   }
 
   /**
@@ -86,36 +78,25 @@ public class GcsObjectMetadataFetcher implements Serializable {
         .collectAsList();
   }
 
-  /**
-   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
-   *                              of a Cloud Storage Pubsub Notification event.
-   * @return Dataset<Row> after apply the filtering.
-   */
-  public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
-    String filter = createFilter();
-    LOG.info("Adding filter string to Dataset: " + filter);
-
-    return cloudObjectMetadataDF.filter(filter);
-  }
-
   /**
    * Add optional filters that narrow down the list of GCS objects to fetch.
    */
-  private String createFilter() {
+  public static String generateFilter(TypedProperties props) {
     StringBuilder filter = new StringBuilder("size > 0");
 
-    getPropVal(SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
-    getPropVal(IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
-    getPropVal(IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));
+    getPropVal(props, SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
+    getPropVal(props, IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
+    getPropVal(props, IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));
 
     // Match files with a given extension, or use the fileFormat as the default.
-    getPropVal(CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
+    String fileFormat = CloudDataFetcher.getFileFormat(props);
+    getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
         .map(val -> filter.append(" and name like '%" + val + "'"));
 
     return filter.toString();
   }
 
-  private Option<String> getPropVal(ConfigProperty<String> configProperty) {
+  private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
     String value = getStringWithAltKeys(props, configProperty, true);
     if (!isNullOrEmpty(value)) {
       return Option.of(value);
@@ -123,4 +104,16 @@ public class GcsObjectMetadataFetcher implements Serializable {
 
     return Option.empty();
   }
+
+  /**
+   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
+   *                              of a Cloud Storage Pubsub Notification event.
+   * @return Dataset<Row> after apply the filtering.
+   */
+  public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
+    String filter = generateFilter(props);
+    LOG.info("Adding filter string to Dataset: " + filter);
+
+    return cloudObjectMetadataDF.filter(filter);
+  }
 }
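
And the GCS side, where the same properties now drive the static generateFilter (again a hypothetical sketch; only DATAFILE_FORMAT is set here, so the resolved format becomes the extension fallback):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.CloudSourceConfig;
    import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;

    public class GcsFilterSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
        // Roughly: size > 0 and name like '%json'
        System.out.println(GcsObjectMetadataFetcher.generateFilter(props));
      }
    }
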
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index bc2906d251f..4e37c17b43a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -283,7 +284,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
                              TypedProperties typedProperties) {
 
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
+        spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties), gcsObjectDataFetcher, queryRunner);
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
@@ -374,7 +375,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
         missingCheckpointStrategy.name());
-    properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json");
+    properties.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
     return new TypedProperties(properties);
   }
 
