This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 9c07e10902 [core] Optimize Flink Orphan Files to do more work in distributed mode (#5257)
9c07e10902 is described below
commit 9c07e10902f2eef665316cd06766bce1daf29f49
Author: jerry <[email protected]>
AuthorDate: Wed Mar 19 15:46:23 2025 +0800
    [core] Optimize Flink Orphan Files to do more work in distributed mode (#5257)
---
.../apache/paimon/operation/OrphanFilesClean.java | 87 ++++++----
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 178 +++++++++++++++------
2 files changed, 190 insertions(+), 75 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index ee2007b52b..e768f87e59 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -132,32 +132,41 @@ public abstract class OrphanFilesClean implements Serializable {
             Consumer<Path> deletedFilesConsumer,
             Consumer<Long> deletedFilesLenInBytesConsumer) {
         for (String branch : branches) {
-            FileStoreTable branchTable = table.switchToBranch(branch);
-            SnapshotManager snapshotManager = branchTable.snapshotManager();
-            ChangelogManager changelogManager = branchTable.changelogManager();
-
-            // specially handle the snapshot directory
-            List<Pair<Path, Long>> nonSnapshotFiles =
-                    tryGetNonSnapshotFiles(snapshotManager.snapshotDirectory(), this::oldEnough);
-            nonSnapshotFiles.forEach(
-                    nonSnapshotFile ->
-                            cleanFile(
-                                    nonSnapshotFile,
-                                    deletedFilesConsumer,
-                                    deletedFilesLenInBytesConsumer));
-
-            // specially handle the changelog directory
-            List<Pair<Path, Long>> nonChangelogFiles =
-                    tryGetNonChangelogFiles(changelogManager.changelogDirectory(), this::oldEnough);
-            nonChangelogFiles.forEach(
-                    nonChangelogFile ->
-                            cleanFile(
-                                    nonChangelogFile,
-                                    deletedFilesConsumer,
-                                    deletedFilesLenInBytesConsumer));
+            cleanBranchSnapshotDir(branch, deletedFilesConsumer, deletedFilesLenInBytesConsumer);
         }
     }
+    protected void cleanBranchSnapshotDir(
+            String branch,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Long> deletedFilesLenInBytesConsumer) {
+        LOG.info("Start cleaning snapshot directory of branch {}.", branch);
+        FileStoreTable branchTable = table.switchToBranch(branch);
+        SnapshotManager snapshotManager = branchTable.snapshotManager();
+        ChangelogManager changelogManager = branchTable.changelogManager();
+
+        // specially handle the snapshot directory
+        List<Pair<Path, Long>> nonSnapshotFiles =
+                tryGetNonSnapshotFiles(snapshotManager.snapshotDirectory(), this::oldEnough);
+        nonSnapshotFiles.forEach(
+                nonSnapshotFile ->
+                        cleanFile(
+                                nonSnapshotFile,
+                                deletedFilesConsumer,
+                                deletedFilesLenInBytesConsumer));
+
+        // specially handle the changelog directory
+        List<Pair<Path, Long>> nonChangelogFiles =
+                tryGetNonChangelogFiles(changelogManager.changelogDirectory(), this::oldEnough);
+        nonChangelogFiles.forEach(
+                nonChangelogFile ->
+                        cleanFile(
+                                nonChangelogFile,
+                                deletedFilesConsumer,
+                                deletedFilesLenInBytesConsumer));
+        LOG.info("Finished cleaning snapshot directory of branch {}.", branch);
+    }
+
    private List<Pair<Path, Long>> tryGetNonSnapshotFiles(
            Path snapshotDirectory, Predicate<FileStatus> fileStatusFilter) {
        return listPathWithFilter(
@@ -323,22 +332,44 @@ public abstract class OrphanFilesClean implements Serializable {
     /** List directories that contain data files and manifest files. */
     protected List<Path> listPaimonFileDirs() {
         FileStorePathFactory pathFactory = table.store().pathFactory();
+        return listPaimonFileDirs(
+                table.fullName(),
+                pathFactory.manifestPath().toString(),
+                pathFactory.indexPath().toString(),
+                pathFactory.statisticsPath().toString(),
+                pathFactory.dataFilePath().toString(),
+                partitionKeysNum,
+                table.store().options().dataFileExternalPaths());
+    }
+
+    protected List<Path> listPaimonFileDirs(
+            String tableName,
+            String manifestPath,
+            String indexPath,
+            String statisticsPath,
+            String dataFilePath,
+            int partitionKeysNum,
+            String dataFileExternalPaths) {
+        LOG.info("Start: listing paimon file directories for table [{}]", tableName);
+        long start = System.currentTimeMillis();
         List<Path> paimonFileDirs = new ArrayList<>();
-        paimonFileDirs.add(pathFactory.manifestPath());
-        paimonFileDirs.add(pathFactory.indexPath());
-        paimonFileDirs.add(pathFactory.statisticsPath());
-        paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum));
+        paimonFileDirs.add(new Path(manifestPath));
+        paimonFileDirs.add(new Path(indexPath));
+        paimonFileDirs.add(new Path(statisticsPath));
+        paimonFileDirs.addAll(listFileDirs(new Path(dataFilePath), partitionKeysNum));

         // add external data paths
-        String dataFileExternalPaths = table.store().options().dataFileExternalPaths();
         if (dataFileExternalPaths != null) {
             String[] externalPathArr = dataFileExternalPaths.split(",");
             for (String externalPath : externalPathArr) {
                 paimonFileDirs.addAll(listFileDirs(new Path(externalPath), partitionKeysNum));
             }
         }
+        LOG.info(
+                "End: listed paimon file directories for table [{}], spent [{}] ms",
+                tableName,
+                System.currentTimeMillis() - start);
         return paimonFileDirs;
     }
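
The point of the new string-only listPaimonFileDirs overload is serializability:
plain strings (rather than Path or FileStorePathFactory objects) can be shipped
into a Flink task, so the directory listing can run on task managers instead of
the client. Below is a minimal, self-contained sketch of that pattern using only
the vanilla Flink DataStream API; the class name, sample path, and the
java.io.File-based listing are hypothetical stand-ins, not Paimon code.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.io.File;
import java.util.Collections;

public class DistributedDirListingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Only plain strings cross the serialization boundary into the task.
        DataStream<Tuple2<String, Long>> files =
                env.fromCollection(Collections.singletonList("/tmp/warehouse/my_table"))
                        .process(
                                new ProcessFunction<String, Tuple2<String, Long>>() {
                                    @Override
                                    public void processElement(
                                            String dir,
                                            Context ctx,
                                            Collector<Tuple2<String, Long>> out) {
                                        // Rebuild the non-serializable handle on the
                                        // task manager, then emit (path, length) pairs.
                                        File[] children = new File(dir).listFiles();
                                        if (children == null) {
                                            return;
                                        }
                                        for (File child : children) {
                                            out.collect(new Tuple2<>(child.getPath(), child.length()));
                                        }
                                    }
                                });

        files.print();
        env.execute("distributed-dir-listing-sketch");
    }
}

The commit applies the same idea below by packing the table's paths into a
Tuple7 of strings before handing them to the Flink job.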
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 39dce07c5e..e678d8f684 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -31,11 +31,14 @@ import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
@@ -48,11 +51,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -70,6 +76,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Flink {@link OrphanFilesClean}; it submits a job for a table. */
public class FlinkOrphanFilesClean extends OrphanFilesClean {
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(FlinkOrphanFilesClean.class);
+
    @Nullable protected final Integer parallelism;

    public FlinkOrphanFilesClean(
@@ -93,17 +101,46 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
        // Flink 1.17 introduced this config, use string to keep compatibility
        flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", "false");
        env.configure(flinkConf);
-
+        LOG.info("Starting orphan files clean for table {}", table.name());
+        long start = System.currentTimeMillis();
         List<String> branches = validBranches();
+        LOG.info(
+                "End: validated branches, spent [{}] ms",
+                System.currentTimeMillis() - start);

         // snapshot and changelog files are the root of everything, so they are handled specially
         // here, and subsequently, we will not count their orphan files.
-        AtomicLong deletedFilesCountInLocal = new AtomicLong(0);
-        AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0);
-        cleanSnapshotDir(
-                branches,
-                path -> deletedFilesCountInLocal.incrementAndGet(),
-                deletedFilesLenInBytesInLocal::addAndGet);
+        DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
+                env.fromCollection(branches)
+                        .process(
+                                new ProcessFunction<String, Tuple2<Long, Long>>() {
+                                    @Override
+                                    public void processElement(
+                                            String branch,
+                                            ProcessFunction<String, Tuple2<Long, Long>>.Context ctx,
+                                            Collector<Tuple2<Long, Long>> out)
+                                            throws Exception {
+                                        AtomicLong deletedFilesCount = new AtomicLong(0);
+                                        AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
+                                        cleanBranchSnapshotDir(
+                                                branch,
+                                                path -> deletedFilesCount.incrementAndGet(),
+                                                deletedFilesLenInBytes::addAndGet);
+                                        out.collect(
+                                                new Tuple2<>(
+                                                        deletedFilesCount.get(),
+                                                        deletedFilesLenInBytes.get()));
+                                    }
+                                })
+                        .keyBy(tuple -> 1)
+                        .reduce(
+                                (ReduceFunction<Tuple2<Long, Long>>)
+                                        (value1, value2) ->
+                                                new Tuple2<>(
+                                                        value1.f0 + value2.f0,
+                                                        value1.f1 + value2.f1))
+                        .setParallelism(1)
+                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
        // branch and manifest file
        final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -202,47 +239,102 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                });
        usedFiles = usedFiles.union(usedManifestFiles);
-
-        List<String> fileDirs =
-                listPaimonFileDirs().stream()
-                        .map(Path::toUri)
-                        .map(Object::toString)
-                        .collect(Collectors.toList());
-        DataStream<Pair<String, Long>> candidates =
-                env.fromCollection(fileDirs)
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        List<Tuple7<String, String, String, String, String, Integer, String>> tablePaths =
+                Arrays.asList(
+                        new Tuple7<>(
+                                table.fullName(),
+                                pathFactory.manifestPath().toString(),
+                                pathFactory.indexPath().toString(),
+                                pathFactory.statisticsPath().toString(),
+                                pathFactory.dataFilePath().toString(),
+                                partitionKeysNum,
+                                table.store().options().dataFileExternalPaths()));
+        DataStream<Tuple2<String, Long>> candidates =
+                env.fromCollection(
+                                tablePaths,
+                                TypeInformation.of(
+                                        new TypeHint<
+                                                Tuple7<String, String, String, String, String, Integer, String>>() {}))
                        .process(
-                                new ProcessFunction<String, Pair<String, Long>>() {
+                                new ProcessFunction<
+                                        Tuple7<String, String, String, String, String, Integer, String>,
+                                        Tuple2<String, Long>>() {
                                    @Override
                                    public void processElement(
-                                            String dir,
-                                            ProcessFunction<String, Pair<String, Long>>.Context ctx,
-                                            Collector<Pair<String, Long>> out) {
-                                        for (FileStatus fileStatus :
-                                                tryBestListingDirs(new Path(dir))) {
-                                            if (oldEnough(fileStatus)) {
-                                                out.collect(
-                                                        Pair.of(
-                                                                fileStatus.getPath().toUri().toString(),
-                                                                fileStatus.getLen()));
+                                            Tuple7<String, String, String, String, String, Integer, String> paths,
+                                            ProcessFunction<
+                                                            Tuple7<String, String, String, String, String, Integer, String>,
+                                                            Tuple2<String, Long>>
+                                                    .Context ctx,
+                                            Collector<Tuple2<String, Long>> out) {
+                                        List<String> dirs =
+                                                listPaimonFileDirs(
+                                                                paths.f0, paths.f1, paths.f2,
+                                                                paths.f3, paths.f4, paths.f5,
+                                                                paths.f6)
+                                                        .stream()
+                                                        .map(Path::toUri)
+                                                        .map(Object::toString)
+                                                        .collect(Collectors.toList());
+                                        for (String dir : dirs) {
+                                            for (FileStatus fileStatus :
+                                                    tryBestListingDirs(new Path(dir))) {
+                                                if (oldEnough(fileStatus)) {
+                                                    out.collect(
+                                                            new Tuple2<>(
+                                                                    fileStatus.getPath().toUri().toString(),
+                                                                    fileStatus.getLen()));
+                                                }
                                            }
                                        }
                                    }
-                                });
+                                })
+                        .setParallelism(1);
        DataStream<CleanOrphanFilesResult> deleted =
                usedFiles
                        .keyBy(f -> f)
                        .connect(
-                                candidates.keyBy(
-                                        pathAndSize -> new Path(pathAndSize.getKey()).getName()))
+                                candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
                        .transform(
                                "files_join",
                                TypeInformation.of(CleanOrphanFilesResult.class),
                                new BoundedTwoInputOperator<
-                                        String, Pair<String, Long>, CleanOrphanFilesResult>() {
+                                        String, Tuple2<String, Long>, CleanOrphanFilesResult>() {

                                    private boolean buildEnd;
                                    private long emittedFilesCount;
@@ -288,28 +380,20 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                    @Override
                                    public void processElement2(
-                                            StreamRecord<Pair<String, Long>> element) {
+                                            StreamRecord<Tuple2<String, Long>> element) {
                                        checkState(buildEnd, "Should build ended.");
-                                        Pair<String, Long> fileInfo = element.getValue();
-                                        String value = fileInfo.getLeft();
+                                        Tuple2<String, Long> fileInfo = element.getValue();
+                                        String value = fileInfo.f0;
                                        Path path = new Path(value);
                                        if (!used.contains(path.getName())) {
                                            emittedFilesCount++;
-                                            emittedFilesLen += fileInfo.getRight();
+                                            emittedFilesLen += fileInfo.f1;
                                            cleanFile(path);
                                            LOG.info("Dry clean: {}", path);
                                        }
                                    }
                                });
-
-        if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
-            deleted =
-                    deleted.union(
-                            env.fromElements(
-                                    new CleanOrphanFilesResult(
-                                            deletedFilesCountInLocal.get(),
-                                            deletedFilesLenInBytesInLocal.get())));
-        }
+        deleted = deleted.union(branchSnapshotDirDeleted);
         return deleted;
     }
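
The branchSnapshotDirDeleted stream above uses a common Flink aggregation idiom:
every per-branch (count, bytes) tuple is routed to a single constant key, a
pairwise reduce sums the fields, and one task emits the global total. Below is a
minimal runnable sketch of just that idiom, detached from Paimon (hypothetical
class name and synthetic numbers); with a bounded source in batch mode the keyed
reduce emits only the final sum, matching the batch execution mode this job
configures.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class GlobalCountSumSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bounded input + batch mode: the keyed reduce emits one final value per key.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Per-branch partial results: (deleted file count, deleted bytes).
        DataStream<Tuple2<Long, Long>> perBranch =
                env.fromCollection(
                        Arrays.asList(
                                Tuple2.of(3L, 1024L),
                                Tuple2.of(1L, 256L),
                                Tuple2.of(5L, 4096L)));

        // Route everything to one constant key so a single reduce task sees all
        // partials, then sum the two fields pairwise into a global total.
        DataStream<Tuple2<Long, Long>> total =
                perBranch
                        .keyBy(tuple -> 1)
                        .reduce(
                                (ReduceFunction<Tuple2<Long, Long>>)
                                        (a, b) -> Tuple2.of(a.f0 + b.f0, a.f1 + b.f1))
                        .setParallelism(1);

        total.print();
        env.execute("global-count-sum-sketch");
    }
}

Funneling through one key serializes only the tiny per-branch summaries, so the
expensive file deletions still run in parallel while the final tally stays exact.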