This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c07e10902 [core] Optimize Flink Orphan Files to do more works in distributed  (#5257)
9c07e10902 is described below

commit 9c07e10902f2eef665316cd06766bce1daf29f49
Author: jerry <[email protected]>
AuthorDate: Wed Mar 19 15:46:23 2025 +0800

    [core] Optimize Flink Orphan Files to do more works in distributed  (#5257)
---
 .../apache/paimon/operation/OrphanFilesClean.java  |  87 ++++++----
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java | 178 +++++++++++++++------
 2 files changed, 190 insertions(+), 75 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index ee2007b52b..e768f87e59 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -132,32 +132,41 @@ public abstract class OrphanFilesClean implements Serializable {
             Consumer<Path> deletedFilesConsumer,
             Consumer<Long> deletedFilesLenInBytesConsumer) {
         for (String branch : branches) {
-            FileStoreTable branchTable = table.switchToBranch(branch);
-            SnapshotManager snapshotManager = branchTable.snapshotManager();
-            ChangelogManager changelogManager = branchTable.changelogManager();
-
-            // specially handle the snapshot directory
-            List<Pair<Path, Long>> nonSnapshotFiles =
-                    
tryGetNonSnapshotFiles(snapshotManager.snapshotDirectory(), this::oldEnough);
-            nonSnapshotFiles.forEach(
-                    nonSnapshotFile ->
-                            cleanFile(
-                                    nonSnapshotFile,
-                                    deletedFilesConsumer,
-                                    deletedFilesLenInBytesConsumer));
-
-            // specially handle the changelog directory
-            List<Pair<Path, Long>> nonChangelogFiles =
-                    
tryGetNonChangelogFiles(changelogManager.changelogDirectory(), this::oldEnough);
-            nonChangelogFiles.forEach(
-                    nonChangelogFile ->
-                            cleanFile(
-                                    nonChangelogFile,
-                                    deletedFilesConsumer,
-                                    deletedFilesLenInBytesConsumer));
+            cleanBranchSnapshotDir(branch, deletedFilesConsumer, 
deletedFilesLenInBytesConsumer);
         }
     }
 
+    protected void cleanBranchSnapshotDir(
+            String branch,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Long> deletedFilesLenInBytesConsumer) {
+        LOG.info("Start to clean snapshot directory of branch {}.", branch);
+        FileStoreTable branchTable = table.switchToBranch(branch);
+        SnapshotManager snapshotManager = branchTable.snapshotManager();
+        ChangelogManager changelogManager = branchTable.changelogManager();
+
+        // specially handle the snapshot directory
+        List<Pair<Path, Long>> nonSnapshotFiles =
+                tryGetNonSnapshotFiles(snapshotManager.snapshotDirectory(), 
this::oldEnough);
+        nonSnapshotFiles.forEach(
+                nonSnapshotFile ->
+                        cleanFile(
+                                nonSnapshotFile,
+                                deletedFilesConsumer,
+                                deletedFilesLenInBytesConsumer));
+
+        // specially handle the changelog directory
+        List<Pair<Path, Long>> nonChangelogFiles =
+                tryGetNonChangelogFiles(changelogManager.changelogDirectory(), 
this::oldEnough);
+        nonChangelogFiles.forEach(
+                nonChangelogFile ->
+                        cleanFile(
+                                nonChangelogFile,
+                                deletedFilesConsumer,
+                                deletedFilesLenInBytesConsumer));
+        LOG.info("End to clean snapshot directory of branch {}.", branch);
+    }
+
     private List<Pair<Path, Long>> tryGetNonSnapshotFiles(
             Path snapshotDirectory, Predicate<FileStatus> fileStatusFilter) {
         return listPathWithFilter(
@@ -323,22 +332,44 @@ public abstract class OrphanFilesClean implements Serializable {
     /** List directories that contains data files and manifest files. */
     protected List<Path> listPaimonFileDirs() {
         FileStorePathFactory pathFactory = table.store().pathFactory();
+        return listPaimonFileDirs(
+                table.fullName(),
+                pathFactory.manifestPath().toString(),
+                pathFactory.indexPath().toString(),
+                pathFactory.statisticsPath().toString(),
+                pathFactory.dataFilePath().toString(),
+                partitionKeysNum,
+                table.store().options().dataFileExternalPaths());
+    }
 
+    protected List<Path> listPaimonFileDirs(
+            String tableName,
+            String manifestPath,
+            String indexPath,
+            String statisticsPath,
+            String dataFilePath,
+            int partitionKeysNum,
+            String dataFileExternalPaths) {
+        LOG.info("Start: listing paimon file directories for table [{}]", 
tableName);
+        long start = System.currentTimeMillis();
         List<Path> paimonFileDirs = new ArrayList<>();
 
-        paimonFileDirs.add(pathFactory.manifestPath());
-        paimonFileDirs.add(pathFactory.indexPath());
-        paimonFileDirs.add(pathFactory.statisticsPath());
-        paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), 
partitionKeysNum));
+        paimonFileDirs.add(new Path(manifestPath));
+        paimonFileDirs.add(new Path(indexPath));
+        paimonFileDirs.add(new Path(statisticsPath));
+        paimonFileDirs.addAll(listFileDirs(new Path(dataFilePath), 
partitionKeysNum));
 
         // add external data paths
-        String dataFileExternalPaths = 
table.store().options().dataFileExternalPaths();
         if (dataFileExternalPaths != null) {
             String[] externalPathArr = dataFileExternalPaths.split(",");
             for (String externalPath : externalPathArr) {
                 paimonFileDirs.addAll(listFileDirs(new Path(externalPath), 
partitionKeysNum));
             }
         }
+        LOG.info(
+                "End list paimon file directories for table [{}] spend [{}] 
ms",
+                tableName,
+                System.currentTimeMillis() - start);
         return paimonFileDirs;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 39dce07c5e..e678d8f684 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -31,11 +31,14 @@ import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.FileStorePathFactory;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ExecutionOptions;
@@ -48,11 +51,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -70,6 +76,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Flink {@link OrphanFilesClean}, it will submit a job for a table. */
 public class FlinkOrphanFilesClean extends OrphanFilesClean {
 
+    protected static final Logger LOG = 
LoggerFactory.getLogger(FlinkOrphanFilesClean.class);
+
     @Nullable protected final Integer parallelism;
 
     public FlinkOrphanFilesClean(
@@ -93,17 +101,46 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
         // Flink 1.17 introduced this config, use string to keep compatibility
         
flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", 
"false");
         env.configure(flinkConf);
-
+        LOG.info("Starting orphan files clean for table {}", table.name());
+        long start = System.currentTimeMillis();
         List<String> branches = validBranches();
+        LOG.info(
+                "End orphan files validBranches: spend [{}] ms",
+                System.currentTimeMillis() - start);
 
         // snapshot and changelog files are the root of everything, so they 
are handled specially
         // here, and subsequently, we will not count their orphan files.
-        AtomicLong deletedFilesCountInLocal = new AtomicLong(0);
-        AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0);
-        cleanSnapshotDir(
-                branches,
-                path -> deletedFilesCountInLocal.incrementAndGet(),
-                deletedFilesLenInBytesInLocal::addAndGet);
+        DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
+                env.fromCollection(branches)
+                        .process(
+                                new ProcessFunction<String, Tuple2<Long, 
Long>>() {
+                                    @Override
+                                    public void processElement(
+                                            String branch,
+                                            ProcessFunction<String, 
Tuple2<Long, Long>>.Context ctx,
+                                            Collector<Tuple2<Long, Long>> out)
+                                            throws Exception {
+                                        AtomicLong deletedFilesCount = new 
AtomicLong(0);
+                                        AtomicLong deletedFilesLenInBytes = 
new AtomicLong(0);
+                                        cleanBranchSnapshotDir(
+                                                branch,
+                                                path -> 
deletedFilesCount.incrementAndGet(),
+                                                
deletedFilesLenInBytes::addAndGet);
+                                        out.collect(
+                                                new Tuple2<>(
+                                                        
deletedFilesCount.get(),
+                                                        
deletedFilesLenInBytes.get()));
+                                    }
+                                })
+                        .keyBy(tuple -> 1)
+                        .reduce(
+                                (ReduceFunction<Tuple2<Long, Long>>)
+                                        (value1, value2) ->
+                                                new Tuple2<>(
+                                                        value1.f0 + value2.f0,
+                                                        value1.f1 + value2.f1))
+                        .setParallelism(1)
+                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, 
tuple.f1));
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -202,47 +239,102 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                 });
 
         usedFiles = usedFiles.union(usedManifestFiles);
