This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c852e608302 [improve](streaming-job) admit multiple splits per scheduler tick to speed up many-table snapshot (#64566)
c852e608302 is described below
commit c852e6083022323b8bcfc9463e8b8ab530c2a777
Author: wudi <[email protected]>
AuthorDate: Tue Jun 23 10:05:42 2026 +0800
[improve](streaming-job) admit multiple splits per scheduler tick to speed up many-table snapshot (#64566)
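## Proposed changes

### Problem

In `FROM ... TO ...` (at-least-once) CDC streaming jobs, the snapshot split-admission step
(`advanceSplitsIfNeed` → `advanceSplits`) admits only **one table's** batch of splits per effective
scheduler tick, and the tick interval is bound to `max_interval`. When a job syncs many small tables
(e.g. 1000 tables), the splitting pace (~1 table per tick) falls far behind the consumer, so the
overall snapshot is much slower than syncing a single large table.

### Solution

`advanceSplitsIfNeed` now calls `advanceSplits` repeatedly within a single tick. It stops when any of the following holds:

- there are no more splits to produce;
- the FE-side backlog of produced-but-unconsumed splits (`pendingSplitCount()`) reaches the soft cap `MAX_PENDING_SPLITS` (512);
- a round produces no new split;
- the next tick is nearly due, i.e. `max_interval` minus min(1 s, 10% of the interval) has elapsed.

Providers that do not override `pendingSplitCount()` report 0, so they still admit one batch per tick as before.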
---
.../insert/streaming/StreamingInsertJob.java | 17 ++-
.../doris/job/offset/SourceOffsetProvider.java | 5 +
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 7 +
.../StreamingInsertJobAdvanceSplitsTest.java | 160 +++++++++++++++++++++
4 files changed, 188 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index be9e6d22f22..855ee75183a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -136,6 +136,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("props")
private Map<String, String> properties;
private StreamingJobProperties jobProperties;
+ // Soft cap on produced-but-unconsumed splits buffered on FE; checked
before each round,
+ // so the actual backlog may overshoot by up to one fetch batch.
+ private static final int MAX_PENDING_SPLITS = 512;
@Getter
@SerializedName("tvf")
private String tvfType;
@@ -739,8 +742,20 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (offsetProvider.noMoreSplits()) {
return;
}
+ // Admit as many splits as possible this tick until the pending
backlog hits the soft cap,
+ // then yield before the next tick (deadline) which resumes the rest.
+ long intervalMs = jobProperties.getMaxIntervalSecond() * 1000L;
+ long deadline = System.currentTimeMillis() + intervalMs -
Math.min(1000L, intervalMs / 10);
try {
- offsetProvider.advanceSplits();
+ while (!offsetProvider.noMoreSplits()
+ && offsetProvider.pendingSplitCount() < MAX_PENDING_SPLITS
+ && System.currentTimeMillis() < deadline) {
+ int before = offsetProvider.pendingSplitCount();
+ offsetProvider.advanceSplits();
+ if (offsetProvider.pendingSplitCount() <= before) {
+ break; // nothing produced this round; avoid spin
+ }
+ }
} catch (Exception ex) {
log.warn("advance splits failed, job id: {}", getJobId(), ex);
if (this.getFailureReason() == null
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index a6f9e582bff..fe03208bd18 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -198,6 +198,11 @@ public interface SourceOffsetProvider {
return true;
}
+ /** Splits produced but not yet consumed (FE-side backlog). */
+ default int pendingSplitCount() {
+ return 0;
+ }
+
/**
* Get the lag of the data source in seconds.
* For CDC sources, lag = (now - last consumed event timestamp) in seconds.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 3cfe6734564..f80147e970f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -653,6 +653,13 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
}
+ @Override
+ public int pendingSplitCount() {
+ synchronized (splitsLock) {
+ return remainingSplits.size();
+ }
+ }
+
@Override
public boolean noMoreSplits() {
if (!checkNeedSplitChunks(sourceProperties)) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java
new file mode 100644
index 00000000000..507719685b0
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Covers the bounded admission loop in {@link
StreamingInsertJob#advanceSplitsIfNeed()}:
+ * it must keep admitting within one tick yet terminate on every in-loop exit
condition
+ * (noMoreSplits / pending cap / no-progress round) without spinning. The
deadline term is
+ * wall-clock based and is covered by integration/regression rather than here.
+ */
+public class StreamingInsertJobAdvanceSplitsTest {
+
+ // Assumed value of StreamingInsertJob.MAX_PENDING_SPLITS; update both
together if changed.
+ private static final int CAP = 512;
+
+ private static StreamingInsertJob newJob(SourceOffsetProvider provider,
long maxIntervalSec) {
+ StreamingInsertJob job =
Deencapsulation.newInstance(StreamingInsertJob.class);
+ Map<String, String> props = new HashMap<>();
+ props.put(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY,
String.valueOf(maxIntervalSec));
+ Deencapsulation.setField(job, "jobProperties", new
StreamingJobProperties(props));
+ Deencapsulation.setField(job, "offsetProvider", provider);
+ return job;
+ }
+
+ @Test
+ public void testLoopsUntilNoMoreSplits() throws Exception {
+ // Many small tables: one split per round, splitting completes after 5
rounds.
+ FakeOffsetProvider provider = new FakeOffsetProvider(1, 5);
+ newJob(provider, 3600).advanceSplitsIfNeed();
+ Assert.assertEquals("must keep admitting across rounds until
noMoreSplits", 5, provider.rounds);
+ }
+
+ @Test
+ public void testStopsAtPendingCap() throws Exception {
+ // Fast producer that never finishes: must stop once the FE backlog
cap is crossed.
+ FakeOffsetProvider provider = new FakeOffsetProvider(100, -1);
+ newJob(provider, 3600).advanceSplitsIfNeed();
+ Assert.assertTrue("must stop once pending crosses the cap",
provider.pendingSplitCount() >= CAP);
+ Assert.assertEquals("must not overshoot beyond one round past the
cap", 6, provider.rounds);
+ }
+
+ @Test
+ public void testBreaksWhenRoundProducesNothing() throws Exception {
+ // A round that yields no new split (empty RPC / cursor moved) must
break, not spin to deadline.
+ FakeOffsetProvider provider = new FakeOffsetProvider(0, -1);
+ newJob(provider, 3600).advanceSplitsIfNeed();
+ Assert.assertEquals("must break after a no-progress round", 1,
provider.rounds);
+ }
+
+ @Test
+ public void testSkipsWhenAlreadyDone() throws Exception {
+ // Entry guard: noMoreSplits up front means the loop never runs.
+ FakeOffsetProvider provider = new FakeOffsetProvider(1, 0);
+ newJob(provider, 3600).advanceSplitsIfNeed();
+ Assert.assertEquals("entry guard must skip the loop entirely", 0,
provider.rounds);
+ }
+
+ /** Minimal provider whose admission behaviour is fully controllable. */
+ private static class FakeOffsetProvider implements SourceOffsetProvider {
+ private int pending;
+ private int rounds;
+ private final int stepPerRound; // pending increment per
advanceSplits call
+ private final int roundsUntilDone; // noMoreSplits() flips true after
this many rounds (-1 = never)
+
+ FakeOffsetProvider(int stepPerRound, int roundsUntilDone) {
+ this.stepPerRound = stepPerRound;
+ this.roundsUntilDone = roundsUntilDone;
+ }
+
+ @Override
+ public void advanceSplits() {
+ rounds++;
+ pending += stepPerRound;
+ }
+
+ @Override
+ public int pendingSplitCount() {
+ return pending;
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ return roundsUntilDone >= 0 && rounds >= roundsUntilDone;
+ }
+
+ @Override
+ public String getSourceType() {
+ return "fake";
+ }
+
+ @Override
+ public Offset getNextOffset(StreamingJobProperties jobProps,
Map<String, String> properties) {
+ return null;
+ }
+
+ @Override
+ public String getShowCurrentOffset() {
+ return "";
+ }
+
+ @Override
+ public String getShowMaxOffset() {
+ return "";
+ }
+
+ @Override
+ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand, Offset next, long id) {
+ return originCommand;
+ }
+
+ @Override
+ public void updateOffset(Offset offset) {
+ }
+
+ @Override
+ public void fetchRemoteMeta(Map<String, String> properties) {
+ }
+
+ @Override
+ public boolean hasMoreDataToConsume() {
+ return true;
+ }
+
+ @Override
+ public Offset deserializeOffset(String offset) {
+ return null;
+ }
+
+ @Override
+ public Offset deserializeOffsetProperty(String offset) {
+ return null;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]