Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3226605189


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +630,193 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
-    public void splitChunks(List<String> createTbls) throws JobException {
-        // todo: When splitting takes a long time, it needs to be changed to 
asynchronous.
-        if (checkNeedSplitChunks(sourceProperties)) {
-            Map<String, List<SnapshotSplit>> tableSplits = new 
LinkedHashMap<>();
-            for (String tbl : createTbls) {
-                List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
-                tableSplits.put(tbl, snapshotSplits);
+    // ============ Async split progress (driven by scheduler each tick) 
============
+
+    /**
+     * One-time setup at CREATE.
+     * - initial/snapshot mode: init split progress; scheduler will drive 
advanceSplits() each tick.
+     * - latest mode (and other non-splitting modes): open the remote reader 
(e.g. PG slot) so the
+     *   binlog phase can start immediately; no snapshot splitting will happen.
+     */
+    @Override
+    public void initOnCreate(List<String> syncTables) throws JobException {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            initSourceReader();
+            return;
+        }
+        synchronized (splitsLock) {
+            this.cachedSyncTables = syncTables;
+            this.committedSplitProgress = new SplitProgress();
+            this.cdcSplitProgress = new SplitProgress();
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits() {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            return true;
+        }
+        synchronized (splitsLock) {
+            return cdcSplitProgress.getCurrentSplittingTable() == null
+                    && computeCdcRemainingTables().isEmpty();
+        }
+    }
+
+    /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+    private List<String> computeCdcRemainingTables() {
+        if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Set<String> touched = new HashSet<>();
+        for (SnapshotSplit s : finishedSplits) {
+            touched.add(s.getTableId());
+        }
+        for (SnapshotSplit s : remainingSplits) {
+            touched.add(s.getTableId());
+        }
+        if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+            touched.add(cdcSplitProgress.getCurrentSplittingTable());
+        }
+        List<String> result = new ArrayList<>(cachedSyncTables.size());
+        for (String t : cachedSyncTables) {
+            if (!touched.contains(t)) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void advanceSplits() throws JobException {
+        synchronized (splitsLock) {
+            // 1. Pick next table if not currently splitting one.
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            String tbl = cdcSplitProgress.getCurrentSplittingTable();
+            Object[] startVal = cdcSplitProgress.getNextSplitStart();
+            Integer splitId = cdcSplitProgress.getNextSplitId();
+
+            // 2. RPC under lock — updateOffset may wait briefly
+            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, 
splitId);
+            if (batch == null || batch.isEmpty()) {
+                return;
+            }
+
+            // 3. mergeBySplitId (defensive dedup).
+            Set<String> existingIds = new HashSet<>();
+            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            List<SnapshotSplit> newSplits = new ArrayList<>();
+            for (SnapshotSplit s : batch) {
+                if (!existingIds.contains(s.getSplitId())) {
+                    newSplits.add(s);
+                }
             }
-            // save chunk list to system table
-            saveChunkMeta(tableSplits);
-            this.remainingSplits = tableSplits.values().stream()
-                    .flatMap(List::stream)
+            if (newSplits.size() < batch.size()) {
+                log.info("advanceSplits dedup'd {} duplicate splits (batch={}, 
new={}) for job {} table {}",
+                        batch.size() - newSplits.size(), batch.size(), 
newSplits.size(), getJobId(), tbl);
+            }
+            remainingSplits.addAll(newSplits);
+
+            // 4. UPSERT this table's full chunk_list to system table.
+            List<SnapshotSplit> splitsOfTbl = Stream.concat(
+                            finishedSplits.stream(), remainingSplits.stream())
+                    .filter(s -> tbl.equals(s.getTableId()))
+                    .sorted(Comparator.comparingInt(s -> 
splitIdOf(s.getSplitId())))
                     .collect(Collectors.toList());
-        } else {
-            // The source reader is automatically initialized when the split 
is obtained.
-            // In latest mode, a separate init is required.init source reader
-            initSourceReader();
+            try {
+                StreamingJobUtils.upsertChunkList(getJobId(), tbl, 
splitsOfTbl);
+            } catch (Exception e) {
+                throw new JobException("UPSERT chunk_list failed for " + tbl + 
": " + e.getMessage());
+            }
+
+            // 5. Advance cdcSplitProgress to the last split in the batch.
+            applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 
1));
+            log.info("advanceSplits jobId={} table={} request(nextStart={}, 
nextSplitId={}) "

Review Comment:
   In `advanceSplits`, `applySplitToProgress(cdcSplitProgress, batchLast)` sets 
`currentSplittingTable` from `SnapshotSplit.tableId`, but `rpcFetchSplitsBatch` 
later passes `currentSplittingTable` back as 
`FetchTableSplitsRequest.snapshotTable`. In practice `SnapshotSplit.tableId` is 
typically qualified (e.g. `schema.table` / `db.table`), while `snapshotTable` 
is expected to be the unqualified table name (the code in cdc_client prefixes 
DB/schema again). This will cause the next fetchSplits RPC to target an invalid 
table id (double-prefixed) and also breaks `computeCdcRemainingTables()` / 
`splitsOfTbl` filtering due to mixed qualified vs unqualified identifiers. Keep 
`SplitProgress.currentSplittingTable` in the same namespace as 
`syncTables`/`snapshotTable` (likely unqualified), and compare/group splits by 
the same normalized key (e.g. extract table name from `split.getTableId()` when 
needed) instead of using `split.getTableId()` directly.



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +630,193 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
-    public void splitChunks(List<String> createTbls) throws JobException {
-        // todo: When splitting takes a long time, it needs to be changed to 
asynchronous.
-        if (checkNeedSplitChunks(sourceProperties)) {
-            Map<String, List<SnapshotSplit>> tableSplits = new 
LinkedHashMap<>();
-            for (String tbl : createTbls) {
-                List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
-                tableSplits.put(tbl, snapshotSplits);
+    // ============ Async split progress (driven by scheduler each tick) 
============
+
+    /**
+     * One-time setup at CREATE.
+     * - initial/snapshot mode: init split progress; scheduler will drive 
advanceSplits() each tick.
+     * - latest mode (and other non-splitting modes): open the remote reader 
(e.g. PG slot) so the
+     *   binlog phase can start immediately; no snapshot splitting will happen.
+     */
+    @Override
+    public void initOnCreate(List<String> syncTables) throws JobException {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            initSourceReader();
+            return;
+        }
+        synchronized (splitsLock) {
+            this.cachedSyncTables = syncTables;
+            this.committedSplitProgress = new SplitProgress();
+            this.cdcSplitProgress = new SplitProgress();
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits() {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            return true;
+        }
+        synchronized (splitsLock) {
+            return cdcSplitProgress.getCurrentSplittingTable() == null
+                    && computeCdcRemainingTables().isEmpty();
+        }
+    }
+
+    /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+    private List<String> computeCdcRemainingTables() {
+        if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Set<String> touched = new HashSet<>();
+        for (SnapshotSplit s : finishedSplits) {
+            touched.add(s.getTableId());
+        }
+        for (SnapshotSplit s : remainingSplits) {
+            touched.add(s.getTableId());
+        }
+        if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+            touched.add(cdcSplitProgress.getCurrentSplittingTable());
+        }
+        List<String> result = new ArrayList<>(cachedSyncTables.size());
+        for (String t : cachedSyncTables) {
+            if (!touched.contains(t)) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void advanceSplits() throws JobException {
+        synchronized (splitsLock) {
+            // 1. Pick next table if not currently splitting one.
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            String tbl = cdcSplitProgress.getCurrentSplittingTable();
+            Object[] startVal = cdcSplitProgress.getNextSplitStart();
+            Integer splitId = cdcSplitProgress.getNextSplitId();
+
+            // 2. RPC under lock — updateOffset may wait briefly
+            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, 
splitId);
+            if (batch == null || batch.isEmpty()) {
+                return;
+            }
+
+            // 3. mergeBySplitId (defensive dedup).
+            Set<String> existingIds = new HashSet<>();
+            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            List<SnapshotSplit> newSplits = new ArrayList<>();
+            for (SnapshotSplit s : batch) {
+                if (!existingIds.contains(s.getSplitId())) {
+                    newSplits.add(s);
+                }
             }
-            // save chunk list to system table
-            saveChunkMeta(tableSplits);
-            this.remainingSplits = tableSplits.values().stream()
-                    .flatMap(List::stream)
+            if (newSplits.size() < batch.size()) {
+                log.info("advanceSplits dedup'd {} duplicate splits (batch={}, 
new={}) for job {} table {}",
+                        batch.size() - newSplits.size(), batch.size(), 
newSplits.size(), getJobId(), tbl);
+            }
+            remainingSplits.addAll(newSplits);
+
+            // 4. UPSERT this table's full chunk_list to system table.
+            List<SnapshotSplit> splitsOfTbl = Stream.concat(
+                            finishedSplits.stream(), remainingSplits.stream())
+                    .filter(s -> tbl.equals(s.getTableId()))
+                    .sorted(Comparator.comparingInt(s -> 
splitIdOf(s.getSplitId())))
                     .collect(Collectors.toList());
-        } else {
-            // The source reader is automatically initialized when the split 
is obtained.
-            // In latest mode, a separate init is required.init source reader
-            initSourceReader();
+            try {
+                StreamingJobUtils.upsertChunkList(getJobId(), tbl, 
splitsOfTbl);
+            } catch (Exception e) {
+                throw new JobException("UPSERT chunk_list failed for " + tbl + 
": " + e.getMessage());
+            }

Review Comment:
   `advanceSplits()` holds `splitsLock` while performing the remote 
`rpcFetchSplitsBatch()` call and the system-table `upsertChunkList()` (internal 
SQL). Either of these can block for seconds/minutes, which will also block 
`updateOffset()`/`getNextOffset()` and can stall task commits or scheduling. 
Consider doing the RPC + UPSERT outside the critical section: snapshot the 
current progress under lock, release the lock to fetch/UPSERT, then re-acquire 
the lock to merge the batch if the progress/table still matches (or abort if it 
changed).



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +139,105 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
ChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC builds a fresh splitter from the (table, 
nextChunkStart, nextChunkId)
+     * triple supplied by FE, fetches up to {@code batchSize} chunks, then 
closes the splitter.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest 
modes never reach here.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-        
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
-                remainingSnapshotSplits = new ArrayList<>();
-        StreamSplit remainingStreamSplit = null;
-
-        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
-        String startupMode = 
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
-                || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
-        } else {
-            // For non-initial mode, create a stream split
-            Offset startingOffset = createInitialOffset();
-            remainingStreamSplit =
-                    new StreamSplit(
-                            STREAM_SPLIT_ID,
-                            startingOffset,
-                            createNoStoppingOffset(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+        ChunkSplitterState state = buildChunkSplitterState(sourceConfig, 
tableId, ftsReq);
+        ChunkSplitter splitter = 
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                
Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                        chunks = splitter.generateSplits(tableId);
+                for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk : 
chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {
+                    break;
+                }
+            }

Review Comment:
   Same as MySQL path: the `while (result.size() < batchSize)` loop adds all 
elements from `splitter.generateSplits(tableId)` without enforcing the 
`batchSize` limit if a single `generateSplits` call returns multiple chunks. 
This can produce responses larger than intended. Stop adding once 
`result.size()` reaches `batchSize` (or trim) so the RPC stays bounded.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -158,46 +162,131 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
MySqlChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC rebuilds the splitter from (table, 
nextSplitStart, nextSplitId)
+     * supplied by FE, splits up to {@code batchSize} chunks, then closes. 
Note: evenly-distributed
+     * PKs go through a single splitChunks() call returning all chunks at 
once, so batchSize is only
+     * effective on the uneven path.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes go through the chunk path; other 
modes return a
+     * single BinlogSplit instead.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         MySqlSourceConfig sourceConfig = getSourceConfig(ftsReq);
         StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
-        List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
-        MySqlBinlogSplit remainingBinlogSplit = null;
-        if (startupMode.equals(StartupMode.INITIAL) || 
startupMode.equals(StartupMode.SNAPSHOT)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
-        } else {
-            remainingBinlogSplit =
-                    new MySqlBinlogSplit(
-                            BINLOG_SPLIT_ID,
-                            sourceConfig.getStartupOptions().binlogOffset,
-                            BinlogOffset.ofNonStopping(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+
+        if (!startupMode.equals(StartupMode.INITIAL) && 
!startupMode.equals(StartupMode.SNAPSHOT)) {
+            BinlogSplit binlogSplit = new BinlogSplit();
+            binlogSplit.setSplitId(BINLOG_SPLIT_ID);
+            
binlogSplit.setStartingOffset(sourceConfig.getStartupOptions().binlogOffset.getOffset());
+            return Collections.singletonList(binlogSplit);
         }
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for (MySqlSnapshotSplit snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+
+        String database = 
ftsReq.getConfig().get(DataSourceConfigKeys.DATABASE);
+        TableId tableId = TableId.parse(database + "." + 
ftsReq.getSnapshotTable());
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+
+        boolean isCaseSensitive;
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
+            isCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to query table id case 
sensitivity", e);
+        }
+        MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, 
isCaseSensitive);
+        MySqlPartition partition =
+                new 
MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
+
+        ChunkSplitterState state = buildChunkSplitterState(sourceConfig, 
tableId, ftsReq, mySqlSchema, partition);
+        MySqlChunkSplitter splitter = new MySqlChunkSplitter(mySqlSchema, 
sourceConfig, state);
+
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                List<MySqlSnapshotSplit> chunks = 
splitter.splitChunks(partition, tableId);
+                for (MySqlSnapshotSplit chunk : chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {
+                    break;
+                }
             }

Review Comment:
   The loop intends to fetch “up to batchSize” splits, but it adds all splits 
returned by `splitter.splitChunks(...)` without stopping once `result.size()` 
reaches `batchSize`. If `splitChunks()` returns multiple chunks in one call, 
this RPC can return more than `batchSize`, undermining the goal of short/ 
bounded fetchSplits responses. Consider breaking out of the inner loop once 
`result.size() == batchSize` (and/or trimming the returned list).



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -163,21 +169,52 @@ public static void deleteJobMeta(Long jobId) {
         }
     }
 
-    public static void insertSplitsToMeta(Long jobId, Map<String, 
List<SnapshotSplit>> tableSplits) throws Exception {
-        List<String> values = new ArrayList<>();
-        int index = 1;
-        for (Map.Entry<String, List<SnapshotSplit>> entry : 
tableSplits.entrySet()) {
-            Map<String, String> params = new HashMap<>();
-            params.put("id", index + "");
-            params.put("job_id", jobId + "");
-            params.put("table_name", entry.getKey());
-            params.put("chunk_list", 
objectMapper.writeValueAsString(entry.getValue()));
-            StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
-            String sql = 
stringSubstitutor.replace(INSERT_INTO_META_TABLE_TEMPLATE);
-            values.add(sql);
-            index++;
+    /**
+     * UPSERT a single table's chunk_list. id is reused if the table already 
has a row,
+     * otherwise allocated as MAX(id)+1. Relies on UNIQUE KEY (id, job_id) for 
in-place override.
+     */
+    public static void upsertChunkList(Long jobId, String tableName, 
List<SnapshotSplit> chunks) throws Exception {
+        createMetaTableIfNotExist();
+        Integer id = querySingleTableId(jobId, tableName);
+        if (id == null) {
+            id = queryNextAvailableId(jobId);
+        }
+        Map<String, String> params = new HashMap<>();
+        params.put("id", String.valueOf(id));
+        params.put("job_id", String.valueOf(jobId));
+        params.put("table_name", tableName);
+        params.put("chunk_list", objectMapper.writeValueAsString(chunks));
+        StringSubstitutor sub = new StringSubstitutor(params);
+        String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE);
+        batchInsert(Collections.singletonList(sql));
+    }

Review Comment:
   `upsertChunkList()` allocates a new row id via `MAX(id)+1` when a table has 
no existing row, and relies on `UNIQUE KEY(id, job_id)` for overwrite. This is 
not safe under concurrent upserts for the same job: two callers can pick the 
same MAX(id) and then one upsert will overwrite the other table’s row (since 
table_name is not part of the unique key), corrupting `streaming_job_meta`. 
Prefer making `(job_id, table_name)` the unique key for true per-table upsert 
semantics, or implement an atomic id allocation strategy that cannot collide 
(and never overwrites an unrelated table row).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to