-
-        List<String> fileDirs =
-                listPaimonFileDirs().stream()
-                        .map(Path::toUri)
-                        .map(Object::toString)
-                        .collect(Collectors.toList());
-        DataStream<Pair<String, Long>> candidates =
-                env.fromCollection(fileDirs)
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        List<Tuple7<String, String, String, String, String, Integer, String>> 
tablePaths =
+                Arrays.asList(
+                        new Tuple7<>(
+                                table.fullName(),
+                                pathFactory.manifestPath().toString(),
+                                pathFactory.indexPath().toString(),
+                                pathFactory.statisticsPath().toString(),
+                                pathFactory.dataFilePath().toString(),
+                                partitionKeysNum,
+                                
table.store().options().dataFileExternalPaths()));
+        DataStream<Tuple2<String, Long>> candidates =
+                env.fromCollection(
+                                tablePaths,
+                                TypeInformation.of(
+                                        new TypeHint<
+                                                Tuple7<
+                                                        String,
+                                                        String,
+                                                        String,
+                                                        String,
+                                                        String,
+                                                        Integer,
+                                                        String>>() {}))
                         .process(
-                                new ProcessFunction<String, Pair<String, 
Long>>() {
+                                new ProcessFunction<
+                                        Tuple7<
+                                                String,
+                                                String,
+                                                String,
+                                                String,
+                                                String,
+                                                Integer,
+                                                String>,
+                                        Tuple2<String, Long>>() {
                                     @Override
                                     public void processElement(
-                                            String dir,
-                                            ProcessFunction<String, 
Pair<String, Long>>.Context ctx,
-                                            Collector<Pair<String, Long>> out) 
{
-                                        for (FileStatus fileStatus :
-                                                tryBestListingDirs(new 
Path(dir))) {
-                                            if (oldEnough(fileStatus)) {
-                                                out.collect(
-                                                        Pair.of(
-                                                                fileStatus
-                                                                        
.getPath()
-                                                                        
.toUri()
-                                                                        
.toString(),
-                                                                
fileStatus.getLen()));
+                                            Tuple7<
+                                                            String,
+                                                            String,
+                                                            String,
+                                                            String,
+                                                            String,
+                                                            Integer,
+                                                            String>
+                                                    paths,
+                                            ProcessFunction<
+                                                                    Tuple7<
+                                                                            
String,
+                                                                            
String,
+                                                                            
String,
+                                                                            
String,
+                                                                            
String,
+                                                                            
Integer,
+                                                                            
String>,
+                                                                    
Tuple2<String, Long>>
+                                                            .Context
+                                                    ctx,
+                                            Collector<Tuple2<String, Long>> 
out) {
+                                        List<String> dirs =
+                                                listPaimonFileDirs(
+                                                                paths.f0, 
paths.f1, paths.f2,
+                                                                paths.f3, 
paths.f4, paths.f5,
+                                                                paths.f6)
+                                                        .stream()
+                                                        .map(Path::toUri)
+                                                        .map(Object::toString)
+                                                        
.collect(Collectors.toList());
+                                        for (String dir : dirs) {
+                                            for (FileStatus fileStatus :
+                                                    tryBestListingDirs(new 
Path(dir))) {
+                                                if (oldEnough(fileStatus)) {
+                                                    out.collect(
+                                                            new Tuple2(
+                                                                    fileStatus
+                                                                            
.getPath()
+                                                                            
.toUri()
+                                                                            
.toString(),
+                                                                    
fileStatus.getLen()));
+                                                }
                                             }
                                         }
                                     }
-                                });
+                                })
+                        .setParallelism(1);
 
         DataStream<CleanOrphanFilesResult> deleted =
                 usedFiles
                         .keyBy(f -> f)
                         .connect(
-                                candidates.keyBy(
-                                        pathAndSize -> new 
Path(pathAndSize.getKey()).getName()))
+                                candidates.keyBy(pathAndSize -> new 
Path(pathAndSize.f0).getName()))
                         .transform(
                                 "files_join",
                                 
TypeInformation.of(CleanOrphanFilesResult.class),
                                 new BoundedTwoInputOperator<
-                                        String, Pair<String, Long>, 
CleanOrphanFilesResult>() {
+                                        String, Tuple2<String, Long>, 
CleanOrphanFilesResult>() {
 
                                     private boolean buildEnd;
                                     private long emittedFilesCount;
@@ -288,28 +380,20 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
 
                                     @Override
                                     public void processElement2(
-                                            StreamRecord<Pair<String, Long>> 
element) {
+                                            StreamRecord<Tuple2<String, Long>> 
element) {
                                         checkState(buildEnd, "Should build 
ended.");
-                                        Pair<String, Long> fileInfo = 
element.getValue();
-                                        String value = fileInfo.getLeft();
+                                        Tuple2<String, Long> fileInfo = 
element.getValue();
+                                        String value = fileInfo.f0;
                                         Path path = new Path(value);
                                         if (!used.contains(path.getName())) {
                                             emittedFilesCount++;
-                                            emittedFilesLen += 
fileInfo.getRight();
+                                            emittedFilesLen += fileInfo.f1;
                                             cleanFile(path);
                                             LOG.info("Dry clean: {}", path);
                                         }
                                     }
                                 });
-
-        if (deletedFilesCountInLocal.get() != 0 || 
deletedFilesLenInBytesInLocal.get() != 0) {
-            deleted =
-                    deleted.union(
-                            env.fromElements(
-                                    new CleanOrphanFilesResult(
-                                            deletedFilesCountInLocal.get(),
-                                            
deletedFilesLenInBytesInLocal.get())));
-        }
+        deleted = deleted.union(branchSnapshotDirDeleted);
 
         return deleted;
     }

Reply via email to