lvyanquan commented on code in PR #6606:
URL: https://github.com/apache/paimon/pull/6606#discussion_r2554556784


##########
docs/layouts/shortcodes/generated/cdc_configuration.html:
##########
@@ -0,0 +1,48 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>database</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Name of the database to be scanned. By default, all databases 
will be scanned.</td>
+        </tr>
+        <tr>
+            <td><h5>table</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Name of the table to be scanned. By default, all tables will 
be scanned.</td>
+        </tr>
+        <tr>
+            <td><h5>table.discovery-interval</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The discovery interval of new tables. Only effective when 
database or table is not set.</td>
+        </tr>

Review Comment:
   Missing documentation for parameters with the prefix `catalog.properties`.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java:
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc.source.enumerator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
+import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.DATABASE;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE;
+import static 
org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE_DISCOVERY_INTERVAL;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.toCDCOption;
+
+/** {@link SplitEnumerator} for CDC source. */
+public class CDCSourceEnumerator
+        implements SplitEnumerator<TableAwareFileStoreSourceSplit, 
CDCCheckpoint> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CDCSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context;
+    private final long discoveryInterval;
+    private final long tableDiscoveryInterval;
+    private long lastTableDiscoveryTime = Long.MAX_VALUE;
+    private final AtomicInteger currentTableIndex = new AtomicInteger(0);
+    private final String database;
+    @Nullable private final String table;
+    private final Catalog catalog;
+
+    private final int splitMaxNum;
+    private final CDCSplitAssigner splitAssigner;
+    private final Set<Integer> readersAwaitingSplit;
+    private final SplitIdGenerator splitIdGenerator;
+    private final Map<Identifier, TableStatus> tableStatusMap = new 
HashMap<>();
+    private final Set<Identifier> discoveredNonFileStoreTables = new 
HashSet<>();
+    private int numTablesWithSubtaskAssigned;
+
+    private boolean stopTriggerScan = false;
+
+    public CDCSourceEnumerator(
+            SplitEnumeratorContext<TableAwareFileStoreSourceSplit> context,
+            long discoveryInterval,
+            CatalogContext catalogContext,
+            Configuration cdcConfig,
+            @Nullable CDCCheckpoint checkpoint) {
+        this.context = context;
+        this.discoveryInterval = discoveryInterval;
+        this.tableDiscoveryInterval =
+                
cdcConfig.get(toCDCOption(TABLE_DISCOVERY_INTERVAL)).toMillis();
+
+        this.database = cdcConfig.get(toCDCOption(DATABASE));
+        this.table = cdcConfig.get(toCDCOption(TABLE));
+        this.catalog = CatalogFactory.createCatalog(catalogContext);
+
+        if (checkpoint != null) {
+            for (Identifier identifier : 
checkpoint.getCurrentSnapshotIdMap().keySet()) {
+                Table table;
+                try {
+                    table = catalog.getTable(identifier);
+                } catch (Catalog.TableNotExistException e) {
+                    LOG.warn(
+                            "Table {} not found after recovery. The most 
possible cause is it has been deleted. Skipping this table.",
+                            identifier,
+                            e);
+                    continue;
+                }
+
+                if (!(table instanceof FileStoreTable)) {
+                    LOG.warn(
+                            "Table {} used to be a FileStoreTable in the last 
checkpoint, but after recovery it turned out to be a {}. Skipping this table.",
+                            identifier,
+                            table.getClass().getSimpleName());
+                    continue;
+                }
+
+                TableStatus tableStatus = new TableStatus(context, 
(FileStoreTable) table);
+                
tableStatus.restore(checkpoint.getCurrentSnapshotIdMap().get(identifier));
+                tableStatusMap.put(identifier, tableStatus);
+            }
+        }
+        this.numTablesWithSubtaskAssigned = 0;
+
+        int splitMaxPerTask = 
catalogContext.options().get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK);
+        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+
+        this.splitAssigner = new CDCSplitAssigner(splitMaxPerTask, 
context.currentParallelism());
+        this.readersAwaitingSplit = new LinkedHashSet<>();
+        this.splitIdGenerator = new SplitIdGenerator();
+
+        if (checkpoint != null) {
+            addSplits(checkpoint.getSplits());
+        }
+    }
+
+    protected void addSplits(Collection<TableAwareFileStoreSourceSplit> 
splits) {
+        splits.forEach(this::addSplit);
+    }
+
+    private void addSplit(TableAwareFileStoreSourceSplit split) {
+        splitAssigner.addSplit(assignSuggestedTask(split), split);
+    }
+
+    @Override
+    public void start() {
+        context.callAsync(
+                this::scanNextSnapshot, this::processDiscoveredSplits, 0, 
discoveryInterval);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon 
registration
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+        // if current task assigned no split, we check conditions to scan one 
more time
+        if (readersAwaitingSplit.contains(subtaskId)) {
+            if (stopTriggerScan) {
+                return;
+            }
+            stopTriggerScan = true;
+            context.callAsync(this::scanNextSnapshot, 
this::processDiscoveredSplits);
+        }
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        LOG.error("Received unrecognized event: {}", sourceEvent);
+    }
+
+    @Override
+    public void addSplitsBack(List<TableAwareFileStoreSourceSplit> splits, int 
subtaskId) {
+        LOG.debug("File Source Enumerator adds splits back: {}", splits);
+        splitAssigner.addSplitsBack(subtaskId, new ArrayList<>(splits));
+    }
+
+    @Override
+    public CDCCheckpoint snapshotState(long checkpointId) {
+        Collection<TableAwareFileStoreSourceSplit> splits = 
splitAssigner.remainingSplits();
+
+        Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
+        for (Map.Entry<Identifier, TableStatus> entry : 
tableStatusMap.entrySet()) {
+            nextSnapshotIdMap.put(entry.getKey(), 
entry.getValue().nextSnapshotId);
+        }
+        final CDCCheckpoint checkpoint = new CDCCheckpoint(splits, 
nextSnapshotIdMap);
+
+        LOG.debug("Source Checkpoint is {}", checkpoint);
+        return checkpoint;
+    }
+
+    private synchronized Optional<TableAwarePlan> scanNextSnapshot() throws 
Exception {
+        if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
+            return Optional.empty();
+        }
+
+        discoverTables();
+
+        if (tableStatusMap.isEmpty()) {
+            return Optional.empty();
+        }
+
+        // Round-robin selection of tables
+        List<Identifier> tableIdentifiers = new 
ArrayList<>(tableStatusMap.keySet());
+        int index = currentTableIndex.getAndUpdate(i -> (i + 1) % 
tableIdentifiers.size());
+        Identifier currentIdentifier = tableIdentifiers.get(index);
+        StreamTableScan currentScan = 
tableStatusMap.get(currentIdentifier).scan;
+
+        TableScan.Plan plan = currentScan.plan();
+        return Optional.of(new TableAwarePlan(plan, currentIdentifier, 
currentScan.checkpoint()));
+    }
+
+    // this method could not be synchronized, because it runs in 
coordinatorThread, which will make
+    // it serialize.
+    private void processDiscoveredSplits(Optional<TableAwarePlan> 
planOptional, Throwable error) {
+        if (error != null) {
+            if (error instanceof EndOfScanException) {
+                // finished
+                LOG.debug("Catching EndOfStreamException, the stream is 
finished.");
+                assignSplits();
+            } else {
+                LOG.error("Failed to enumerate files", error);
+                throw new RuntimeException(error);
+            }
+            return;
+        }
+
+        if (!planOptional.isPresent()) {
+            return;
+        }
+        TableAwarePlan tableAwarePlan = planOptional.get();
+        TableScan.Plan plan = tableAwarePlan.plan;
+        tableStatusMap.get(tableAwarePlan.identifier).nextSnapshotId =
+                tableAwarePlan.nextSnapshotId;
+        if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
+            stopTriggerScan = true;
+            return;
+        }
+
+        stopTriggerScan = false;
+        if (plan.splits().isEmpty()) {
+            return;
+        }
+
+        FileStoreTable table = 
tableStatusMap.get(tableAwarePlan.identifier).table;
+        List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
+        for (Split split : plan.splits()) {
+            TableSchema lastSchema = 
tableStatusMap.get(tableAwarePlan.identifier).schema;
+            TableAwareFileStoreSourceSplit tableAwareFileStoreSourceSplit =
+                    toTableAwareSplit(
+                            splitIdGenerator.getNextId(),
+                            split,
+                            table,
+                            tableAwarePlan.identifier,
+                            lastSchema);
+            tableStatusMap.get(tableAwarePlan.identifier).schema =
+                    tableAwareFileStoreSourceSplit.getSchema();
+            splits.add(tableAwareFileStoreSourceSplit);
+        }
+
+        addSplits(splits);
+        assignSplits();
+    }
+
+    @VisibleForTesting
+    protected TableAwareFileStoreSourceSplit toTableAwareSplit(
+            String splitId,
+            Split split,
+            FileStoreTable table,
+            Identifier identifier,
+            @Nullable TableSchema lastSchema) {
+        Preconditions.checkState(split instanceof DataSplit);
+        long snapshotId = ((DataSplit) split).snapshotId();
+        long schemaId = table.snapshot(snapshotId).schemaId();
+        TableSchema schema = table.schemaManager().schema(schemaId);
+        return new TableAwareFileStoreSourceSplit(
+                splitId, split, 0, identifier, lastSchema, schema);
+    }
+
+    /**
+     * Method should be synchronized because {@link #handleSplitRequest} and 
{@link
+     * #processDiscoveredSplits} have thread conflicts.
+     */
+    protected synchronized void assignSplits() {
+        // create assignment
+        Map<Integer, List<TableAwareFileStoreSourceSplit>> assignment = new 
HashMap<>();
+        Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+        Set<Integer> subtaskIds = context.registeredReaders().keySet();
+        while (readersAwait.hasNext()) {
+            Integer task = readersAwait.next();
+            if (!subtaskIds.contains(task)) {
+                readersAwait.remove();
+                continue;
+            }
+            List<TableAwareFileStoreSourceSplit> splits = 
splitAssigner.getNext(task);
+            if (!splits.isEmpty()) {
+                assignment.put(task, splits);
+            }
+        }
+
+        assignment.keySet().forEach(readersAwaitingSplit::remove);
+        context.assignSplits(new SplitsAssignment<>(assignment));
+    }
+
+    protected int assignSuggestedTask(TableAwareFileStoreSourceSplit split) {
+        if (tableStatusMap.get(split.getIdentifier()).subtaskId != null) {
+            return tableStatusMap.get(split.getIdentifier()).subtaskId;
+        }
+
+        int subtaskId = numTablesWithSubtaskAssigned % 
context.currentParallelism();
+        this.tableStatusMap.get(split.getIdentifier()).subtaskId = subtaskId;
+        this.numTablesWithSubtaskAssigned++;
+        LOG.info("Assigning table {} to subtask {}", split.getIdentifier(), 
subtaskId);
+        return subtaskId;
+    }
+
+    private void discoverTables() throws Exception {
+        if (lastTableDiscoveryTime < System.currentTimeMillis() - 
tableDiscoveryInterval) {
+            return;
+        }
+        lastTableDiscoveryTime = System.currentTimeMillis();
+
+        List<String> databaseNames;
+        if (database == null || database.isEmpty()) {
+            Preconditions.checkArgument(
+                    table == null || table.isEmpty(),
+                    "Tables should not be specified when databases is not. But 
tables is specified as "
+                            + table);
+            // If database parameter is not specified, dynamically scan all 
databases
+            databaseNames = catalog.listDatabases();
+        } else {
+            databaseNames = Collections.singletonList(database);
+        }
+
+        for (String dbName : databaseNames) {
+            List<String> tableNames;
+            if (table == null || table.isEmpty()) {
+                // If tables parameter is not specified but database is 
specified, dynamically scan
+                // all tables in the database
+                tableNames = catalog.listTables(dbName);
+            } else {
+                tableNames = Collections.singletonList(table);
+            }
+
+            for (String tableName : tableNames) {

Review Comment:
   It seems that regular expression matching is not supported here. I'm not 
sure whether Paimon Source has a requirement to process only certain tables, as 
sharding by database/table is unlikely in Paimon tables. 
   
   Therefore, you may consider whether to allow configuring database/table 
names as regular expressions (or adding a new compatible parameter in the 
future, which would also be acceptable).



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java:
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc.source.enumerator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
+import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.DATABASE;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE;
+import static 
org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE_DISCOVERY_INTERVAL;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.toCDCOption;
+
+/** {@link SplitEnumerator} for CDC source. */
+public class CDCSourceEnumerator
+        implements SplitEnumerator<TableAwareFileStoreSourceSplit, 
CDCCheckpoint> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CDCSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context;
+    private final long discoveryInterval;
+    private final long tableDiscoveryInterval;
+    private long lastTableDiscoveryTime = Long.MAX_VALUE;
+    private final AtomicInteger currentTableIndex = new AtomicInteger(0);
+    private final String database;
+    @Nullable private final String table;
+    private final Catalog catalog;
+
+    private final int splitMaxNum;
+    private final CDCSplitAssigner splitAssigner;
+    private final Set<Integer> readersAwaitingSplit;
+    private final SplitIdGenerator splitIdGenerator;
+    private final Map<Identifier, TableStatus> tableStatusMap = new 
HashMap<>();
+    private final Set<Identifier> discoveredNonFileStoreTables = new 
HashSet<>();
+    private int numTablesWithSubtaskAssigned;
+
+    private boolean stopTriggerScan = false;
+
+    public CDCSourceEnumerator(
+            SplitEnumeratorContext<TableAwareFileStoreSourceSplit> context,
+            long discoveryInterval,
+            CatalogContext catalogContext,
+            Configuration cdcConfig,
+            @Nullable CDCCheckpoint checkpoint) {
+        this.context = context;
+        this.discoveryInterval = discoveryInterval;
+        this.tableDiscoveryInterval =
+                
cdcConfig.get(toCDCOption(TABLE_DISCOVERY_INTERVAL)).toMillis();
+
+        this.database = cdcConfig.get(toCDCOption(DATABASE));
+        this.table = cdcConfig.get(toCDCOption(TABLE));
+        this.catalog = CatalogFactory.createCatalog(catalogContext);
+
+        if (checkpoint != null) {
+            for (Identifier identifier : 
checkpoint.getCurrentSnapshotIdMap().keySet()) {
+                Table table;
+                try {
+                    table = catalog.getTable(identifier);
+                } catch (Catalog.TableNotExistException e) {
+                    LOG.warn(
+                            "Table {} not found after recovery. The most 
possible cause is it has been deleted. Skipping this table.",
+                            identifier,
+                            e);
+                    continue;
+                }
+
+                if (!(table instanceof FileStoreTable)) {
+                    LOG.warn(
+                            "Table {} used to be a FileStoreTable in the last 
checkpoint, but after recovery it turned out to be a {}. Skipping this table.",
+                            identifier,
+                            table.getClass().getSimpleName());
+                    continue;
+                }
+
+                TableStatus tableStatus = new TableStatus(context, 
(FileStoreTable) table);
+                
tableStatus.restore(checkpoint.getCurrentSnapshotIdMap().get(identifier));
+                tableStatusMap.put(identifier, tableStatus);
+            }
+        }
+        this.numTablesWithSubtaskAssigned = 0;
+
+        int splitMaxPerTask = 
catalogContext.options().get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK);
+        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+
+        this.splitAssigner = new CDCSplitAssigner(splitMaxPerTask, 
context.currentParallelism());
+        this.readersAwaitingSplit = new LinkedHashSet<>();
+        this.splitIdGenerator = new SplitIdGenerator();
+
+        if (checkpoint != null) {
+            addSplits(checkpoint.getSplits());
+        }
+    }
+
+    protected void addSplits(Collection<TableAwareFileStoreSourceSplit> 
splits) {
+        splits.forEach(this::addSplit);
+    }
+
+    private void addSplit(TableAwareFileStoreSourceSplit split) {
+        splitAssigner.addSplit(assignSuggestedTask(split), split);
+    }
+
+    @Override
+    public void start() {
+        context.callAsync(
+                this::scanNextSnapshot, this::processDiscoveredSplits, 0, 
discoveryInterval);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon 
registration
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+        // if current task assigned no split, we check conditions to scan one 
more time
+        if (readersAwaitingSplit.contains(subtaskId)) {
+            if (stopTriggerScan) {
+                return;
+            }
+            stopTriggerScan = true;
+            context.callAsync(this::scanNextSnapshot, 
this::processDiscoveredSplits);
+        }
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        LOG.error("Received unrecognized event: {}", sourceEvent);
+    }
+
+    @Override
+    public void addSplitsBack(List<TableAwareFileStoreSourceSplit> splits, int 
subtaskId) {
+        LOG.debug("File Source Enumerator adds splits back: {}", splits);
+        splitAssigner.addSplitsBack(subtaskId, new ArrayList<>(splits));
+    }
+
+    @Override
+    public CDCCheckpoint snapshotState(long checkpointId) {
+        Collection<TableAwareFileStoreSourceSplit> splits = 
splitAssigner.remainingSplits();
+
+        Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
+        for (Map.Entry<Identifier, TableStatus> entry : 
tableStatusMap.entrySet()) {
+            nextSnapshotIdMap.put(entry.getKey(), 
entry.getValue().nextSnapshotId);
+        }
+        final CDCCheckpoint checkpoint = new CDCCheckpoint(splits, 
nextSnapshotIdMap);
+
+        LOG.debug("Source Checkpoint is {}", checkpoint);
+        return checkpoint;
+    }
+
+    private synchronized Optional<TableAwarePlan> scanNextSnapshot() throws 
Exception {
+        if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
+            return Optional.empty();
+        }
+
+        discoverTables();
+
+        if (tableStatusMap.isEmpty()) {
+            return Optional.empty();
+        }
+
+        // Round-robin selection of tables
+        List<Identifier> tableIdentifiers = new 
ArrayList<>(tableStatusMap.keySet());
+        int index = currentTableIndex.getAndUpdate(i -> (i + 1) % 
tableIdentifiers.size());
+        Identifier currentIdentifier = tableIdentifiers.get(index);
+        StreamTableScan currentScan = 
tableStatusMap.get(currentIdentifier).scan;
+
+        TableScan.Plan plan = currentScan.plan();
+        return Optional.of(new TableAwarePlan(plan, currentIdentifier, 
currentScan.checkpoint()));
+    }
+
+    // this method could not be synchronized, because it runs in 
coordinatorThread, which will make
+    // it serialize.
+    private void processDiscoveredSplits(Optional<TableAwarePlan> 
planOptional, Throwable error) {
+        if (error != null) {
+            if (error instanceof EndOfScanException) {
+                // finished
+                LOG.debug("Catching EndOfStreamException, the stream is 
finished.");
+                assignSplits();
+            } else {
+                LOG.error("Failed to enumerate files", error);
+                throw new RuntimeException(error);
+            }
+            return;
+        }
+
+        if (!planOptional.isPresent()) {
+            return;
+        }
+        TableAwarePlan tableAwarePlan = planOptional.get();
+        TableScan.Plan plan = tableAwarePlan.plan;
+        tableStatusMap.get(tableAwarePlan.identifier).nextSnapshotId =
+                tableAwarePlan.nextSnapshotId;
+        if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
+            stopTriggerScan = true;
+            return;
+        }
+
+        stopTriggerScan = false;
+        if (plan.splits().isEmpty()) {
+            return;
+        }
+
+        FileStoreTable table = 
tableStatusMap.get(tableAwarePlan.identifier).table;
+        List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
+        for (Split split : plan.splits()) {
+            TableSchema lastSchema = 
tableStatusMap.get(tableAwarePlan.identifier).schema;
+            TableAwareFileStoreSourceSplit tableAwareFileStoreSourceSplit =
+                    toTableAwareSplit(
+                            splitIdGenerator.getNextId(),
+                            split,
+                            table,
+                            tableAwarePlan.identifier,
+                            lastSchema);
+            tableStatusMap.get(tableAwarePlan.identifier).schema =
+                    tableAwareFileStoreSourceSplit.getSchema();
+            splits.add(tableAwareFileStoreSourceSplit);
+        }
+
+        addSplits(splits);
+        assignSplits();
+    }
+
+    @VisibleForTesting
+    protected TableAwareFileStoreSourceSplit toTableAwareSplit(
+            String splitId,
+            Split split,
+            FileStoreTable table,
+            Identifier identifier,
+            @Nullable TableSchema lastSchema) {
+        Preconditions.checkState(split instanceof DataSplit);
+        long snapshotId = ((DataSplit) split).snapshotId();
+        long schemaId = table.snapshot(snapshotId).schemaId();
+        TableSchema schema = table.schemaManager().schema(schemaId);
+        return new TableAwareFileStoreSourceSplit(
+                splitId, split, 0, identifier, lastSchema, schema);
+    }
+
+    /**
+     * Method should be synchronized because {@link #handleSplitRequest} and 
{@link
+     * #processDiscoveredSplits} have thread conflicts.
+     */
+    protected synchronized void assignSplits() {
+        // create assignment
+        Map<Integer, List<TableAwareFileStoreSourceSplit>> assignment = new 
HashMap<>();
+        Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+        Set<Integer> subtaskIds = context.registeredReaders().keySet();
+        while (readersAwait.hasNext()) {
+            Integer task = readersAwait.next();
+            if (!subtaskIds.contains(task)) {
+                readersAwait.remove();
+                continue;
+            }
+            List<TableAwareFileStoreSourceSplit> splits = 
splitAssigner.getNext(task);
+            if (!splits.isEmpty()) {
+                assignment.put(task, splits);
+            }
+        }
+
+        assignment.keySet().forEach(readersAwaitingSplit::remove);
+        context.assignSplits(new SplitsAssignment<>(assignment));
+    }
+
+    protected int assignSuggestedTask(TableAwareFileStoreSourceSplit split) {
+        if (tableStatusMap.get(split.getIdentifier()).subtaskId != null) {
+            return tableStatusMap.get(split.getIdentifier()).subtaskId;
+        }
+
+        int subtaskId = numTablesWithSubtaskAssigned % 
context.currentParallelism();
+        this.tableStatusMap.get(split.getIdentifier()).subtaskId = subtaskId;
+        this.numTablesWithSubtaskAssigned++;
+        LOG.info("Assigning table {} to subtask {}", split.getIdentifier(), 
subtaskId);
+        return subtaskId;
+    }
+
+    private void discoverTables() throws Exception {
+        if (lastTableDiscoveryTime < System.currentTimeMillis() - 
tableDiscoveryInterval) {
+            return;
+        }
+        lastTableDiscoveryTime = System.currentTimeMillis();
+
+        List<String> databaseNames;
+        if (database == null || database.isEmpty()) {
+            Preconditions.checkArgument(
+                    table == null || table.isEmpty(),
+                    "Tables should not be specified when databases is not. But 
tables is specified as "

Review Comment:
   Tables should not be specified when databases is not  =>
   Tables should not be specified when databases is null.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java:
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc.source.enumerator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
+import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.DATABASE;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE;
+import static 
org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE_DISCOVERY_INTERVAL;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.toCDCOption;
+
+/** {@link SplitEnumerator} for CDC source. */
+public class CDCSourceEnumerator
+        implements SplitEnumerator<TableAwareFileStoreSourceSplit, 
CDCCheckpoint> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CDCSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context;
+    private final long discoveryInterval;
+    private final long tableDiscoveryInterval;
+    private long lastTableDiscoveryTime = Long.MAX_VALUE;
+    private final AtomicInteger currentTableIndex = new AtomicInteger(0);
+    private final String database;
+    @Nullable private final String table;
+    private final Catalog catalog;
+
+    private final int splitMaxNum;
+    private final CDCSplitAssigner splitAssigner;
+    private final Set<Integer> readersAwaitingSplit;
+    private final SplitIdGenerator splitIdGenerator;
+    private final Map<Identifier, TableStatus> tableStatusMap = new 
HashMap<>();
+    private final Set<Identifier> discoveredNonFileStoreTables = new 
HashSet<>();
+    private int numTablesWithSubtaskAssigned;
+
+    private boolean stopTriggerScan = false;
+
+    public CDCSourceEnumerator(
+            SplitEnumeratorContext<TableAwareFileStoreSourceSplit> context,
+            long discoveryInterval,
+            CatalogContext catalogContext,
+            Configuration cdcConfig,
+            @Nullable CDCCheckpoint checkpoint) {
+        this.context = context;
+        this.discoveryInterval = discoveryInterval;
+        this.tableDiscoveryInterval =
+                
cdcConfig.get(toCDCOption(TABLE_DISCOVERY_INTERVAL)).toMillis();
+
+        this.database = cdcConfig.get(toCDCOption(DATABASE));
+        this.table = cdcConfig.get(toCDCOption(TABLE));
+        this.catalog = CatalogFactory.createCatalog(catalogContext);
+
+        if (checkpoint != null) {
+            for (Identifier identifier : 
checkpoint.getCurrentSnapshotIdMap().keySet()) {
+                Table table;
+                try {
+                    table = catalog.getTable(identifier);
+                } catch (Catalog.TableNotExistException e) {
+                    LOG.warn(
+                            "Table {} not found after recovery. The most 
possible cause is it has been deleted. Skipping this table.",
+                            identifier,
+                            e);
+                    continue;
+                }
+
+                if (!(table instanceof FileStoreTable)) {
+                    LOG.warn(
+                            "Table {} used to be a FileStoreTable in the last 
checkpoint, but after recovery it turned out to be a {}. Skipping this table.",
+                            identifier,
+                            table.getClass().getSimpleName());
+                    continue;
+                }
+
+                TableStatus tableStatus = new TableStatus(context, 
(FileStoreTable) table);
+                
tableStatus.restore(checkpoint.getCurrentSnapshotIdMap().get(identifier));
+                tableStatusMap.put(identifier, tableStatus);

Review Comment:
   It's better to add more logs here for easier troubleshooting.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/CDCOptions.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.FallbackKey;
+
+import org.apache.flink.util.IterableUtils;
+
+import java.time.Duration;
+
+/** Options for the cdc source. */
+public class CDCOptions {
+    /** prefix for passing properties for catalog creation. */
+    public static final String PREFIX_CATALOG_PROPERTIES = 
"catalog.properties.";

Review Comment:
   Should we provide the ability to specify parameters like `scan.mode` and 
`consumer-id` for the source table?



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceReader.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc.source.reader;
+
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.pipeline.cdc.source.CDCSource;
+import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitState;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.cdc.common.event.Event;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
+
+import java.util.Map;
+
+/** A {@link SourceReader} that read records from {@link 
TableAwareFileStoreSourceSplit}. */
+public class CDCSourceReader
+        extends SingleThreadMultiplexSourceReaderBase<
+                RecordIterator<Event>,
+                Event,
+                TableAwareFileStoreSourceSplit,
+                FileStoreSourceSplitState> {
+
+    private final IOManager ioManager;
+
+    public CDCSourceReader(
+            SourceReaderContext readerContext,
+            FileStoreSourceReaderMetrics metrics,
+            IOManager ioManager,
+            CDCSource.TableReadManager tableReadManager) {
+        super(
+                () -> new CDCSourceSplitReader(metrics, tableReadManager),
+                (element, output, state) ->
+                        CDCRecordsWithSplitIds.emitRecord(
+                                readerContext, element, output, state, 
metrics),
+                readerContext.getConfiguration(),
+                readerContext);
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public void start() {
+        // we request a split only if we did not get splits during the 
checkpoint restore
+        if (getNumberOfCurrentlyAssignedSplits() == 0) {
+            context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, FileStoreSourceSplitState> 
finishedSplitIds) {
+        // this method is called each time when we consume one split
+        // it is possible that one response from the coordinator contains 
multiple splits
+        // we should only require for more splits after we've consumed all 
given splits
+        if (getNumberOfCurrentlyAssignedSplits() == 0) {
+            context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    protected FileStoreSourceSplitState 
initializedState(TableAwareFileStoreSourceSplit split) {
+        return new FileStoreSourceSplitState(split);

Review Comment:
   We can print the split information here to monitor the data reading progress 
during job startup.
   
   We can also add logs in `toSplitType` to track progress during checkpointing 
(this depends on your preference, as `toSplitType` is not only called during 
checkpointing).



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCRecordsWithSplitIds.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc.source.reader;
+
+import org.apache.paimon.flink.source.FileStoreSourceSplitState;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
+import org.apache.paimon.utils.Reference;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A {@link RecordsWithSplitIds} which contains only one iterator record. This 
can ensure that there
+ * will be no checkpoint segmentation in iterator consumption.
+ */
+public class CDCRecordsWithSplitIds implements 
RecordsWithSplitIds<RecordIterator<Event>> {
+
+    @Nullable private String splitId;
+
+    @Nullable private Reference<RecordIterator<Event>> recordsForSplitCurrent;
+
+    @Nullable private final RecordIterator<Event> recordsForSplit;
+
+    private final Set<String> finishedSplits;
+
+    private CDCRecordsWithSplitIds(
+            @Nullable String splitId,
+            @Nullable RecordIterator<Event> recordsForSplit,
+            Set<String> finishedSplits) {
+        this.splitId = splitId;
+        this.recordsForSplit = recordsForSplit;
+        this.finishedSplits = finishedSplits;
+    }
+
+    @Nullable
+    @Override
+    public String nextSplit() {
+        // move the split one (from current value to null)
+        final String nextSplit = this.splitId;
+        this.splitId = null;
+
+        // move the iterator, from null to value (if first move) or to null 
(if second move)
+        this.recordsForSplitCurrent =
+                nextSplit != null ? new Reference<>(this.recordsForSplit) : 
null;
+
+        return nextSplit;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<Event> nextRecordFromSplit() {
+        if (this.recordsForSplitCurrent == null) {
+            throw new IllegalStateException();
+        }
+
+        RecordIterator<Event> recordsForSplit = 
this.recordsForSplitCurrent.get();
+        this.recordsForSplitCurrent.set(null);
+        return recordsForSplit;
+    }
+
+    @Override
+    public Set<String> finishedSplits() {
+        return finishedSplits;
+    }
+
+    @Override
+    public void recycle() {
+        if (recordsForSplit != null) {
+            recordsForSplit.releaseBatch();
+        }
+    }
+
+    public static CDCRecordsWithSplitIds forRecords(
+            String splitId, RecordIterator<Event> recordsForSplit) {
+        return new CDCRecordsWithSplitIds(splitId, recordsForSplit, 
Collections.emptySet());
+    }
+
+    public static CDCRecordsWithSplitIds finishedSplit(String splitId) {
+        return new CDCRecordsWithSplitIds(null, null, 
Collections.singleton(splitId));
+    }
+
+    public static void emitRecord(
+            SourceReaderContext context,
+            RecordIterator<Event> element,
+            SourceOutput<Event> output,
+            FileStoreSourceSplitState state,
+            FileStoreSourceReaderMetrics metrics) {
+        long timestamp = TimestampAssigner.NO_TIMESTAMP;
+        if (metrics.getLatestFileCreationTime() != 
FileStoreSourceReaderMetrics.UNDEFINED) {
+            timestamp = metrics.getLatestFileCreationTime();
+        }
+
+        // This metric only counts the number of RecordIterator<Event> emitted,
+        // however what we really want is to count the number of Events 
emitted,
+        // so we replenish the missing record count here.
+        org.apache.flink.metrics.Counter numRecordsIn =
+                
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();

Review Comment:
   The Counter here should be at the Source level. 
   Should we provide metrics for individual tables?



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java:
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.pipeline.cdc.source.enumerator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
+import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.DATABASE;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE;
+import static 
org.apache.paimon.flink.pipeline.cdc.CDCOptions.TABLE_DISCOVERY_INTERVAL;
+import static org.apache.paimon.flink.pipeline.cdc.CDCOptions.toCDCOption;
+
+/** {@link SplitEnumerator} for CDC source. */
+public class CDCSourceEnumerator
+        implements SplitEnumerator<TableAwareFileStoreSourceSplit, 
CDCCheckpoint> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CDCSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context;
+    private final long discoveryInterval;
+    private final long tableDiscoveryInterval;
+    private long lastTableDiscoveryTime = Long.MAX_VALUE;
+    private final AtomicInteger currentTableIndex = new AtomicInteger(0);
+    private final String database;
+    @Nullable private final String table;
+    private final Catalog catalog;
+
+    private final int splitMaxNum;
+    private final CDCSplitAssigner splitAssigner;
+    private final Set<Integer> readersAwaitingSplit;
+    private final SplitIdGenerator splitIdGenerator;
+    private final Map<Identifier, TableStatus> tableStatusMap = new 
HashMap<>();
+    private final Set<Identifier> discoveredNonFileStoreTables = new 
HashSet<>();
+    private int numTablesWithSubtaskAssigned;
+
+    private boolean stopTriggerScan = false;
+
+    public CDCSourceEnumerator(
+            SplitEnumeratorContext<TableAwareFileStoreSourceSplit> context,
+            long discoveryInterval,
+            CatalogContext catalogContext,
+            Configuration cdcConfig,
+            @Nullable CDCCheckpoint checkpoint) {
+        this.context = context;
+        this.discoveryInterval = discoveryInterval;
+        this.tableDiscoveryInterval =
+                
cdcConfig.get(toCDCOption(TABLE_DISCOVERY_INTERVAL)).toMillis();
+
+        this.database = cdcConfig.get(toCDCOption(DATABASE));
+        this.table = cdcConfig.get(toCDCOption(TABLE));
+        this.catalog = CatalogFactory.createCatalog(catalogContext);
+
+        if (checkpoint != null) {
+            for (Identifier identifier : 
checkpoint.getCurrentSnapshotIdMap().keySet()) {
+                Table table;
+                try {
+                    table = catalog.getTable(identifier);
+                } catch (Catalog.TableNotExistException e) {
+                    LOG.warn(
+                            "Table {} not found after recovery. The most 
possible cause is it has been deleted. Skipping this table.",
+                            identifier,
+                            e);
+                    continue;
+                }
+
+                if (!(table instanceof FileStoreTable)) {
+                    LOG.warn(
+                            "Table {} used to be a FileStoreTable in the last 
checkpoint, but after recovery it turned out to be a {}. Skipping this table.",
+                            identifier,
+                            table.getClass().getSimpleName());
+                    continue;
+                }
+
+                TableStatus tableStatus = new TableStatus(context, 
(FileStoreTable) table);
+                
tableStatus.restore(checkpoint.getCurrentSnapshotIdMap().get(identifier));
+                tableStatusMap.put(identifier, tableStatus);
+            }
+        }
+        this.numTablesWithSubtaskAssigned = 0;
+
+        int splitMaxPerTask = 
catalogContext.options().get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK);
+        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+
+        this.splitAssigner = new CDCSplitAssigner(splitMaxPerTask, 
context.currentParallelism());
+        this.readersAwaitingSplit = new LinkedHashSet<>();
+        this.splitIdGenerator = new SplitIdGenerator();
+
+        if (checkpoint != null) {

Review Comment:
   The processing here can be merged into the previous `if (checkpoint != 
null)` condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to