nsivabalan commented on a change in pull request #4838:
URL: https://github.com/apache/hudi/pull/4838#discussion_r808974095



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
##########
@@ -106,8 +110,8 @@ public S3EventsHoodieIncrSource(
             sparkContext, srcPath, numInstantsPerFetch, beginInstant, 
missingCheckpointStrategy);
 
     if 
(queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue()))
 {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + 
queryTypeAndInstantEndpts.getKey());
-      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey());
+      LOG.warn("Already caught up. Begin Checkpoint was :" + 
queryTypeAndInstantEndpts.getValue().getKey());

Review comment:
       probably we can make this a separate PR and add tests. I can take it up 
if need be. Can you remove that from this PR.  

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
##########
@@ -174,7 +178,27 @@ public S3EventsHoodieIncrSource(
     }
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
+      Log.info("Number of files to be processed=" + cloudFiles.size());
       dataset = 
Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new 
String[0])));
+      if (props.containsKey(HoodieIncrSource.Config.DROP_COLUMNS)) {
+        String dropColumns = 
props.getString(HoodieIncrSource.Config.DROP_COLUMNS);
+        dataset = dataset.map(ds -> ds.drop(dropColumns.split(",")));
+      }
+      if (props.containsKey(HoodieIncrSource.Config.CAST_TO_STRING)) {

Review comment:
       in general this should go in as a general feature support to any 
detlastreamer sources. Lets think about what it takes to add such feature. If 
its not feasible, we can add it just incr source. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to