yuzelin commented on code in PR #6551:
URL: https://github.com/apache/paimon/pull/6551#discussion_r2523146157


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/CombinedFlinkOrphanFilesClean.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.orphan;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.flink.utils.BoundedTwoInputOperator;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
+import org.apache.paimon.operation.OrphanFilesClean;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink {@link OrphanFilesClean}, it will submit a job for multiple tables in a combined
+ * DataStream.
+ */
+public class CombinedFlinkOrphanFilesClean<T extends FlinkOrphanFilesClean>
+        extends FlinkOrphanFilesClean {
+
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(CombinedFlinkOrphanFilesClean.class);
+
+    protected String databaseName;
+    protected List<T> cleaners;
+    // Map to store cleaners by their full identifier name, for quick lookup in ProcessFunctions
+    protected Map<String, T> cleanerMap;
+
+    public CombinedFlinkOrphanFilesClean(
+            String databaseName,
+            List<T> cleaners,
+            long olderThanMillis,
+            boolean dryRun,
+            @Nullable Integer parallelism) {
+        super(cleaners.get(0).getTable(), olderThanMillis, dryRun, 
parallelism);
+        this.databaseName = databaseName;
+        this.cleaners = cleaners;
+        // Initialize cleanerMap for quick lookup
+        this.cleanerMap = new HashMap<>();
+        for (T cleaner : cleaners) {
+            FileStoreTable table = cleaner.getTable();
+            Identifier id = table.catalogEnvironment().identifier();
+            if (id != null) {
+                this.cleanerMap.put(id.getFullName(), cleaner);
+            }
+        }
+    }
+
+    protected DataStream<CleanOrphanFilesResult> 
buildBranchSnapshotDirDeletedStream(
+            StreamExecutionEnvironment env, List<BranchTableInfo> 
branchTableInfos) {
+        return env.fromCollection(branchTableInfos)
+                .process(
+                        new ProcessFunction<BranchTableInfo, Tuple2<Long, 
Long>>() {
+
+                            @Override
+                            public void processElement(
+                                    BranchTableInfo branchTableInfo,
+                                    ProcessFunction<BranchTableInfo, 
Tuple2<Long, Long>>.Context
+                                            ctx,
+                                    Collector<Tuple2<Long, Long>> out) {
+                                T cleaner = 
getCleanerForTable(branchTableInfo);
+                                cleaner.processForBranchSnapshotDirDeleted(
+                                        branchTableInfo.getBranch(), out);
+                            }
+                        })
+                .keyBy(tuple -> 1)
+                .reduce(
+                        (ReduceFunction<Tuple2<Long, Long>>)
+                                (value1, value2) ->
+                                        new Tuple2<>(value1.f0 + value2.f0, 
value1.f1 + value2.f1))
+                .setParallelism(1)
+                .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
+    }
+
+    @Nullable
+    @Override
+    public DataStream<CleanOrphanFilesResult> 
doOrphanClean(StreamExecutionEnvironment env) {
+        configureFlinkEnvironment(env);
+        LOG.info("Starting orphan files clean for {} tables: {}", 
cleaners.size(), cleaners);
+        List<BranchTableInfo> branchTableInfos = new ArrayList<>();
+        long start = System.currentTimeMillis();
+        for (T cleaner : cleaners) {
+            FileStoreTable table = cleaner.getTable();
+            Identifier identifier = table.catalogEnvironment().identifier();
+            if (identifier == null) {

Review Comment:
   Why not pass identifier from ActionFactory? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to