This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d6f656e5bb [HUDI-5927] Improve parallelism of deleting invalid files (#8172)
0d6f656e5bb is described below
commit 0d6f656e5bb8a8acf75e7d7e8c3485e3748e06be
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 14 11:57:45 2023 -0700
[HUDI-5927] Improve parallelism of deleting invalid files (#8172)
This commit improves the parallelism of deleting invalid files when
finalizing the write, so that the file deletion is parallelized at the file
level instead of the partition level.
---
.../java/org/apache/hudi/table/HoodieTable.java | 32 ++++++++++------------
1 file changed, 15 insertions(+), 17 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8b1056bca6c..9800cf268ac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -94,6 +94,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -660,23 +661,20 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
private void deleteInvalidFilesByPartitions(HoodieEngineContext context,
Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
// Now delete partially written files
context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
- context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
- final FileSystem fileSystem = metaClient.getFs();
- LOG.info("Deleting invalid data files=" + partitionWithFileList);
- if (partitionWithFileList.isEmpty()) {
- return true;
- }
- // Delete
- partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
- try {
- fileSystem.delete(new Path(file), false);
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- });
-
- return true;
- }, config.getFinalizeWriteParallelism());
+ context.map(invalidFilesByPartition.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ partitionFilePair -> {
+ final FileSystem fileSystem = metaClient.getFs();
+ LOG.info("Deleting invalid data file=" + partitionFilePair);
+ // Delete
+ try {
+ fileSystem.delete(new Path(partitionFilePair.getValue()), false);
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ return true;
+ }, config.getFinalizeWriteParallelism());
}
/**