leaves12138 commented on code in PR #2858:
URL: https://github.com/apache/paimon/pull/2858#discussion_r1601373535


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java:
##########
@@ -39,65 +39,53 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
-/** It is responsible for monitoring compactor source in batch mode. */
-public class MultiTablesBatchCompactorSourceFunction extends 
MultiTablesCompactorSourceFunction {
+import static 
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
+import static 
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY;
 
-    private static final Logger LOG =
-            
LoggerFactory.getLogger(MultiTablesBatchCompactorSourceFunction.class);
+/** It is responsible for monitoring compactor source of aware bucket table in 
batch mode. */
+public class CombinedAwareBatchSourceFunction
+        extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(CombinedAwareBatchSourceFunction.class);
 
-    public MultiTablesBatchCompactorSourceFunction(
+    private MultiTableScanBase<Tuple2<Split, String>> tableScanLogic;
+
+    public CombinedAwareBatchSourceFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
-            Pattern databasePattern,
-            long monitorInterval) {
-        super(
-                catalogLoader,
-                includingPattern,
-                excludingPattern,
-                databasePattern,
-                false,
-                monitorInterval);
+            Pattern databasePattern) {
+        super(catalogLoader, includingPattern, excludingPattern, 
databasePattern, false);
     }
 
     @Override
-    public void run(SourceContext<Tuple2<Split, String>> ctx) throws Exception 
{
-        this.ctx = ctx;
-        if (isRunning) {
-            boolean isEmpty;
-            synchronized (ctx.getCheckpointLock()) {
-                if (!isRunning) {
-                    return;
-                }
-                try {
-                    // batch mode do not need check for new tables
-                    List<Tuple2<Split, String>> splits = new ArrayList<>();
-                    for (Map.Entry<Identifier, StreamTableScan> entry : 
scansMap.entrySet()) {
-                        Identifier identifier = entry.getKey();
-                        StreamTableScan scan = entry.getValue();
-                        splits.addAll(
-                                scan.plan().splits().stream()
-                                        .map(split -> new Tuple2<>(split, 
identifier.getFullName()))
-                                        .collect(Collectors.toList()));
-                    }
-
-                    isEmpty = splits.isEmpty();
-                    splits.forEach(ctx::collect);
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        tableScanLogic =

Review Comment:
   Nit: could the field `tableScanLogic` be renamed to just `tableScan`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to