xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927845153
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastCkpt
return Pair.of(Option.empty(),
queryTypeAndInstantEndpts.getRight().getRight());
}
- String filter = "s3.object.size > 0";
- if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX,
null))) {
- filter = filter + " and s3.object.key like '" +
props.getString(Config.S3_KEY_PREFIX) + "%'";
- }
- if
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX,
null))) {
- filter = filter + " and s3.object.key not like '" +
props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
- }
- if
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING,
null))) {
- filter = filter + " and s3.object.key not like '%" +
props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
- }
- // add file format filtering by default
- filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+ Column filterColumn = s3EventsColumnFilter(fileFormat);
String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
String s3Prefix = s3FS + "://";
// Extract distinct file keys from s3 meta hoodie table
- final List<Row> cloudMetaDf = source
- .filter(filter)
- .select("s3.bucket.name", "s3.object.key")
- .distinct()
- .collectAsList();
// Create S3 paths
final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK,
Config.DEFAULT_ENABLE_EXISTS_CHECK);
- List<String> cloudFiles = new ArrayList<>();
- for (Row row : cloudMetaDf) {
- // construct file path, row index 0 refers to bucket and 1 refers to key
- String bucket = row.getString(0);
- String filePath = s3Prefix + bucket + "/" + row.getString(1);
- if (checkExists) {
- FileSystem fs = FSUtils.getFs(s3Prefix + bucket,
sparkSession.sparkContext().hadoopConfiguration());
- try {
- if (fs.exists(new Path(filePath))) {
- cloudFiles.add(filePath);
- }
- } catch (IOException e) {
- LOG.error(String.format("Error while checking path exists for %s ",
filePath), e);
- }
- } else {
- cloudFiles.add(filePath);
- }
- }
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(sparkContext.hadoopConfiguration());
+ List<String> cloudFiles = source
+ .filter(filterColumn)
+ .select("s3.bucket.name", "s3.object.key")
+ .distinct()
+ .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
Review Comment:
Why convert to an RDD here? You should be able to call `mapPartitions` directly on the Dataset, which avoids the extra `rdd().toJavaRDD()` conversion.
--
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and follow the
URL above to reach the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]