lokesh-lingarajan-0310 commented on code in PR #9433:
URL: https://github.com/apache/hudi/pull/9433#discussion_r1294712040
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -157,26 +157,24 @@ public Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastChec
}
Dataset<Row> source = queryRunner.run(queryInfo);
- if (source.isEmpty()) {
- LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
- + queryInfo.getEndInstant());
- return Pair.of(Option.empty(), queryInfo.getEndInstant());
- }
-
Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset
=
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
filteredSourceData, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
+ if (!checkPointAndDataset.getRight().isPresent()) {
+ LOG.info("Empty source, returning endpoint:" +
queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
+ }
LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX,
true).toLowerCase();
String s3Prefix = s3FS + "://";
// Create S3 paths
SerializableConfiguration serializableHadoopConf = new
SerializableConfiguration(sparkContext.hadoopConfiguration());
- List<CloudObjectMetadata> cloudObjectMetadata =
checkPointAndDataset.getRight()
+ List<CloudObjectMetadata> cloudObjectMetadata =
checkPointAndDataset.getRight().get()
Review Comment:
We are already doing that on line 166.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]