vamshigv commented on code in PR #6228:
URL: https://github.com/apache/hudi/pull/6228#discussion_r931764557


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, 
StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
configuration);
+                try {

Review Comment:
   Can you please point me to some places elsewhere in hudi where this is done ?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();

Review Comment:
   Good catch! IIUC that should be okay to do.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {

Review Comment:
   mapPartitions on dataset is experimental for spark 2.4 which is something 
Hudi supports actively. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {

Review Comment:
   For spark 2.4 mapPartitions on dataset is experimental where for javardd it 
is not.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();

Review Comment:
   Good catch! After looking closely I think it might not be needed.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, 
StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
configuration);
+                try {
+                  if (fs.exists(new Path(decodeUrl))) {
+                    cloudFilesPerPartition.add(decodeUrl);
+                  }
+                } catch (IOException e) {
+                  LOG.error(String.format("Error while checking path exists 
for %s ", decodeUrl), e);
+                }
+              } else {
+                cloudFilesPerPartition.add(decodeUrl);
+              }
+            } catch (Exception exception) {

Review Comment:
   Can you expand why you think of throwable replacing exception? It looks to 
me all the methods between try catch block fail properly with throwing 
exceptions. Any line or block you want to be cautious against and do error 
handling ?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -172,37 +177,47 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", 
filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = 
serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, 
StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, 
configuration);
+                try {

Review Comment:
   Can you give me some examples of how it is handled elsewhere in Hudi ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to