xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927831833
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
* - --hoodie-conf
hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
*/
static final String SPARK_DATASOURCE_OPTIONS =
"hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+ // ToDo make it a list of extensions
+ static final String S3_ACTUAL_FILE_EXTENSIONS =
"hoodie.deltastreamer.source.s3incr.file.extensions";
+
+ static final String ATTACH_SOURCE_PARTITION_COLUMN =
"hoodie.deltastreamer.source.s3incr.source.partition.exists";
Review Comment:
Ditto: the same naming rules apply here.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastCkpt
return Pair.of(Option.empty(),
queryTypeAndInstantEndpts.getRight().getRight());
}
- String filter = "s3.object.size > 0";
- if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX,
null))) {
- filter = filter + " and s3.object.key like '" +
props.getString(Config.S3_KEY_PREFIX) + "%'";
- }
- if
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX,
null))) {
- filter = filter + " and s3.object.key not like '" +
props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
- }
- if
(!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING,
null))) {
- filter = filter + " and s3.object.key not like '%" +
props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
- }
- // add file format filtering by default
- filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+ Column filterColumn = s3EventsColumnFilter(fileFormat);
String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
String s3Prefix = s3FS + "://";
// Extract distinct file keys from s3 meta hoodie table
- final List<Row> cloudMetaDf = source
- .filter(filter)
- .select("s3.bucket.name", "s3.object.key")
- .distinct()
- .collectAsList();
// Create S3 paths
final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK,
Config.DEFAULT_ENABLE_EXISTS_CHECK);
- List<String> cloudFiles = new ArrayList<>();
- for (Row row : cloudMetaDf) {
- // construct file path, row index 0 refers to bucket and 1 refers to key
- String bucket = row.getString(0);
- String filePath = s3Prefix + bucket + "/" + row.getString(1);
- if (checkExists) {
- FileSystem fs = FSUtils.getFs(s3Prefix + bucket,
sparkSession.sparkContext().hadoopConfiguration());
- try {
- if (fs.exists(new Path(filePath))) {
- cloudFiles.add(filePath);
- }
- } catch (IOException e) {
- LOG.error(String.format("Error while checking path exists for %s ",
filePath), e);
- }
- } else {
- cloudFiles.add(filePath);
- }
- }
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(sparkContext.hadoopConfiguration());
+ List<String> cloudFiles = source
+ .filter(filterColumn)
+ .select("s3.bucket.name", "s3.object.key")
+ .distinct()
+ .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+ List<String> cloudFilesPerPartition = new ArrayList<>();
+ fileListIterator.forEachRemaining(row -> {
+ final Configuration configuration =
serializableConfiguration.newCopy();
+ String bucket = row.getString(0);
+ String filePath = s3Prefix + bucket + "/" + row.getString(1);
+ try {
+ String decodeUrl = URLDecoder.decode(filePath,
StandardCharsets.UTF_8.name());
+ if (checkExists) {
+ FileSystem fs = FSUtils.getFs(s3Prefix + bucket,
configuration);
+ try {
+ if (fs.exists(new Path(decodeUrl))) {
Review Comment:
Creating a Hadoop Path incurs much more memory overhead than a normal
instantiation. If it is only needed for the existence check, let's find a better way.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
* - --hoodie-conf
hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
*/
static final String SPARK_DATASOURCE_OPTIONS =
"hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+ // ToDo make it a list of extensions
+ static final String S3_ACTUAL_FILE_EXTENSIONS =
"hoodie.deltastreamer.source.s3incr.file.extensions";
Review Comment:
```suggestion
static final String S3INCR_FILE_EXTENSIONS_OPTIONS =
"hoodie.deltastreamer.source.s3incr.file.extensions";
```
The constant name should align with the actual key and carry the `OPTIONS`
suffix, since it holds a config key, not the extensions themselves.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String
fileFormat) {
return dataFrameReader;
}
+ private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+ if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN,
Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+ &&
!StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
{
+ String partitionKey =
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+ String partitionPathPattern = String.format("%s=",partitionKey);
+ String filePath = cloudFiles.get(0);
+ List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+ .filter(level ->
level.contains(partitionPathPattern)).collect(Collectors.toList());
+ if (nestedPartition.size() > 1) {
+ throw new HoodieException("More than one level of partitioning
exists");
Review Comment:
Multi-level partitioning is very common, so is this a major limitation? If we
push this out, how would it affect existing users?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]