This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4538fb2fc3f070883a03cc254a6958f38bfffd1d
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Mar 5 17:32:51 2024 -0800

    [HUDI-7418] Create a common method for filtering in S3 and GCS sources and add tests for filtering out extensions (#10724)
    
    Co-authored-by: rmahindra123 <[email protected]>
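    
    For illustration, a sketch of the filter the common method builds
    (the property values below are hypothetical):
    
        TypedProperties props = new TypedProperties();
        props.setProperty(CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX.key(), "input/");
        props.setProperty(CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR.key(), "temp");
        String filter = CloudObjectsSelectorCommon.generateFilter(
            CloudObjectsSelectorCommon.Type.S3, props);
        // Assuming a json source file format, filter resembles:
        //   s3.object.size > 0 and s3.object.key like 'input/%'
        //   and s3.object.key not like '%temp%' and s3.object.key like '%json'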
---
 .../hudi/utilities/config/CloudSourceConfig.java   |  4 +-
 .../config/S3EventsHoodieIncrSourceConfig.java     |  6 ++
 .../sources/GcsEventsHoodieIncrSource.java         |  8 +--
 .../sources/S3EventsHoodieIncrSource.java          | 50 +++-------------
 .../helpers/CloudObjectsSelectorCommon.java        | 68 ++++++++++++++++++++++
 .../helpers/gcs/GcsObjectMetadataFetcher.java      | 39 +------------
 .../sources/TestGcsEventsHoodieIncrSource.java     | 42 +++++++++----
 .../sources/TestS3EventsHoodieIncrSource.java      |  6 +-
 8 files changed, 124 insertions(+), 99 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 54be9cabef9..e3bdca1a395 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -85,14 +85,14 @@ public class CloudSourceConfig extends HoodieConfig {
       .noDefaultValue()
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.cloud.data.select.relpath.prefix")
       .markAdvanced()
-      .withDocumentation("Only selects objects in the bucket whose relative 
path matches this prefix");
+      .withDocumentation("Only selects objects in the bucket whose relative 
path starts with this prefix");
 
   public static final ConfigProperty<String> IGNORE_RELATIVE_PATH_PREFIX = 
ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.ignore.relpath.prefix")
       .noDefaultValue()
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.cloud.data.ignore.relpath.prefix")
       .markAdvanced()
-      .withDocumentation("Ignore objects in the bucket whose relative path 
matches this prefix");
+      .withDocumentation("Ignore objects in the bucket whose relative path 
starts with this prefix");
 
   public static final ConfigProperty<String> IGNORE_RELATIVE_PATH_SUBSTR = 
ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + 
"source.cloud.data.ignore.relpath.substring")
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
index 3db572b1f84..23ecb96d795 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
@@ -47,6 +47,8 @@ public class S3EventsHoodieIncrSourceConfig extends 
HoodieConfig {
       .markAdvanced()
       .withDocumentation("Control whether we do existence check for files 
before consuming them");
 
+  @Deprecated
+  // Use {@link CloudSourceConfig#SELECT_RELATIVE_PATH_PREFIX}
   public static final ConfigProperty<String> S3_KEY_PREFIX = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.s3incr.key.prefix")
       .noDefaultValue()
@@ -61,6 +63,8 @@ public class S3EventsHoodieIncrSourceConfig extends 
HoodieConfig {
       .markAdvanced()
       .withDocumentation("The file system prefix.");
 
+  @Deprecated
+  // Use {@link CloudSourceConfig#IGNORE_RELATIVE_PATH_PREFIX}
   public static final ConfigProperty<String> S3_IGNORE_KEY_PREFIX = 
ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.s3incr.ignore.key.prefix")
       .noDefaultValue()
@@ -68,6 +72,8 @@ public class S3EventsHoodieIncrSourceConfig extends 
HoodieConfig {
       .markAdvanced()
       .withDocumentation("Control whether to ignore the s3 objects starting 
with this prefix");
 
+  @Deprecated
+  // Use {@link CloudSourceConfig#IGNORE_RELATIVE_PATH_SUBSTR}
   public static final ConfigProperty<String> S3_IGNORE_KEY_SUBSTRING = 
ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.s3incr.ignore.key.substring")
       .noDefaultValue()
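
The three keys marked @Deprecated above are still honored, but new pipelines
should prefer the source-agnostic CloudSourceConfig keys. A minimal migration
sketch (constants as defined in this patch; the values are hypothetical):

    TypedProperties props = new TypedProperties();
    // Deprecated S3-specific keys, kept as fallbacks:
    props.setProperty(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(), "input/");
    props.setProperty(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key(), "tmp/");
    props.setProperty(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key(), ".checkpoint");
    // Preferred replacements:
    props.setProperty(CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX.key(), "input/");
    props.setProperty(CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX.key(), "tmp/");
    props.setProperty(CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR.key(), ".checkpoint");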
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 208aaaf3b5b..07950742909 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
@@ -114,10 +115,6 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
   private final Option<SchemaProvider> schemaProvider;
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
-
-  public static final String GCS_OBJECT_KEY = "name";
-  public static final String GCS_OBJECT_SIZE = "size";
-
   private static final Logger LOG = 
LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
 
   public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext 
jsc, SparkSession spark,
@@ -161,7 +158,8 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
         sparkContext, srcPath, numInstantsPerFetch,
         Option.of(cloudObjectIncrCheckpoint.getCommit()),
         missingCheckpointStrategy, handlingMode, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-        GCS_OBJECT_KEY, GCS_OBJECT_SIZE, true,
+        CloudObjectsSelectorCommon.GCS_OBJECT_KEY,
+        CloudObjectsSelectorCommon.GCS_OBJECT_SIZE, true,
         Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
     LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and 
queryInfo:" + queryInfo);
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index c4ab7339fbb..84b267709ad 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,13 +23,13 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
@@ -50,15 +50,11 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
 import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
-import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
-import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
 import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK;
-import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX;
 import static 
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -87,18 +83,9 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     @Deprecated
     static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = 
S3_INCR_ENABLE_EXISTS_CHECK.defaultValue();
 
-    // control whether to filter the s3 objects starting with this prefix
-    @Deprecated
-    static final String S3_KEY_PREFIX = 
S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key();
     @Deprecated
     static final String S3_FS_PREFIX = 
S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key();
 
-    // control whether to ignore the s3 objects starting with this prefix
-    @Deprecated
-    static final String S3_IGNORE_KEY_PREFIX = 
S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key();
-    // control whether to ignore the s3 objects with this substring
-    @Deprecated
-    static final String S3_IGNORE_KEY_SUBSTRING = 
S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key();
     /**
      * {@link #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader 
while loading dataset.
      * Example Hudi Streamer conf
@@ -108,10 +95,6 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     public static final String SPARK_DATASOURCE_OPTIONS = 
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
   }
 
-  public static final String S3_OBJECT_KEY = "s3.object.key";
-  public static final String S3_OBJECT_SIZE = "s3.object.size";
-  public static final String S3_BUCKET_NAME = "s3.bucket.name";
-
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
@@ -140,27 +123,6 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     this.snapshotLoadQuerySplitter = 
SnapshotLoadQuerySplitter.getInstance(props);
   }
 
-  public static String generateFilter(TypedProperties props) {
-    String fileFormat = CloudDataFetcher.getFileFormat(props);
-    String filter = S3_OBJECT_SIZE + " > 0";
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, 
true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '" + 
getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, 
S3_IGNORE_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + 
getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, 
S3_IGNORE_KEY_SUBSTRING, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + 
getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // Match files with a given extension, or use the fileFormat as the 
fallback incase the config is not set.
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, 
CLOUD_DATAFILE_EXTENSION, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + 
getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
-    } else {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + 
"%'";
-    }
-    return filter;
-  }
-
   @Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpoint, long sourceLimit) {
     CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = 
CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
@@ -171,7 +133,8 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
             Option.of(cloudObjectIncrCheckpoint.getCommit()),
             missingCheckpointStrategy, handlingMode,
             HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            S3_OBJECT_KEY, S3_OBJECT_SIZE, true,
+            CloudObjectsSelectorCommon.S3_OBJECT_KEY,
+            CloudObjectsSelectorCommon.S3_OBJECT_SIZE, true,
             Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
     LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" 
+ queryInfo);
 
@@ -181,7 +144,8 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     }
     Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = 
queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
     queryInfo = queryInfoDatasetPair.getLeft();
-    Dataset<Row> filteredSourceData = 
queryInfoDatasetPair.getRight().filter(generateFilter(props));
+    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(
+        
CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.S3, 
props));
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based 
on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset 
=
@@ -199,7 +163,9 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     // Create S3 paths
     SerializableConfiguration serializableHadoopConf = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
     List<CloudObjectMetadata> cloudObjectMetadata = 
checkPointAndDataset.getRight().get()
-        .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
+        .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME,
+                CloudObjectsSelectorCommon.S3_OBJECT_KEY,
+                CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
         .distinct()
         .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, 
serializableHadoopConf, checkIfFileExists), 
Encoders.kryo(CloudObjectMetadata.class))
         .collectAsList();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 5ed7dcae897..8676bf41cb5 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
@@ -56,9 +57,16 @@ import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
+import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
+import static 
org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX;
 import static org.apache.spark.sql.functions.input_file_name;
 import static org.apache.spark.sql.functions.split;
 
@@ -71,6 +79,13 @@ public class CloudObjectsSelectorCommon {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CloudObjectsSelectorCommon.class);
 
+  public static final String S3_OBJECT_KEY = "s3.object.key";
+  public static final String S3_OBJECT_SIZE = "s3.object.size";
+  public static final String S3_BUCKET_NAME = "s3.bucket.name";
+  public static final String GCS_OBJECT_KEY = "name";
+  public static final String GCS_OBJECT_SIZE = "size";
+  private static final String SPACE_DELIMITER = " ";
+
   /**
    * Return a function that extracts filepaths from a list of Rows.
    * Here Row is assumed to have the schema [bucket_name, 
filepath_relative_to_bucket, object_size]
@@ -151,6 +166,45 @@ public class CloudObjectsSelectorCommon {
     }
   }
 
+  public static String generateFilter(Type type,
+                                      TypedProperties props) {
+    String fileFormat = CloudDataFetcher.getFileFormat(props);
+    Option<String> selectRelativePathPrefix = getPropVal(props, 
SELECT_RELATIVE_PATH_PREFIX);
+    Option<String> ignoreRelativePathPrefix = getPropVal(props, 
IGNORE_RELATIVE_PATH_PREFIX);
+    Option<String> ignoreRelativePathSubStr = getPropVal(props, 
IGNORE_RELATIVE_PATH_SUBSTR);
+
+    String objectKey;
+    String objectSizeKey;
+    // Fall back to the deprecated S3 configs for backwards compatibility.
+    if (type.equals(Type.S3)) {
+      objectKey = S3_OBJECT_KEY;
+      objectSizeKey = S3_OBJECT_SIZE;
+      selectRelativePathPrefix = selectRelativePathPrefix.or(() -> 
getPropVal(props, S3_KEY_PREFIX));
+      ignoreRelativePathPrefix = ignoreRelativePathPrefix.or(() -> 
getPropVal(props, S3_IGNORE_KEY_PREFIX));
+      ignoreRelativePathSubStr = ignoreRelativePathSubStr.or(() -> 
getPropVal(props, S3_IGNORE_KEY_SUBSTRING));
+    } else {
+      objectKey = GCS_OBJECT_KEY;
+      objectSizeKey = GCS_OBJECT_SIZE;
+    }
+
+    StringBuilder filter = new StringBuilder(String.format("%s > 0", 
objectSizeKey));
+    if (selectRelativePathPrefix.isPresent()) {
+      filter.append(SPACE_DELIMTER).append(String.format("and %s like '%s%%'", 
objectKey, selectRelativePathPrefix.get()));
+    }
+    if (ignoreRelativePathPrefix.isPresent()) {
+      filter.append(SPACE_DELIMTER).append(String.format("and %s not like 
'%s%%'", objectKey, ignoreRelativePathPrefix.get()));
+    }
+    if (ignoreRelativePathSubStr.isPresent()) {
+      filter.append(SPACE_DELIMTER).append(String.format("and %s not like 
'%%%s%%'", objectKey, ignoreRelativePathSubStr.get()));
+    }
+
+    // Match files with a given extension, or use the fileFormat as the 
default.
+    getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
+        .map(val -> filter.append(SPACE_DELIMITER).append(String.format("and %s 
like '%%%s'", objectKey, val)));
+
+    return filter.toString();
+  }
+
   public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, 
List<CloudObjectMetadata> cloudObjectMetadata,
                                                    TypedProperties props, 
String fileFormat, Option<SchemaProvider> schemaProviderOption) {
     if (LOG.isDebugEnabled()) {
@@ -233,4 +287,18 @@ public class CloudObjectsSelectorCommon {
   public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, 
List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String 
fileFormat) {
     return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, 
Option.empty());
   }
+
+  private static Option<String> getPropVal(TypedProperties props, 
ConfigProperty<String> configProperty) {
+    String value = getStringWithAltKeys(props, configProperty, true);
+    if (!StringUtils.isNullOrEmpty(value)) {
+      return Option.of(value);
+    }
+
+    return Option.empty();
+  }
+
+  public enum Type {
+    S3,
+    GCS
+  }
 }
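
generateFilter resolves the new CloudSourceConfig keys first and only falls
back to the deprecated S3 keys via Option.or, so existing S3 pipelines keep
their filtering behavior. A sketch of the GCS path (the prefix value is
illustrative):

    TypedProperties props = new TypedProperties();
    // GCS events expose the Pubsub notification fields "name" and "size".
    props.setProperty(CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX.key(), "archive/");
    String gcsFilter = CloudObjectsSelectorCommon.generateFilter(
        CloudObjectsSelectorCommon.Type.GCS, props);
    // Assuming a json source file format, gcsFilter resembles:
    //   size > 0 and name not like 'archive/%' and name like '%json'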
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index 44480d91f65..29a50e81fb0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -18,12 +18,10 @@
 
 package org.apache.hudi.utilities.sources.helpers.gcs;
 
-import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -35,12 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
-import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
-import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
-import static 
org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX;
 import static 
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
 
 /**
@@ -78,40 +70,13 @@ public class GcsObjectMetadataFetcher implements 
Serializable {
         .collectAsList();
   }
 
-  /**
-   * Add optional filters that narrow down the list of GCS objects to fetch.
-   */
-  public static String generateFilter(TypedProperties props) {
-    StringBuilder filter = new StringBuilder("size > 0");
-
-    getPropVal(props, SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> 
filter.append(" and name like '" + val + "%'"));
-    getPropVal(props, IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> 
filter.append(" and name not like '" + val + "%'"));
-    getPropVal(props, IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> 
filter.append(" and name not like '%" + val + "%'"));
-
-    // Match files with a given extension, or use the fileFormat as the 
default.
-    String fileFormat = CloudDataFetcher.getFileFormat(props);
-    getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
-        .map(val -> filter.append(" and name like '%" + val + "'"));
-
-    return filter.toString();
-  }
-
-  private static Option<String> getPropVal(TypedProperties props, 
ConfigProperty<String> configProperty) {
-    String value = getStringWithAltKeys(props, configProperty, true);
-    if (!isNullOrEmpty(value)) {
-      return Option.of(value);
-    }
-
-    return Option.empty();
-  }
-
   /**
    * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS 
objects. Assumed to be a persisted form
    *                              of a Cloud Storage Pubsub Notification event.
    * @return Dataset<Row> after apply the filtering.
    */
   public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
-    String filter = generateFilter(props);
+    String filter = 
CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.GCS, 
props);
     LOG.info("Adding filter string to Dataset: " + filter);
 
     return cloudObjectMetadataDF.filter(filter);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 4e37c17b43a..c1844c7a2a1 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -60,6 +60,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -86,6 +87,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
   private static final Schema GCS_METADATA_SCHEMA = 
SchemaTestUtil.getSchemaFromResource(
       TestGcsEventsHoodieIncrSource.class, 
"/streamer-config/gcs-metadata.avsc", true);
+  private static final String IGNORE_FILE_EXTENSION = ".ignore";
 
   private ObjectMapper mapper = new ObjectMapper();
 
@@ -196,28 +198,44 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     readAndAssert(READ_UPTO_LATEST_COMMIT, 
Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
   }
 
-  @Test
-  public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+  @ParameterizedTest
+  @ValueSource(strings = {
+      ".json",
+      ".gz"
+  })
+  public void testTwoFilesAndContinueAcrossCommits(String extension) throws 
IOException {
     String commitTimeForWrites = "2";
     String commitTimeForReads = "1";
 
     Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
+
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    // Set the extension config only when it differs from the default file format.
+    if (!extension.endsWith("json")) {
+      
typedProperties.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), 
extension);
+    }
+
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
-    // Add file paths and sizes to the list
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+    // Add file paths and sizes to the list.
+    // Include a couple of files with an invalid extension to ensure they are 
filtered out.
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", 
extension), 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", 
IGNORE_FILE_EXTENSION), 800L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", 
extension), 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", 
extension), 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", 
extension), 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", 
IGNORE_FILE_EXTENSION), 200L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", 
extension), 150L, "2"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
100L, "1#path/to/file2.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
1000L, "2#path/to/file5.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
+                  "1#path/to/file1" + extension, typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + 
extension), 100L,
+                  "1#path/to/file2" + extension, typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + 
extension), 1000L,
+                  "2#path/to/file5" + extension, typedProperties);
   }
 
   @ParameterizedTest
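
The checkpoint strings asserted above use the "<commitTime>#<object key>"
format parsed by CloudObjectIncrCheckpoint; a small sketch (values are
illustrative):

    CloudObjectIncrCheckpoint cp =
        CloudObjectIncrCheckpoint.fromString(Option.of("1#path/to/file2.json"));
    // cp.getCommit() -> "1"; cp.getKey() -> "path/to/file2.json"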
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 33faac5361f..90fbeb3bb35 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -87,6 +87,7 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
   private ObjectMapper mapper = new ObjectMapper();
 
   private static final String MY_BUCKET = "some-bucket";
+  private static final String IGNORE_FILE_EXTENSION = ".ignore";
 
   private Option<SchemaProvider> schemaProvider;
   @Mock
@@ -308,11 +309,14 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     }
 
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
-    // Add file paths and sizes to the list
+    // Add file paths and sizes to the list.
+    // Include a couple of files with an invalid extension to ensure they are 
filtered out.
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", 
extension), 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", 
IGNORE_FILE_EXTENSION), 800L, "1"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", 
extension), 200L, "1"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", 
extension), 150L, "1"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", 
extension), 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", 
IGNORE_FILE_EXTENSION), 200L, "2"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", 
extension), 150L, "2"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
