This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a3a01039e60 [HUDI-7418] Add file extension filter for s3 incr source 
(#10694)
a3a01039e60 is described below

commit a3a01039e60f0b37198626af08a6cd24505b4d08
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Sun Feb 18 23:47:48 2024 -0800

    [HUDI-7418] Add file extension filter for s3 incr source (#10694)
    
    We have support for filtering the input files based on a configurable 
custom extension for the GCS Incr Source, but we don't have the same for 
the S3 Incr Source (which always assumes the file extension is the same as 
the format, which may not always be the case).
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../sources/S3EventsHoodieIncrSource.java          | 10 +++++--
 .../sources/TestS3EventsHoodieIncrSource.java      | 34 +++++++++++++++-------
 2 files changed, 32 insertions(+), 12 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 3af87d49489..4cbec4d2212 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -51,6 +51,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
@@ -210,8 +211,13 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, 
S3_IGNORE_KEY_SUBSTRING, true))) {
       filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + 
getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
     }
-    // add file format filtering by default
-    filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
+    // Match files with a given extension, or use the fileFormat as the 
fallback incase the config is not set.
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, 
CLOUD_DATAFILE_EXTENSION, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + 
getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
+    } else {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + 
"%'";
+    }
+
     return source.filter(filter);
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index e0af8d73e26..33faac5361f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -59,6 +60,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -287,22 +289,31 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
 
   }
 
-  @Test
-  public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+  @ParameterizedTest
+  @ValueSource(strings = {
+      ".json",
+      ".gz"
+  })
+  public void testTwoFilesAndContinueAcrossCommits(String extension) throws 
IOException {
     String commitTimeForWrites = "2";
     String commitTimeForReads = "1";
 
     Pair<String, List<HoodieRecord>> inserts = 
writeS3MetadataRecords(commitTimeForReads);
     inserts = writeS3MetadataRecords(commitTimeForWrites);
 
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    // In the case the extension is explicitly set to something other than the 
file format.
+    if (!extension.endsWith("json")) {
+      
typedProperties.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), 
extension);
+    }
 
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
     // Add file paths and sizes to the list
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", 
extension), 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", 
extension), 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", 
extension), 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", 
extension), 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", 
extension), 150L, "2"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
@@ -310,9 +321,12 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, 
"1#path/to/file1.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
100L, "1#path/to/file2.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
1000L, "2#path/to/file5.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
+                  "1#path/to/file1" + extension, typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + 
extension), 100L,
+                  "1#path/to/file2" + extension, typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + 
extension), 1000L,
+                  "2#path/to/file5" + extension, typedProperties);
   }
 
   @Test

Reply via email to