codope commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927282574


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), 
queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, 
null))) {
-      filter = filter + " and s3.object.key like '" + 
props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if 
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, 
null))) {
-      filter = filter + " and s3.object.key not like '" + 
props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if 
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, 
null))) {
-      filter = filter + " and s3.object.key not like '%" + 
props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, 
StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
configuration);
+                try {
+                  if (fs.exists(new Path(decodeUrl))) {
+                    cloudFilesPerPartition.add(decodeUrl);
+                  }
+                } catch (IOException e) {
+                  LOG.error(String.format("Error while checking path exists 
for %s ", decodeUrl), e);
+                }
+              } else {
+                cloudFilesPerPartition.add(decodeUrl);
+              }
+            } catch (Exception exception) {
+              LOG.warn("Failed to add cloud file ", exception);
+            }
+          });
+          return cloudFilesPerPartition.iterator();
+        }).collect();
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
       DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new 
String[0])));
+      Dataset ds = 
addPartitionColumn(dataFrameReader.load(cloudFiles.toArray(new 
String[0])),cloudFiles);
+      dataset = Option.of(ds);
     }
+    LOG.warn("Extracted distinct files " + cloudFiles.size()

Review Comment:
   I assume this was added for testing. Could you change this log to `debug` level?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf 
hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = 
"hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = 
"hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = 
"hoodie.deltastreamer.source.s3incr.source.partition.exists";
+    static final Boolean DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN = true;

Review Comment:
   Have we fully tested this change? If not, I would suggest keeping the default as `false` for now.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String 
fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, 
Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && 
!StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) 
{
+      String partitionKey = 
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> 
level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning 
exists");
+      }
+      if (nestedPartition.size() == 1) {
+        LOG.info(String.format("adding column name = %s to 
dataset",partitionKey));
+        ds = ds.withColumn(partitionKey, split(split(input_file_name(),
+            partitionPathPattern).getItem(1), "/").getItem(0));
+      }
+    }
+    return ds;
+  }
+
+  private Column s3EventsColumnFilter(String fileFormat) {

Review Comment:
   A minor suggestion: extract methods like this into a separate utility class to keep this class plain and simple. Alternatively, if you prefer to keep these methods in this class for readability, move them to the bottom (i.e., after the call site) so the code reads linearly.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String 
fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, 
Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && 
!StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) 
{
+      String partitionKey = 
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> 
level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning 
exists");

Review Comment:
   Is supporting multiple partition levels planned for the future? If so, let's create a tracking JIRA for it.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf 
hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = 
"hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = 
"hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = 
"hoodie.deltastreamer.source.s3incr.source.partition.exists";

Review Comment:
   ```suggestion
   // Add a comment on the purpose of this config and rename as below
       static final String ADD_SOURCE_PARTITION_COLUMN = 
"hoodie.deltastreamer.source.s3incr.add.source.partition.column";
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String 
fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, 
Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && 
!StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) 
{
+      String partitionKey = 
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];

Review Comment:
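   Should we return early, or log an error/warning, if `partitionKey` is null or empty? Note that the guard above checks `KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()` (the config key name, which is never empty) rather than the configured value, so it does not protect against a missing partition path field.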



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf 
hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = 
"hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = 
"hoodie.deltastreamer.source.s3incr.file.extensions";

Review Comment:
   Is this a list of supported source data file extensions, e.g. .json, .parquet, .avro, etc.?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), 
queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, 
null))) {
-      filter = filter + " and s3.object.key like '" + 
props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if 
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, 
null))) {
-      filter = filter + " and s3.object.key not like '" + 
props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if 
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, 
null))) {
-      filter = filter + " and s3.object.key not like '%" + 
props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();

Review Comment:
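   Why create a copy again? I don't see any config modification happening within the executor, so why not simply pass `serializableConfiguration`? Also, `newCopy()` is called inside `forEachRemaining`, so a new `Configuration` is allocated for every row rather than once per partition.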



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to