Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3225989113


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SplitKeyTypeResolver.java:
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.IntSupplier;
+
+/**
+ * Casts JSON-round-tripped split-key values back to the Java type the JDBC 
driver returns
+ * for that column. flink-cdc's chunk splitter uses strict {@code 
Objects.equals} internally,
+ * so passing {@code Integer(3)} where it expects {@code Long(3)} (BIGINT 
column) silently
+ * produces empty chunks.
+ */
+public final class SplitKeyTypeResolver {
+
+    private static final ConcurrentMap<String, Integer> SQL_TYPE_CACHE = new 
ConcurrentHashMap<>();
+
+    private SplitKeyTypeResolver() {}
+
+    /** Returns cached sqlType, or computes via {@code resolver} and caches. */
+    public static int getOrCompute(String cacheKey, IntSupplier resolver) {
+        Integer cached = SQL_TYPE_CACHE.get(cacheKey);
+        if (cached != null) {
+            return cached;
+        }
+        int v = resolver.getAsInt();
+        SQL_TYPE_CACHE.put(cacheKey, v);
+        return v;

Review Comment:
   `getOrCompute` does a non-atomic `get` then `put` on a `ConcurrentMap`, so 
under concurrency the resolver can be invoked multiple times and later results 
may overwrite earlier ones. Using `computeIfAbsent` (and handling resolver 
exceptions) would make caching atomic and avoid redundant schema lookups.
   



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +623,194 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
-    public void splitChunks(List<String> createTbls) throws JobException {
-        // todo: When splitting takes a long time, it needs to be changed to 
asynchronous.
-        if (checkNeedSplitChunks(sourceProperties)) {
-            Map<String, List<SnapshotSplit>> tableSplits = new 
LinkedHashMap<>();
-            for (String tbl : createTbls) {
-                List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
-                tableSplits.put(tbl, snapshotSplits);
+    // ============ Async split progress (driven by scheduler each tick) 
============
+
+    /**
+     * One-time setup at CREATE.
+     * - initial/snapshot mode: init split progress; scheduler will drive 
advanceSplits() each tick.
+     * - latest mode (and other non-splitting modes): open the remote reader 
(e.g. PG slot) so the
+     *   binlog phase can start immediately; no snapshot splitting will happen.
+     */
+    @Override
+    public void initOnCreate(List<String> syncTables) throws JobException {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            initSourceReader();
+            return;
+        }
+        synchronized (splitsLock) {
+            this.cachedSyncTables = syncTables;
+            this.committedSplitProgress = new SplitProgress();
+            this.cdcSplitProgress = new SplitProgress();
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits() {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            return true;
+        }
+        synchronized (splitsLock) {
+            return cdcSplitProgress.getCurrentSplittingTable() == null
+                    && computeCdcRemainingTables().isEmpty();
+        }
+    }
+
+    /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+    private List<String> computeCdcRemainingTables() {
+        if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Set<String> touched = new HashSet<>();
+        for (SnapshotSplit s : finishedSplits) {
+            touched.add(s.getTableId());
+        }
+        for (SnapshotSplit s : remainingSplits) {
+            touched.add(s.getTableId());
+        }
+        if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+            touched.add(cdcSplitProgress.getCurrentSplittingTable());
+        }
+        List<String> result = new ArrayList<>(cachedSyncTables.size());
+        for (String t : cachedSyncTables) {
+            if (!touched.contains(t)) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void advanceSplits() throws JobException {
+        synchronized (splitsLock) {
+            // 1. Pick next table if not currently splitting one.
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            String tbl = cdcSplitProgress.getCurrentSplittingTable();
+            Object[] startVal = cdcSplitProgress.getNextSplitStart();
+            Integer splitId = cdcSplitProgress.getNextSplitId();
+
+            // 2. RPC. OK to hold lock during RPC: no async splitting thread; 
updateOffset on
+            //    task commit waits briefly (max_interval >> RPC time anyway).
+            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, 
splitId);
+            if (batch == null || batch.isEmpty()) {

Review Comment:
   `advanceSplits()` holds `splitsLock` while performing the network RPC 
(`rpcFetchSplitsBatch`) and the subsequent system-table write 
(`StreamingJobUtils.upsertChunkList`). Since `rpcFetchSplitsBatch` uses the 
heavy RPC timeout and the UPSERT executes internal queries, this can block 
`updateOffset()`/`getNextOffset()` for a long time and delay task 
commits/scheduling. Consider doing RPC + meta-table upsert outside the 
synchronized block (copy the current progress under lock, release lock, do I/O, 
then re-lock to merge/apply if progress is unchanged).



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -101,6 +101,12 @@ public class StreamingJobUtils {
     private static final String SELECT_SPLITS_TABLE_TEMPLATE =
             "SELECT table_name, chunk_list FROM " + 
FULL_QUALIFIED_META_TBL_NAME + " WHERE job_id='%s' ORDER BY id ASC";
 
+    private static final String SELECT_TABLE_ID_TEMPLATE =
+            "SELECT id FROM " + FULL_QUALIFIED_META_TBL_NAME + " WHERE 
job_id='%s' AND table_name='%s'";

Review Comment:
   `SELECT_TABLE_ID_TEMPLATE` / `querySingleTableId` can return multiple rows 
if the same (job_id, table_name) is inserted with different ids (the table's 
UNIQUE KEY is only on (id, job_id)). Without `ORDER BY ... LIMIT 1`, the chosen 
id can be nondeterministic and cause extra duplicate rows. Consider adding 
`ORDER BY id DESC LIMIT 1` to consistently reuse the latest id (and/or enforce 
uniqueness on (job_id, table_name) if feasible).
   



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -158,46 +159,123 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
MySqlChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC rebuilds the splitter from (table, 
nextSplitStart, nextSplitId)
+     * supplied by FE, splits up to {@code batchSize} chunks, then closes. 
Note: evenly-distributed
+     * PKs go through a single splitChunks() call returning all chunks at 
once, so batchSize is only
+     * effective on the uneven path.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes go through the chunk path; other 
modes return a
+     * single BinlogSplit instead.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         MySqlSourceConfig sourceConfig = getSourceConfig(ftsReq);
         StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
-        List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
-        MySqlBinlogSplit remainingBinlogSplit = null;
-        if (startupMode.equals(StartupMode.INITIAL) || 
startupMode.equals(StartupMode.SNAPSHOT)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
+
+        if (!startupMode.equals(StartupMode.INITIAL) && 
!startupMode.equals(StartupMode.SNAPSHOT)) {
+            BinlogSplit binlogSplit = new BinlogSplit();
+            binlogSplit.setSplitId(BINLOG_SPLIT_ID);
+            
binlogSplit.setStartingOffset(sourceConfig.getStartupOptions().binlogOffset.getOffset());
+            return Collections.singletonList(binlogSplit);
+        }
+
+        String database = 
ftsReq.getConfig().get(DataSourceConfigKeys.DATABASE);
+        TableId tableId = TableId.parse(database + "." + 
ftsReq.getSnapshotTable());
+
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+
+        boolean isCaseSensitive;
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
+            isCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to query table id case 
sensitivity", e);
+        }
+        MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, 
isCaseSensitive);
+        MySqlPartition partition =
+                new 
MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
+
+        // null -> NO_SPLITTING_TABLE_STATE so splitter may pick evenly path; 
non-null -> resume mid-table.
+        ChunkSplitterState state;
+        if (pkValues == null || pkValues.length == 0) {
+            state = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
         } else {
-            remainingBinlogSplit =
-                    new MySqlBinlogSplit(
-                            BINLOG_SPLIT_ID,
-                            sourceConfig.getStartupOptions().binlogOffset,
-                            BinlogOffset.ofNonStopping(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+            // Restore the original JDBC type (JSON downgrades Long to 
Integer).
+            int sqlType = resolveSplitKeySqlType(sourceConfig, tableId, 
mySqlSchema, partition);
+            Object castStart = SplitKeyTypeResolver.cast(pkValues[0], sqlType);
+            int splitId = ftsReq.getNextSplitId() == null ? 0 : 
ftsReq.getNextSplitId();
+            state = new ChunkSplitterState(
+                    tableId, 
ChunkSplitterState.ChunkBound.middleOf(castStart), splitId);
         }
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for (MySqlSnapshotSplit snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+        MySqlChunkSplitter splitter = new MySqlChunkSplitter(mySqlSchema, 
sourceConfig, state);
+
+        try {
+            // open() inside try so a throw still routes through finally for 
close().
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                List<MySqlSnapshotSplit> chunks = 
splitter.splitChunks(partition, tableId);
+                for (MySqlSnapshotSplit chunk : chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {

Review Comment:
   `batchSize` is not enforced when a single `splitChunks()` call returns more 
than `batchSize` chunks (you add the whole `chunks` list before re-checking 
`result.size()`). This can defeat the goal of keeping fetchSplits RPCs short 
and makes the `batchSize` request parameter misleading. Consider stopping once 
`result.size() == batchSize` (or truncating the returned list) and ensuring the 
resume state progresses consistently for the next RPC.
   



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +136,97 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
ChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC builds a fresh splitter from the (table, 
nextChunkStart, nextChunkId)
+     * triple supplied by FE, fetches up to {@code batchSize} chunks, then 
closes the splitter.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest 
modes never reach here.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-        
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
-                remainingSnapshotSplits = new ArrayList<>();
-        StreamSplit remainingStreamSplit = null;
-
-        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
-        String startupMode = 
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
-                || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
+        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+        // Build ChunkSplitterState from FE-side (nextSplitStart, nextSplitId).
+        //   null -> NO_SPLITTING_TABLE_STATE so splitter analyzes the table 
and may pick evenly path.
+        //   non-null -> resume mid-table, forced into unevenly path.
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+        ChunkSplitterState state;
+        if (pkValues == null || pkValues.length == 0) {
+            state = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
         } else {
-            // For non-initial mode, create a stream split
-            Offset startingOffset = createInitialOffset();
-            remainingStreamSplit =
-                    new StreamSplit(
-                            STREAM_SPLIT_ID,
-                            startingOffset,
-                            createNoStoppingOffset(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+            // Restore the original JDBC type (JSON downgrades Long to 
Integer).
+            int sqlType = resolveSplitKeySqlType(sourceConfig, tableId);
+            Object castStart = SplitKeyTypeResolver.cast(pkValues[0], sqlType);
+            int splitId = ftsReq.getNextSplitId() == null ? 0 : 
ftsReq.getNextSplitId();
+            state = new ChunkSplitterState(tableId, 
ChunkBound.middleOf(castStart), splitId);
         }
+        ChunkSplitter splitter = 
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
 
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
-                    snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                
Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                        chunks = splitter.generateSplits(tableId);
+                for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk : 
chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {

Review Comment:
   `batchSize` is not enforced if `splitter.generateSplits(tableId)` returns 
more than `batchSize` splits in one call (the loop condition is only checked 
before the call). This can cause a single fetchSplits RPC to return an 
unexpectedly large payload and potentially run long, despite the `batchSize` 
request field. Consider capping additions to `result` to `batchSize` per RPC 
and keeping resume state consistent with the last returned split.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to