Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3216293298
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
+ // ============ Async split progress (driven by scheduler each tick) ============
+
+ /** Initialize at CREATE time. syncTables is the list of source tables this job syncs. */
+ @Override
+ public void initSplitProgress(List<String> syncTables) {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
Review Comment:
JdbcSourceOffsetProvider.initSplitProgress()/noMoreSplits()/advanceSplits()
currently drive snapshot split fetching for any job with non-empty syncTables,
regardless of startup offset mode. This will incorrectly trigger snapshot
splitting for OFFSET_LATEST/OFFSET_EARLIEST/JSON specific-offset jobs (default
is latest), which should be binlog-only and previously initialized the source
reader instead of fetching snapshot splits. Suggest: if
!checkNeedSplitChunks(sourceProperties), make initSplitProgress/advanceSplits a
no-op (and ensure noMoreSplits returns true), and preserve the old latest-mode
behavior by calling initSourceReader() once.
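
A minimal sketch of the suggested gating, under stated assumptions: `GatedSplitProgress` is a hypothetical stand-in for the provider, the `"offset"` key and the `"initial"`/`"snapshot"`/`"latest"` values are assumed spellings of the startup modes, and the counter stands in for the real `initSourceReader()` call.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the offset provider; only the mode gating is the point.
class GatedSplitProgress {
    private final boolean needSplitChunks;
    private List<String> cachedSyncTables;
    private int readerInitCount;

    GatedSplitProgress(Map<String, String> sourceProperties) {
        // Assumption: the startup mode lives under an "offset" key and defaults to latest.
        String mode = sourceProperties.getOrDefault("offset", "latest");
        this.needSplitChunks =
                "initial".equalsIgnoreCase(mode) || "snapshot".equalsIgnoreCase(mode);
    }

    void initSplitProgress(List<String> syncTables) {
        if (!needSplitChunks) {
            // Binlog-only job: keep the old behavior and initialize the reader once.
            readerInitCount++;
            return;
        }
        this.cachedSyncTables = syncTables;
    }

    boolean noMoreSplits() {
        // Binlog-only jobs never have snapshot splits to wait for.
        return !needSplitChunks || cachedSyncTables == null || cachedSyncTables.isEmpty();
    }

    /** Returns whether a fetch would be attempted; the real method would do the RPC here. */
    boolean advanceSplits() {
        return needSplitChunks && !noMoreSplits();
    }

    int readerInitCount() {
        return readerInitCount;
    }
}
```

With this shape, a job created without an explicit offset reports noMoreSplits() == true immediately, so the scheduler never issues a split-fetch RPC for it.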
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -246,8 +252,8 @@ private void initSourceJob() {
StreamingJobUtils.resolveAndValidateSource(
dataSourceType, sourceProperties,
String.valueOf(getJobId()), createTbls);
this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
- JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
- rdsOffsetProvider.splitChunks(createTbls);
+ // Initialize split progress; advanceSplits is driven later by the scheduler each tick.
+ this.offsetProvider.initSplitProgress(this.syncTables);
} catch (Exception ex) {
Review Comment:
initSourceJob() now always calls
offsetProvider.initSplitProgress(syncTables) and relies on scheduler-driven
advanceSplits(), but this changes behavior for non-snapshot startup modes (e.g.
default OFFSET_LATEST). Previously splitChunks() would skip snapshot splitting
and call initSourceReader() for latest mode; with the current flow, scheduler
ticks can start fetching snapshot splits for latest/binlog-only jobs (or fail
because reader init was skipped). Consider gating split-progress
initialization/scheduler split-advancing to INITIAL/SNAPSHOT modes (or making
the JdbcSourceOffsetProvider implementation a no-op for non-splitting modes
while still initializing the reader for latest).
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /** Initialize at CREATE time. syncTables is the list of source tables
this job syncs. */
+ @Override
+ public void initSplitProgress(List<String> syncTables) {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC. OK to hold lock during RPC: no async splitting thread;
updateOffset on
+ // task commit waits briefly (max_interval >> RPC time anyway).
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal,
splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s ->
splitIdOf(s.getSplitId())))
+ .collect(Collectors.toList());
+ try {
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl,
splitsOfTbl);
+ } catch (Exception e) {
+ throw new JobException("UPSERT chunk_list failed for " + tbl +
": " + e.getMessage());
+ }
+
+ // 5. Advance cdcSplitProgress to the last split in the batch.
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() -
1));
+ }
+ }
+
+ /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
+ private static int splitIdOf(String splitId) {
+ int colon = splitId.lastIndexOf(':');
+ return Integer.parseInt(splitId.substring(colon + 1));
+ }
Review Comment:
splitIdOf() assumes the flink-cdc snapshot splitId is always in the form
"tableId:id". A splitId whose suffix after the last ':' is not an integer
throws NumberFormatException; this includes a splitId with no ':' at all,
because lastIndexOf returns -1 and the whole string is then parsed. A null
splitId throws NullPointerException. Since this is called during replay +
progress advancement, a malformed/changed splitId format would crash the
scheduler tick. Consider validating the presence of ':' and catching parse
failures, then surfacing a JobException with a clear message (including the
offending splitId) instead of letting runtime exceptions propagate.
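
One way the validation could look, as a sketch only: `MalformedSplitIdException` is a hypothetical unchecked stand-in for Doris's JobException (used here to keep the example self-contained), and `SplitIds` is an illustrative holder class.

```java
// Hypothetical holder class; in the PR this logic would live in splitIdOf().
class SplitIds {
    /** Stand-in for JobException so the sketch compiles on its own. */
    static final class MalformedSplitIdException extends RuntimeException {
        MalformedSplitIdException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Parses the trailing integer of a "tableId:id" splitId, rejecting any other shape. */
    static int splitIdOf(String splitId) {
        if (splitId == null) {
            throw new MalformedSplitIdException("splitId is null", null);
        }
        int colon = splitId.lastIndexOf(':');
        // Require a ':' with at least one character after it.
        if (colon < 0 || colon == splitId.length() - 1) {
            throw new MalformedSplitIdException(
                    "Expected splitId in the form tableId:id but got '" + splitId + "'", null);
        }
        try {
            return Integer.parseInt(splitId.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new MalformedSplitIdException(
                    "Non-numeric id suffix in splitId '" + splitId + "'", e);
        }
    }
}
```

Because the offending splitId is part of the message, a format change in flink-cdc would show up in the job error instead of as a bare parse failure in the scheduler log.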
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +132,78 @@ public void initialize(String jobId, DataSource
dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}",
parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link
ChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC builds a fresh splitter from the (table,
nextChunkStart, nextChunkId)
+ * triple supplied by FE, fetches up to {@code batchSize} chunks, then
closes the splitter.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest
modes never reach here.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest
ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(),
ftsReq.getJobId());
+ LOG.info("Get table {} splits for job {} (nextSplitId={},
hasNextSplitStart={})",
+ ftsReq.getSnapshotTable(), ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ ftsReq.getNextSplitStart() != null);
JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
- remainingSnapshotSplits = new ArrayList<>();
- StreamSplit remainingStreamSplit = null;
-
- // Check startup mode - for PostgreSQL, we use similar logic as MySQL
- String startupMode =
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
- ||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
- } else {
- // For non-initial mode, create a stream split
- Offset startingOffset = createInitialOffset();
- remainingStreamSplit =
- new StreamSplit(
- STREAM_SPLIT_ID,
- startingOffset,
- createNoStoppingOffset(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
- }
+ String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+ TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+ // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
+ // null / empty -> START_BOUND (first time splitting this table,
start from min(pk))
+ // non-empty -> middleOf(value[0]) (continue from previous
split's splitEnd)
+ // END_BOUND never appears in transit: when a table is fully split, FE
clears
+ // committedSplitProgress.currentSplittingTable to null, so the next
RPC will be for
+ // a different table or no RPC at all.
+ Object[] pkValues = ftsReq.getNextSplitStart();
+ ChunkBound bound = (pkValues == null || pkValues.length == 0)
+ ? ChunkBound.START_BOUND
+ : ChunkBound.middleOf(pkValues[0]);
Review Comment:
ChunkBound reconstruction uses nextSplitStart[0] only
(ChunkBound.middleOf(pkValues[0])). If nextSplitStart ever contains multiple PK
values (e.g. composite key or future multi-column split key support), this will
silently ignore the remaining components and can resume from an incorrect
position. Consider validating pkValues length and failing fast with a clear
error when it is not the expected shape, rather than implicitly dropping values.
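
A possible fail-fast check, sketched with hypothetical names (`SplitStartValidator`, `singleSplitStart`); it assumes a single-column split key is the only supported shape and leaves the ChunkBound construction to the caller.

```java
// Hypothetical helper; the caller would map null to START_BOUND and a value to middleOf(value).
class SplitStartValidator {
    /**
     * Returns the single resume value, or null when splitting starts from the beginning.
     * Rejects any shape other than null, empty, or exactly one non-null value.
     */
    static Object singleSplitStart(Object[] pkValues) {
        if (pkValues == null || pkValues.length == 0) {
            return null;
        }
        if (pkValues.length != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one split-key value in nextSplitStart, got "
                            + pkValues.length);
        }
        if (pkValues[0] == null) {
            throw new IllegalArgumentException("nextSplitStart contains a null split-key value");
        }
        return pkValues[0];
    }
}
```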
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +132,78 @@ public void initialize(String jobId, DataSource
dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}",
parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link
ChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC builds a fresh splitter from the (table,
nextChunkStart, nextChunkId)
+ * triple supplied by FE, fetches up to {@code batchSize} chunks, then
closes the splitter.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest
modes never reach here.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest
ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(),
ftsReq.getJobId());
+ LOG.info("Get table {} splits for job {} (nextSplitId={},
hasNextSplitStart={})",
+ ftsReq.getSnapshotTable(), ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ ftsReq.getNextSplitStart() != null);
JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
- remainingSnapshotSplits = new ArrayList<>();
- StreamSplit remainingStreamSplit = null;
-
- // Check startup mode - for PostgreSQL, we use similar logic as MySQL
- String startupMode =
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
- ||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
- } else {
- // For non-initial mode, create a stream split
- Offset startingOffset = createInitialOffset();
- remainingStreamSplit =
- new StreamSplit(
- STREAM_SPLIT_ID,
- startingOffset,
- createNoStoppingOffset(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
- }
+ String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+ TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+ // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
+ // null / empty -> START_BOUND (first time splitting this table,
start from min(pk))
+ // non-empty -> middleOf(value[0]) (continue from previous
split's splitEnd)
+ // END_BOUND never appears in transit: when a table is fully split, FE
clears
+ // committedSplitProgress.currentSplittingTable to null, so the next
RPC will be for
+ // a different table or no RPC at all.
+ Object[] pkValues = ftsReq.getNextSplitStart();
+ ChunkBound bound = (pkValues == null || pkValues.length == 0)
+ ? ChunkBound.START_BOUND
+ : ChunkBound.middleOf(pkValues[0]);
+
+ int splitId = ftsReq.getNextSplitId() == null ? 0 :
ftsReq.getNextSplitId();
+ int batchSize = ftsReq.getBatchSize() == null ? 100 :
ftsReq.getBatchSize();
+
+ ChunkSplitterState state = new ChunkSplitterState(tableId, bound,
splitId);
+ ChunkSplitter splitter =
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+ splitter.open();
Review Comment:
getSourceSplits() now always reconstructs a flink-cdc ChunkSplitter and
generates snapshot splits, but the previous implementation returned a
StreamSplit/BinlogSplit for startup modes other than INITIAL/SNAPSHOT. If FE
ever calls /api/fetchSplits for OFFSET_LATEST/OFFSET_EARLIEST/JSON specific
offsets (for example through a later regression), this will force an
unintended snapshot phase. Please reintroduce explicit startup-mode handling
here: generate snapshot splits only for INITIAL/SNAPSHOT, and otherwise either
return a StreamSplit/BinlogSplit or fail fast with a clear error.
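
The fail-fast variant could be as small as the guard below. This is a sketch: `StartupModeGuard` is a hypothetical helper, and the `"offset"` key with `"initial"`/`"snapshot"` values is an assumed spelling of DataSourceConfigKeys.OFFSET and its modes.

```java
import java.util.Map;

// Hypothetical guard to call at the top of getSourceSplits().
class StartupModeGuard {
    static void requireSnapshotMode(Map<String, String> config, String jobId) {
        // Assumption: the startup mode is stored under "offset".
        String mode = config.get("offset");
        boolean snapshotMode =
                "initial".equalsIgnoreCase(mode) || "snapshot".equalsIgnoreCase(mode);
        if (!snapshotMode) {
            throw new IllegalStateException(
                    "fetchSplits called for job " + jobId + " with startup mode '" + mode
                            + "'; snapshot splits are only valid for initial/snapshot");
        }
    }
}
```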
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /** Initialize at CREATE time. syncTables is the list of source tables
this job syncs. */
+ @Override
+ public void initSplitProgress(List<String> syncTables) {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
+ // task commit waits briefly (max_interval >> RPC time anyway).
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s ->
splitIdOf(s.getSplitId())))
+ .collect(Collectors.toList());
+ try {
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl,
splitsOfTbl);
+ } catch (Exception e) {
+ throw new JobException("UPSERT chunk_list failed for " + tbl +
": " + e.getMessage());
+ }
+
+ // 5. Advance cdcSplitProgress to the last split in the batch.
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() -
1));
+ }
+ }
+
Review Comment:
advanceSplits() holds splitsLock while performing rpcFetchSplitsBatch()
(BRPC) and while UPSERTing chunk_list into the internal meta table. Both
operations can block for up to the heavy RPC timeout / query latency, which can
stall updateOffset() (task commit) and other scheduler logic that also
synchronizes on splitsLock. Consider releasing the lock around the RPC +
meta-table write: snapshot the current (table,start,id) under lock, perform
remote work unlocked, then re-lock to merge results if progress is unchanged
(or use a small state version/compare-and-swap) to avoid long lock holds.
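
The snapshot / unlock / re-lock pattern described above can be sketched as follows. Everything here is illustrative: `UnlockedAdvance` is a hypothetical class, split ids are plain strings, and the `Function` argument stands in for the BRPC fetch plus meta-table write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical reduction of the provider state to one table name and one list.
class UnlockedAdvance {
    private final Object splitsLock = new Object();
    private final List<String> remainingSplits = new ArrayList<>();
    private String currentTable;
    private long version; // bumped on every progress mutation

    UnlockedAdvance(String firstTable) {
        this.currentTable = firstTable;
    }

    /** Any other mutation of progress must also bump the version. */
    void switchTable(String table) {
        synchronized (splitsLock) {
            currentTable = table;
            version++;
        }
    }

    /** Returns true if the batch was merged, false if progress changed during the fetch. */
    boolean advance(Function<String, List<String>> remoteFetch) {
        String table;
        long seenVersion;
        synchronized (splitsLock) {
            // 1. Snapshot the inputs under the lock.
            table = currentTable;
            seenVersion = version;
        }
        // 2. Slow remote work runs without holding the lock.
        List<String> batch = remoteFetch.apply(table);
        synchronized (splitsLock) {
            // 3. Merge only if nobody changed progress in the meantime.
            if (version != seenVersion) {
                return false;
            }
            remainingSplits.addAll(batch);
            version++;
            return true;
        }
    }

    int remainingCount() {
        synchronized (splitsLock) {
            return remainingSplits.size();
        }
    }
}
```

A discarded batch costs one wasted RPC, but the next tick simply retries from the current progress, and updateOffset() never waits on a remote call.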
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /** Initialize at CREATE time. syncTables is the list of source tables
this job syncs. */
+ @Override
+ public void initSplitProgress(List<String> syncTables) {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC. OK to hold lock during RPC: no async splitting thread;
updateOffset on
+ // task commit waits briefly (max_interval >> RPC time anyway).
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal,
splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s ->
splitIdOf(s.getSplitId())))
+ .collect(Collectors.toList());
+ try {
Review Comment:
advanceSplits() rebuilds existingIds and re-materializes + sorts the full
per-table split list on every tick (Stream.concat(finished, remaining) ->
filter -> sort -> collect). Each batch therefore costs O(S) to scan all
finished and remaining splits plus O(n log n) to sort the n splits of the
current table, which can become a noticeable scheduler overhead for large
tables with many splits. Consider maintaining a per-table ordered list/map
incrementally (append-only by splitId) so each tick only processes the new
batch, or at least avoid re-sorting the full list every time.
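
A rough sketch of the incremental structure, assuming split ids are strings of the form "tableId:id"; `PerTableSplitIndex` and its methods are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical index kept alongside finishedSplits/remainingSplits.
class PerTableSplitIndex {
    // table -> (numeric split id -> full splitId), ordered by numeric id
    private final Map<String, TreeMap<Integer, String>> byTable = new HashMap<>();

    /** Adds a batch, skipping ids already present; returns how many entries were new. */
    int addBatch(String table, List<String> splitIds) {
        TreeMap<Integer, String> ordered = byTable.computeIfAbsent(table, t -> new TreeMap<>());
        int added = 0;
        for (String splitId : splitIds) {
            int id = Integer.parseInt(splitId.substring(splitId.lastIndexOf(':') + 1));
            if (ordered.putIfAbsent(id, splitId) == null) {
                added++;
            }
        }
        return added;
    }

    /** Returns the table's splits in id order without re-sorting. */
    List<String> orderedSplits(String table) {
        TreeMap<Integer, String> ordered = byTable.get(table);
        return ordered == null ? Collections.emptyList() : new ArrayList<>(ordered.values());
    }
}
```

Each batch of b splits costs O(b log n) to insert, and dedup comes from putIfAbsent rather than a rebuilt id set. Writing the full chunk_list still copies n entries, so this removes the scan and sort but not the size of the UPSERT itself.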
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]