Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3216682019
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -141,9 +160,10 @@ public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
nextOffset.setSplits(snapshotSplits);
return nextOffset;
- } else if (currentOffset != null && currentOffset.snapshotSplit()) {
- // initial mode: snapshot to binlog
- // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here
+ } else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) {
+ // initial mode: snapshot to binlog. noMoreSplits() guards against switching while
+ // splitting is still in progress (remainingSplits empty doesn't mean fully cut).
+ // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here.
Review Comment:
`remainingSplits`/`finishedSplits` are mutated under `splitsLock` (in
`advanceSplits()`/`updateOffset()`), but `getNextOffset()` reads
`remainingSplits` without the same lock. Since `getNextOffset()` runs in task
threads while commits/scheduler ticks can concurrently update splits, this can
lead to races/`ConcurrentModificationException` or selecting an inconsistent
subList. Please guard reads with `splitsLock` as well (or switch these
collections to thread-safe snapshots/copies).
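A minimal sketch of the suggested locking, assuming splits can be modeled as plain `String` ids. `LockedSplitQueue`, `addSplits`, `markFinished` and `peekNext` are hypothetical names standing in for the provider's methods; the point is that readers take `splitsLock` and copy before releasing it:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the provider's split bookkeeping. Splits are plain
// String ids here instead of SnapshotSplit so the sketch compiles on its own.
final class LockedSplitQueue {
    private final Object splitsLock = new Object();
    private final List<String> remainingSplits = new ArrayList<>();
    private final List<String> finishedSplits = new ArrayList<>();

    // Writer, like advanceSplits(): append newly fetched splits under the lock.
    void addSplits(List<String> batch) {
        synchronized (splitsLock) {
            remainingSplits.addAll(batch);
        }
    }

    // Writer, like updateOffset(): move committed splits to finished under the lock.
    void markFinished(List<String> committed) {
        synchronized (splitsLock) {
            remainingSplits.removeAll(committed);
            finishedSplits.addAll(committed);
        }
    }

    // Reader, like getNextOffset(): size check and copy happen in one critical section.
    // subList() is only a view, so the copy must be made before the lock is released.
    List<String> peekNext(int splitsNum) {
        synchronized (splitsLock) {
            int n = Math.min(splitsNum, remainingSplits.size());
            return new ArrayList<>(remainingSplits.subList(0, n));
        }
    }

    int finishedCount() {
        synchronized (splitsLock) {
            return finishedSplits.size();
        }
    }
}
```

Computing the batch size and copying the `subList` should happen in the same critical section; if `splitsNum` is derived from `remainingSplits.size()` outside the lock, the list can shrink before `subList` runs.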
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
+ // ============ Async split progress (driven by scheduler each tick) ============
+
+ /** Initialize at CREATE time. syncTables is the list of source tables this job syncs. */
+ @Override
+ public void initSplitProgress(List<String> syncTables) {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
+ // task commit waits briefly (max_interval >> RPC time anyway).
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
+ .collect(Collectors.toList());
+ try {
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
+ } catch (Exception e) {
+ throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage());
+ }
+
+ // 5. Advance cdcSplitProgress to the last split in the batch.
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1));
+ }
+ }
+
+ /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
+ private static int splitIdOf(String splitId) {
+ int colon = splitId.lastIndexOf(':');
+ return Integer.parseInt(splitId.substring(colon + 1));
Review Comment:
`splitIdOf()` assumes a splitId format `tableId:id` and will throw if the
delimiter is missing or the suffix isn’t numeric. This is a new hard dependency
that can crash split fetching/sorting during replay. Please either (1) validate
and throw a clear `JobException` mentioning the unexpected splitId, or (2)
parse more defensively (e.g. extract the trailing integer with a regex /
support other delimiters) so older/newer flink-cdc splitId formats won’t break
FE.
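A sketch of option (1), assuming the `tableId:id` format is the only one FE needs to accept. `SplitIds` is a hypothetical helper; it throws an unchecked `IllegalArgumentException` that names the offending splitId, because `splitIdOf` is called from a `Comparator` lambda where a checked `JobException` cannot be thrown directly:

```java
final class SplitIds {
    private SplitIds() {}

    /**
     * Parse the trailing integer id from a flink-cdc style splitId "tableId:id".
     * Fails with a message naming the splitId instead of a bare NumberFormatException.
     */
    static int splitIdOf(String splitId) {
        if (splitId == null) {
            throw new IllegalArgumentException("splitId is null");
        }
        int colon = splitId.lastIndexOf(':');
        if (colon < 0 || colon == splitId.length() - 1) {
            throw new IllegalArgumentException(
                    "Unexpected splitId format, expected 'tableId:id': " + splitId);
        }
        try {
            return Integer.parseInt(splitId.substring(colon + 1));
        } catch (NumberFormatException e) {
            // Non-numeric suffix, or digits outside the int range.
            throw new IllegalArgumentException("Non-numeric id suffix in splitId: " + splitId, e);
        }
    }
}
```

The caller (e.g. `advanceSplits()`) can catch `IllegalArgumentException` around the sort and rethrow it as `JobException`, so the scheduler sees a job-level error that identifies the bad splitId.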
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +132,78 @@ public void initialize(String jobId, DataSource dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}", parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link ChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC builds a fresh splitter from the (table, nextChunkStart, nextChunkId)
+ * triple supplied by FE, fetches up to {@code batchSize} chunks, then closes the splitter.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest modes never reach here.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId());
+ LOG.info("Get table {} splits for job {} (nextSplitId={}, hasNextSplitStart={})",
+ ftsReq.getSnapshotTable(), ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ ftsReq.getNextSplitStart() != null);
JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
- remainingSnapshotSplits = new ArrayList<>();
- StreamSplit remainingStreamSplit = null;
-
- // Check startup mode - for PostgreSQL, we use similar logic as MySQL
- String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
- || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
- } else {
- // For non-initial mode, create a stream split
- Offset startingOffset = createInitialOffset();
- remainingStreamSplit =
- new StreamSplit(
- STREAM_SPLIT_ID,
- startingOffset,
- createNoStoppingOffset(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
- }
+ String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+ TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+ // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
+ // null / empty -> START_BOUND (first time splitting this table, start from min(pk))
+ // non-empty -> middleOf(value[0]) (continue from previous split's splitEnd)
+ // END_BOUND never appears in transit: when a table is fully split, FE clears
+ // committedSplitProgress.currentSplittingTable to null, so the next RPC will be for
+ // a different table or no RPC at all.
+ Object[] pkValues = ftsReq.getNextSplitStart();
+ ChunkBound bound = (pkValues == null || pkValues.length == 0)
+ ? ChunkBound.START_BOUND
+ : ChunkBound.middleOf(pkValues[0]);
+
+ int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId();
+ int batchSize = ftsReq.getBatchSize() == null ? 100 : ftsReq.getBatchSize();
+
+ ChunkSplitterState state = new ChunkSplitterState(tableId, bound, splitId);
+ ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+ splitter.open();
- List<AbstractSourceSplit> splits = new ArrayList<>();
- if (!remainingSnapshotSplits.isEmpty()) {
- for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
- snapshotSplit : remainingSnapshotSplits) {
- String splitId = snapshotSplit.splitId();
- String tableId = snapshotSplit.getTableId().identifier();
- Object[] splitStart = snapshotSplit.getSplitStart();
- Object[] splitEnd = snapshotSplit.getSplitEnd();
- List<String> splitKey = snapshotSplit.getSplitKeyType().getFieldNames();
- SnapshotSplit split =
- new SnapshotSplit(splitId, tableId, splitKey, splitStart, splitEnd, null);
- splits.add(split);
+ try {
+ List<AbstractSourceSplit> result = new ArrayList<>();
+ while (result.size() < batchSize) {
+ Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+ chunks = splitter.generateSplits(tableId);
Review Comment:
`splitter.open()` is executed before the `try` block. If `open()` throws,
the method will skip the `finally` that closes the splitter, potentially
leaking resources held by the dialect/splitter. Please move `open()` inside the
`try` (or wrap it so the `finally` always runs once the splitter is created).
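One way to get this, sketched against a hypothetical `Splitter` interface that only stands in for flink-cdc's `ChunkSplitter` (method set simplified, not the real signatures). Moving `open()` into a try-with-resources block guarantees `close()` runs even when `open()` fails:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical minimal contract standing in for flink-cdc's ChunkSplitter;
// only the methods this sketch needs, not the real interface.
interface Splitter extends AutoCloseable {
    void open() throws Exception;

    Collection<String> generateSplits() throws Exception;

    boolean hasNextChunk();

    @Override
    void close() throws Exception;
}

final class SplitFetcher {
    private SplitFetcher() {}

    // Collect chunks until at least batchSize are gathered or the splitter is exhausted.
    static List<String> fetchBatch(Splitter splitter, int batchSize) throws Exception {
        // close() runs even if open() throws; a failure in close() is attached
        // as a suppressed exception instead of hiding the original one.
        try (Splitter s = splitter) {
            s.open();
            List<String> result = new ArrayList<>();
            while (result.size() < batchSize) {
                Collection<String> chunks = s.generateSplits();
                result.addAll(chunks);
                if (chunks.isEmpty() || !s.hasNextChunk()) {
                    break;
                }
            }
            return result;
        }
    }
}
```

try-with-resources also keeps the `open()` failure as the primary exception and records any `close()` failure as suppressed, which a plain `finally { splitter.close(); }` would not.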
##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java:
##########
@@ -33,6 +33,18 @@
public class FetchTableSplitsRequest extends JobBaseConfig {
private String snapshotTable;
+ /**
+ * PK values of the next split start (inclusive lower bound). null = fresh start (START_BOUND).
+ * Same shape as SnapshotSplit.splitStart/splitEnd; cdc_client takes [0] to construct ChunkBound.
+ */
+ private Object[] nextSplitStart;
+
+ /** Next split id; null = 0 (fresh start). */
+ private Integer nextSplitId;
+
+ /** Max splits to fetch in this RPC; null = default 100. */
+ private Integer batchSize;
Review Comment:
The request contract says `nextSplitStart` has the same shape as
`SnapshotSplit.splitStart/splitEnd`, but the comment also says cdc_client only
uses element `[0]`. This is misleading if composite split keys are possible,
and it bakes a single-column assumption into the API. Please either
clarify/enforce single-column split keys in validation, or update the contract
to support composite boundaries end-to-end.
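If the single-column route is taken, a create-time check could look like the sketch below. `SplitKeyValidation` and the `splitKeysByTable` map (table name to resolved split-key columns) are hypothetical; how FE obtains the split-key columns is left open:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SplitKeyValidation {
    private SplitKeyValidation() {}

    /**
     * Describe every table whose split key is not exactly one column.
     * An empty result means the single-column nextSplitStart contract holds for the job.
     */
    static List<String> findUnsupportedTables(Map<String, List<String>> splitKeysByTable) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : splitKeysByTable.entrySet()) {
            List<String> cols = e.getValue();
            int n = cols == null ? 0 : cols.size();
            if (n != 1) {
                problems.add(e.getKey() + " has " + n + " split-key columns " + cols);
            }
        }
        return problems;
    }
}
```

Rejecting such tables at CREATE time surfaces the limitation before any split is persisted, rather than resuming from a truncated boundary later.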
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -279,10 +308,16 @@ public boolean hasMoreDataToConsume() {
}
if (currentOffset.snapshotSplit()) {
- if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) {
+ if (!remainingSplits.isEmpty()) {
+ return true;
+ }
+ // remainingSplits empty: if splitting is still in progress, scheduler should delay
+ // task dispatch (advanceSplits will push more splits next tick).
+ if (!noMoreSplits()) {
return false;
}
- return true;
+ // Splitting fully done: snapshot-only completes here; initial mode falls through to binlog.
+ return !isSnapshotOnlyMode();
Review Comment:
`hasMoreDataToConsume()` reads
`remainingSplits`/`currentOffset`/split-progress state without `splitsLock`,
while `advanceSplits()` and `updateOffset()` concurrently mutate these
structures under the lock. This can cause inconsistent scheduling decisions or
concurrent modification failures. Please take `splitsLock` (or use immutable
snapshots) for the snapshot-phase checks here.
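A self-contained sketch of the same decision table evaluated atomically under `splitsLock`. `SnapshotPhaseState` and its mutators are hypothetical simplifications of the provider: tables and splits are plain strings, and `addBatch(..., true)` plays the role of a split with an empty `splitEnd`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class SnapshotPhaseState {
    private final Object splitsLock = new Object();
    private final List<String> remainingSplits = new ArrayList<>();
    private final Deque<String> untouchedTables;
    private final boolean snapshotOnly;
    private String currentSplittingTable; // null = no table being split right now

    SnapshotPhaseState(List<String> syncTables, boolean snapshotOnly) {
        this.untouchedTables = new ArrayDeque<>(syncTables);
        this.snapshotOnly = snapshotOnly;
    }

    // Scheduler tick: pick the next table if none is in progress.
    void beginNextTable() {
        synchronized (splitsLock) {
            if (currentSplittingTable == null) {
                currentSplittingTable = untouchedTables.pollFirst();
            }
        }
    }

    // Scheduler tick: a fetched batch arrives; lastOfTable ends splitting of the table.
    void addBatch(List<String> splits, boolean lastOfTable) {
        synchronized (splitsLock) {
            remainingSplits.addAll(splits);
            if (lastOfTable) {
                currentSplittingTable = null;
            }
        }
    }

    // Task commit: the given splits were consumed.
    void commit(List<String> splits) {
        synchronized (splitsLock) {
            remainingSplits.removeAll(splits);
        }
    }

    boolean noMoreSplits() {
        synchronized (splitsLock) {
            return currentSplittingTable == null && untouchedTables.isEmpty();
        }
    }

    // Same decision table as the patched hasMoreDataToConsume(), read in one critical section.
    boolean hasMoreDataToConsume() {
        synchronized (splitsLock) {
            if (!remainingSplits.isEmpty()) {
                return true;
            }
            // Reentrant: noMoreSplits() re-acquires splitsLock on the same thread.
            if (!noMoreSplits()) {
                return false;
            }
            return !snapshotOnly;
        }
    }
}
```

Java intrinsic locks are reentrant, so calling `noMoreSplits()` (which synchronizes on the same lock) from inside the locked `hasMoreDataToConsume()` does not deadlock.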
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +132,78 @@ public void initialize(String jobId, DataSource dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}", parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link ChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC builds a fresh splitter from the (table, nextChunkStart, nextChunkId)
+ * triple supplied by FE, fetches up to {@code batchSize} chunks, then closes the splitter.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest modes never reach here.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId());
+ LOG.info("Get table {} splits for job {} (nextSplitId={}, hasNextSplitStart={})",
+ ftsReq.getSnapshotTable(), ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ ftsReq.getNextSplitStart() != null);
JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
- remainingSnapshotSplits = new ArrayList<>();
- StreamSplit remainingStreamSplit = null;
-
- // Check startup mode - for PostgreSQL, we use similar logic as MySQL
- String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
- || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
- } else {
- // For non-initial mode, create a stream split
- Offset startingOffset = createInitialOffset();
- remainingStreamSplit =
- new StreamSplit(
- STREAM_SPLIT_ID,
- startingOffset,
- createNoStoppingOffset(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
- }
+ String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+ TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+ // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
+ // null / empty -> START_BOUND (first time splitting this table, start from min(pk))
+ // non-empty -> middleOf(value[0]) (continue from previous split's splitEnd)
+ // END_BOUND never appears in transit: when a table is fully split, FE clears
+ // committedSplitProgress.currentSplittingTable to null, so the next RPC will be for
+ // a different table or no RPC at all.
+ Object[] pkValues = ftsReq.getNextSplitStart();
+ ChunkBound bound = (pkValues == null || pkValues.length == 0)
+ ? ChunkBound.START_BOUND
+ : ChunkBound.middleOf(pkValues[0]);
Review Comment:
`ChunkBound` is reconstructed from `nextSplitStart` using only
`pkValues[0]`. This drops additional PK columns if the split boundary is
composite (and the request type explicitly models boundaries as `Object[]`).
Please confirm this path only supports single-column split keys, or pass the
full boundary object through so composite PK tables can resume splitting
correctly.
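If only single-column split keys are supported, the cdc_client side could fail loudly instead of dropping columns. A sketch with a hypothetical `ResumeBound` holder that mirrors the `START_BOUND` / `middleOf(pkValues[0])` choice; wiring it to `ChunkBound` is left out to avoid guessing at flink-cdc APIs:

```java
// Hypothetical holder for where splitting resumes. It mirrors the START_BOUND /
// middleOf(value[0]) choice in the diff, but rejects boundaries it cannot
// represent instead of silently dropping trailing columns.
final class ResumeBound {
    // null means "start from the table's minimum split-key value" (START_BOUND).
    final Object startValue;

    private ResumeBound(Object startValue) {
        this.startValue = startValue;
    }

    boolean isStart() {
        return startValue == null;
    }

    static ResumeBound fromRequest(String table, Object[] nextSplitStart) {
        if (nextSplitStart == null || nextSplitStart.length == 0) {
            return new ResumeBound(null);
        }
        if (nextSplitStart.length > 1) {
            throw new IllegalArgumentException("Composite split boundary (" + nextSplitStart.length
                    + " columns) is not supported when resuming table " + table);
        }
        if (nextSplitStart[0] == null) {
            throw new IllegalArgumentException("Null split boundary value for table " + table);
        }
        return new ResumeBound(nextSplitStart[0]);
    }
}
```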
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
+ // ============ Async split progress (driven by scheduler each tick) ============
+
+ /** Initialize at CREATE time. syncTables is the list of source tables this job syncs. */
+ @Override
+ public void initSplitProgress(List<String> syncTables) {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
+ // task commit waits briefly (max_interval >> RPC time anyway).
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
+ .collect(Collectors.toList());
+ try {
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
+ } catch (Exception e) {
+ throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage());
+ }
+
+ // 5. Advance cdcSplitProgress to the last split in the batch.
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1));
+ }
+ }
+
+ /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
+ private static int splitIdOf(String splitId) {
+ int colon = splitId.lastIndexOf(':');
+ return Integer.parseInt(splitId.substring(colon + 1));
+ }
+
+ /** Reset progress to "no table being split" state. */
+ private static void clearProgress(SplitProgress progress) {
+ progress.setCurrentSplittingTable(null);
+ progress.setNextSplitStart(null);
+ progress.setNextSplitId(null);
+ }
+
+ /**
+ * Apply a split's position to a progress object.
+ * - splitEnd null/empty (final split of table) → clear all fields.
+ * - splitEnd non-empty → set currentSplittingTable to split.tableId, advance start/id.
+ */
+ private static void applySplitToProgress(SplitProgress progress, SnapshotSplit split) {
+ if (split.getSplitEnd() == null || split.getSplitEnd().length == 0) {
+ clearProgress(progress);
+ } else {
+ progress.setCurrentSplittingTable(split.getTableId());
+ progress.setNextSplitStart(split.getSplitEnd());
+ progress.setNextSplitId(splitIdOf(split.getSplitId()) + 1);
+ }
+ }
+
+ /** RPC fetchSplits with (table, nextSplitStart, nextSplitId, batchSize). protected for UT subclass. */
+ protected List<SnapshotSplit> rpcFetchSplitsBatch(String table, Object[] nextSplitStart, Integer nextSplitId)
+ throws JobException {
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+ FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest(
+ getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), table);
+ requestParams.setNextSplitStart(nextSplitStart);
+ requestParams.setNextSplitId(nextSplitId);
+ InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+ .setApi("/api/fetchSplits")
+ .setParams(new Gson().toJson(requestParams))
+ .build();
Review Comment:
`rpcFetchSplitsBatch`’s comment says it sends `(table, nextSplitStart,
nextSplitId, batchSize)`, and `FetchTableSplitsRequest` now has `batchSize`,
but the request never sets it. That makes batch sizing effectively
non-configurable from FE. Please either set `requestParams.setBatchSize(...)`
(from a config/property) or remove the unused field/comment to avoid API drift.
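A sketch of the first option, resolving the batch size from job properties. The property key `snapshot_split_batch_size` and the `SplitBatchSize` helper are hypothetical (the PR defines neither); the default mirrors the `100` fallback in cdc_client:

```java
import java.util.Map;

final class SplitBatchSize {
    // Hypothetical job property key; not defined by the PR.
    static final String KEY = "snapshot_split_batch_size";
    // Same value cdc_client falls back to when batchSize is null.
    static final int DEFAULT = 100;

    private SplitBatchSize() {}

    static int resolve(Map<String, String> props) {
        String raw = props == null ? null : props.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT;
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException(KEY + " must be positive, got: " + value);
        }
        return value;
    }
}
```

With it, `rpcFetchSplitsBatch` could call `requestParams.setBatchSize(SplitBatchSize.resolve(sourceProperties));`, assuming `sourceProperties` is a `Map<String, String>`. Validating on the FE side keeps a bad value from reaching cdc_client as a zero or negative loop bound.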
##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -180,6 +186,54 @@ public static void insertSplitsToMeta(Long jobId, Map<String, List<SnapshotSplit
batchInsert(values);
}
+ /**
+ * UPSERT a single table's chunk_list. id is reused if the table already has a row,
+ * otherwise allocated as MAX(id)+1. Relies on UNIQUE KEY (id, job_id) for in-place override.
+ */
+ public static void upsertChunkList(Long jobId, String tableName, List<SnapshotSplit> chunks) throws Exception {
+ createMetaTableIfNotExist();
+ Integer id = querySingleTableId(jobId, tableName);
+ if (id == null) {
+ id = queryNextAvailableId(jobId);
+ }
+ Map<String, String> params = new HashMap<>();
+ params.put("id", String.valueOf(id));
+ params.put("job_id", String.valueOf(jobId));
+ params.put("table_name", tableName);
+ params.put("chunk_list", objectMapper.writeValueAsString(chunks));
+ StringSubstitutor sub = new StringSubstitutor(params);
+ String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE);
+ batchInsert(Collections.singletonList(sql));
Review Comment:
`upsertChunkList()` builds SQL via string substitution with `tableName` and
the JSON `chunk_list` embedded inside single-quoted literals. If either value
contains a single quote (or other special characters), this can break the
internal query or allow SQL injection into the internal meta table. Please
escape/quote these values safely (e.g. replace `'` with `''` for SQL literals)
or use a parameterized execution path if available.
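A minimal escaping sketch, assuming the template wraps `table_name` and `chunk_list` in single quotes and the target dialect treats backslash as an escape character inside string literals (MySQL-style). `SqlLiterals.escape` is a hypothetical helper; a parameterized execution path, if one exists, would still be preferable:

```java
final class SqlLiterals {
    private SqlLiterals() {}

    /**
     * Escape a value for embedding inside a single-quoted SQL string literal.
     * Backslashes are doubled (assumes MySQL-style backslash escapes) and
     * single quotes are doubled, in one left-to-right pass.
     */
    static String escape(String value) {
        if (value == null) {
            throw new IllegalArgumentException("value is null");
        }
        StringBuilder sb = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\\') {
                sb.append("\\\\");
            } else if (c == '\'') {
                sb.append("''");
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

Backslashes matter here because Jackson-serialized JSON escapes embedded double quotes as `\"`, so `chunk_list` can contain them even when `tableName` does not. Usage would be `params.put("table_name", SqlLiterals.escape(tableName));`, and likewise for `chunk_list`.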
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]