xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927831833


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
     * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";

Review Comment:
   Ditto, the same naming rules apply here; see the sketch below for what that could look like.
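   For illustration only, applying the same rule here would give something like the following (the exact constant name is my guess and up for discussion):
   
   ```suggestion
       static final String S3INCR_SOURCE_PARTITION_EXISTS_OPTIONS = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
   ```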



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
+                try {
+                  if (fs.exists(new Path(decodeUrl))) {

Review Comment:
   Creating a Hadoop `Path` carries much more memory overhead than normal object instantiation. If it's only needed for the existence check, let's find a better way.
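   A minimal sketch of one possible direction (my own illustration, not necessarily the fix the reviewer has in mind): cache one `FileSystem` per bucket instead of resolving it per row, so the `Path` for the `exists()` call is the only per-row allocation left. All names below are hypothetical.
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   class ExistsCheckSketch {
     // Hypothetical helper: bucketAndKeys holds {bucket, key} pairs collected upstream.
     static List<String> keepExisting(List<String[]> bucketAndKeys, String s3Prefix,
                                      Configuration conf) throws IOException {
       Map<String, FileSystem> fsCache = new HashMap<>(); // one FileSystem per bucket, not per row
       List<String> existing = new ArrayList<>();
       for (String[] bk : bucketAndKeys) {
         String filePath = s3Prefix + bk[0] + "/" + bk[1];
         FileSystem fs = fsCache.computeIfAbsent(bk[0], b -> {
           try {
             return new Path(s3Prefix + b).getFileSystem(conf);
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
         });
         // exists() still requires a Path, but it is now the only per-row allocation.
         if (fs.exists(new Path(filePath))) {
           existing.add(filePath);
         }
       }
       return existing;
     }
   }
   ```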



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
     * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

Review Comment:
   
   ```suggestion
       static final String S3INCR_FILE_EXTENSIONS_OPTIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
   ```
   
   The constant name should align with the actual config key, and carry the `OPTIONS` suffix, since it names a key rather than the extensions themselves.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning exists");

Review Comment:
   Multi-level partitioning is very common, so isn't this a major limitation? If we push this out as-is, how would it affect existing users? See the sketch below for one way nested partitions could be handled.


