leaves12138 commented on code in PR #2858:
URL: https://github.com/apache/paimon/pull/2858#discussion_r1601375417
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java:
##########
@@ -19,88 +19,68 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
+import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamTableScan;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.table.data.RowData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-/** It is responsible for monitoring compactor source in streaming mode. */
-public class MultiTablesStreamingCompactorSourceFunction
- extends MultiTablesCompactorSourceFunction {
+import static
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
+import static
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY;
- private static final Logger LOG =
-
LoggerFactory.getLogger(MultiTablesStreamingCompactorSourceFunction.class);
+/** It is responsible for monitoring compactor source of multi bucket table in
stream mode. */
+public class CombinedAwareStreamingSourceFunction
+ extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
- public MultiTablesStreamingCompactorSourceFunction(
+ private final long monitorInterval;
+ private transient MultiTableScanBase<Tuple2<Split, String>> tableScanLogic;
+
+ public CombinedAwareStreamingSourceFunction(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
long monitorInterval) {
- super(
- catalogLoader,
- includingPattern,
- excludingPattern,
- databasePattern,
- true,
- monitorInterval);
+ super(catalogLoader, includingPattern, excludingPattern,
databasePattern, true);
+ this.monitorInterval = monitorInterval;
}
- @SuppressWarnings("BusyWait")
@Override
- public void run(SourceContext<Tuple2<Split, String>> ctx) throws Exception
{
- this.ctx = ctx;
- while (isRunning) {
- boolean isEmpty;
- synchronized (ctx.getCheckpointLock()) {
- if (!isRunning) {
- return;
- }
- try {
- // check for new tables
- updateTableMap();
-
- List<Tuple2<Split, String>> splits = new ArrayList<>();
- for (Map.Entry<Identifier, StreamTableScan> entry :
scansMap.entrySet()) {
- Identifier identifier = entry.getKey();
- StreamTableScan scan = entry.getValue();
- splits.addAll(
- scan.plan().splits().stream()
- .map(split -> new Tuple2<>(split,
identifier.getFullName()))
- .collect(Collectors.toList()));
- }
-
- isEmpty = splits.isEmpty();
- splits.forEach(ctx::collect);
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ tableScanLogic =
Review Comment:
Nit: please rename `tableScanLogic` to just `tableScan`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]