XiaoHongbo-Hope commented on code in PR #6958: URL: https://github.com/apache/paimon/pull/6958#discussion_r2678631097
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java:
##########
@@ -235,28 +242,107 @@ public void endInput() throws IOException {
});
usedFiles = usedFiles.union(usedManifestFiles);
- DataStream<Tuple2<String, Long>> candidates =
+
+ final OutputTag<Path> emptyDirOutputTag = new
OutputTag<Path>("empty-dir-output") {};
+ SingleOutputStreamOperator<Tuple2<String, Long>> candidates =
env.fromCollection(Collections.singletonList(1),
TypeInformation.of(Integer.class))
.process(
- new ProcessFunction<Integer, Tuple2<String,
Long>>() {
+ new ProcessFunction<Integer, String>() {
@Override
public void processElement(
Integer i,
- ProcessFunction<Integer,
Tuple2<String, Long>>.Context
+ ProcessFunction<Integer,
String>.Context ctx,
+ Collector<String> out) {
+ FileStorePathFactory pathFactory =
+ table.store().pathFactory();
+ listPaimonFileDirs(
+ table.fullName(),
+
pathFactory.manifestPath().toString(),
+
pathFactory.indexPath().toString(),
+
pathFactory.statisticsPath().toString(),
+
pathFactory.dataFilePath().toString(),
+ partitionKeysNum,
+
table.coreOptions().dataFileExternalPaths())
+ .stream()
+ .map(Path::toUri)
+ .map(Object::toString)
+ .forEach(out::collect);
+ }
+ })
+ .name("list-dirs")
+ .forceNonParallel()
+ .process(
+ new ProcessFunction<String, Tuple2<String,
Long>>() {
+ @Override
+ public void processElement(
+ String dir,
+ ProcessFunction<String,
Tuple2<String, Long>>.Context
ctx,
Collector<Tuple2<String, Long>>
out) {
- listPaimonFilesForTable(out);
+ Path dirPath = new Path(dir);
+ List<FileStatus> files =
tryBestListingDirs(dirPath);
+ for (FileStatus file : files) {
+ if (oldEnough(file)) {
+ out.collect(
+ Tuple2.of(
+
file.getPath().toUri().toString(),
+
file.getLen()));
+ }
+ }
+ if (files.isEmpty()) {
+ ctx.output(emptyDirOutputTag,
dirPath);
+ }
}
})
- .setParallelism(1);
+ .name("collect-candidate-files");
+
+ candidates
+ .getSideOutput(emptyDirOutputTag)
+ .transform(
+ "clean-empty-dirs",
+ STRING_TYPE_INFO,
+ new BoundedOneInputOperator<Path, String>() {
+
+ private Set<Path> emptyDirs = new HashSet<>();
+
+ @Override
+ public void processElement(StreamRecord<Path>
element) {
+ emptyDirs.add(element.getValue());
+ }
+
+ @Override
+ public void endInput() throws IOException {
+ // delete empty dir
+ while (!emptyDirs.isEmpty()) {
+ Set<Path> newEmptyDir = new HashSet<>();
+ for (Path emptyDir : emptyDirs) {
+ try {
+ if (fileIO.delete(emptyDir,
false)) {
+ output.collect(
+ new
StreamRecord<>(emptyDir.toString()));
+ // recursive cleaning
+
newEmptyDir.add(emptyDir.getParent());
+ }
+ } catch (IOException ignored) {
Review Comment:
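We can add a log here, so that the IOException caught while deleting the empty directory is recorded rather than silently ignored.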
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
