This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c852e608302 [improve](streaming-job) admit multiple splits per scheduler tick to speed up many-table snapshot (#64566)
c852e608302 is described below

commit c852e6083022323b8bcfc9463e8b8ab530c2a777
Author: wudi <[email protected]>
AuthorDate: Tue Jun 23 10:05:42 2026 +0800

    [improve](streaming-job) admit multiple splits per scheduler tick to speed up many-table snapshot (#64566)
    
    ## Proposed changes
    
    ### Problem
    In `FROM ... TO ...` (at-least-once) CDC streaming jobs, the snapshot
    split-admission step (`advanceSplitsIfNeed` → `advanceSplits`) admits only
    **one table's** batch of splits per effective scheduler tick, and the tick
    interval is bound to `max_interval`. When a job syncs many small tables
    (e.g. 1000 tables), the splitting pace (~1 table/tick) falls far behind the
    consumer, so the overall snapshot is much slower than syncing a single
    large table.
---
 .../insert/streaming/StreamingInsertJob.java       |  17 ++-
 .../doris/job/offset/SourceOffsetProvider.java     |   5 +
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |   7 +
 .../StreamingInsertJobAdvanceSplitsTest.java       | 160 +++++++++++++++++++++
 4 files changed, 188 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index be9e6d22f22..855ee75183a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -136,6 +136,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @SerializedName("props")
     private Map<String, String> properties;
     private StreamingJobProperties jobProperties;
+    // Soft cap on produced-but-unconsumed splits buffered on FE; checked 
before each round,
+    // so the actual backlog may overshoot by up to one fetch batch.
+    private static final int MAX_PENDING_SPLITS = 512;
     @Getter
     @SerializedName("tvf")
     private String tvfType;
@@ -739,8 +742,20 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (offsetProvider.noMoreSplits()) {
             return;
         }
+        // Admit as many splits as possible this tick until the pending 
backlog hits the soft cap,
+        // then yield before the next tick (deadline) which resumes the rest.
+        long intervalMs = jobProperties.getMaxIntervalSecond() * 1000L;
+        long deadline = System.currentTimeMillis() + intervalMs - 
Math.min(1000L, intervalMs / 10);
         try {
-            offsetProvider.advanceSplits();
+            while (!offsetProvider.noMoreSplits()
+                    && offsetProvider.pendingSplitCount() < MAX_PENDING_SPLITS
+                    && System.currentTimeMillis() < deadline) {
+                int before = offsetProvider.pendingSplitCount();
+                offsetProvider.advanceSplits();
+                if (offsetProvider.pendingSplitCount() <= before) {
+                    break; // nothing produced this round; avoid spin
+                }
+            }
         } catch (Exception ex) {
             log.warn("advance splits failed, job id: {}", getJobId(), ex);
             if (this.getFailureReason() == null
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index a6f9e582bff..fe03208bd18 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -198,6 +198,11 @@ public interface SourceOffsetProvider {
         return true;
     }
 
+    /** Splits produced but not yet consumed (FE-side backlog). */
+    default int pendingSplitCount() {
+        return 0;
+    }
+
     /**
      * Get the lag of the data source in seconds.
      * For CDC sources, lag = (now - last consumed event timestamp) in seconds.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 3cfe6734564..f80147e970f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -653,6 +653,13 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         }
     }
 
+    @Override
+    public int pendingSplitCount() {
+        synchronized (splitsLock) {
+            return remainingSplits.size();
+        }
+    }
+
     @Override
     public boolean noMoreSplits() {
         if (!checkNeedSplitChunks(sourceProperties)) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java
new file mode 100644
index 00000000000..507719685b0
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Covers the bounded admission loop in {@link 
StreamingInsertJob#advanceSplitsIfNeed()}:
+ * it must keep admitting within one tick yet terminate on every in-loop exit 
condition
+ * (noMoreSplits / pending cap / no-progress round) without spinning. The 
deadline term is
+ * wall-clock based and is covered by integration/regression rather than here.
+ */
+public class StreamingInsertJobAdvanceSplitsTest {
+
+    // Assumed value of StreamingInsertJob.MAX_PENDING_SPLITS; update both 
together if changed.
+    private static final int CAP = 512;
+
+    private static StreamingInsertJob newJob(SourceOffsetProvider provider, 
long maxIntervalSec) {
+        StreamingInsertJob job = 
Deencapsulation.newInstance(StreamingInsertJob.class);
+        Map<String, String> props = new HashMap<>();
+        props.put(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY, 
String.valueOf(maxIntervalSec));
+        Deencapsulation.setField(job, "jobProperties", new 
StreamingJobProperties(props));
+        Deencapsulation.setField(job, "offsetProvider", provider);
+        return job;
+    }
+
+    @Test
+    public void testLoopsUntilNoMoreSplits() throws Exception {
+        // Many small tables: one split per round, splitting completes after 5 
rounds.
+        FakeOffsetProvider provider = new FakeOffsetProvider(1, 5);
+        newJob(provider, 3600).advanceSplitsIfNeed();
+        Assert.assertEquals("must keep admitting across rounds until 
noMoreSplits", 5, provider.rounds);
+    }
+
+    @Test
+    public void testStopsAtPendingCap() throws Exception {
+        // Fast producer that never finishes: must stop once the FE backlog 
cap is crossed.
+        FakeOffsetProvider provider = new FakeOffsetProvider(100, -1);
+        newJob(provider, 3600).advanceSplitsIfNeed();
+        Assert.assertTrue("must stop once pending crosses the cap", 
provider.pendingSplitCount() >= CAP);
+        Assert.assertEquals("must not overshoot beyond one round past the 
cap", 6, provider.rounds);
+    }
+
+    @Test
+    public void testBreaksWhenRoundProducesNothing() throws Exception {
+        // A round that yields no new split (empty RPC / cursor moved) must 
break, not spin to deadline.
+        FakeOffsetProvider provider = new FakeOffsetProvider(0, -1);
+        newJob(provider, 3600).advanceSplitsIfNeed();
+        Assert.assertEquals("must break after a no-progress round", 1, 
provider.rounds);
+    }
+
+    @Test
+    public void testSkipsWhenAlreadyDone() throws Exception {
+        // Entry guard: noMoreSplits up front means the loop never runs.
+        FakeOffsetProvider provider = new FakeOffsetProvider(1, 0);
+        newJob(provider, 3600).advanceSplitsIfNeed();
+        Assert.assertEquals("entry guard must skip the loop entirely", 0, 
provider.rounds);
+    }
+
+    /** Minimal provider whose admission behaviour is fully controllable. */
+    private static class FakeOffsetProvider implements SourceOffsetProvider {
+        private int pending;
+        private int rounds;
+        private final int stepPerRound;     // pending increment per 
advanceSplits call
+        private final int roundsUntilDone;  // noMoreSplits() flips true after 
this many rounds (-1 = never)
+
+        FakeOffsetProvider(int stepPerRound, int roundsUntilDone) {
+            this.stepPerRound = stepPerRound;
+            this.roundsUntilDone = roundsUntilDone;
+        }
+
+        @Override
+        public void advanceSplits() {
+            rounds++;
+            pending += stepPerRound;
+        }
+
+        @Override
+        public int pendingSplitCount() {
+            return pending;
+        }
+
+        @Override
+        public boolean noMoreSplits() {
+            return roundsUntilDone >= 0 && rounds >= roundsUntilDone;
+        }
+
+        @Override
+        public String getSourceType() {
+            return "fake";
+        }
+
+        @Override
+        public Offset getNextOffset(StreamingJobProperties jobProps, 
Map<String, String> properties) {
+            return null;
+        }
+
+        @Override
+        public String getShowCurrentOffset() {
+            return "";
+        }
+
+        @Override
+        public String getShowMaxOffset() {
+            return "";
+        }
+
+        @Override
+        public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset next, long id) {
+            return originCommand;
+        }
+
+        @Override
+        public void updateOffset(Offset offset) {
+        }
+
+        @Override
+        public void fetchRemoteMeta(Map<String, String> properties) {
+        }
+
+        @Override
+        public boolean hasMoreDataToConsume() {
+            return true;
+        }
+
+        @Override
+        public Offset deserializeOffset(String offset) {
+            return null;
+        }
+
+        @Override
+        public Offset deserializeOffsetProperty(String offset) {
+            return null;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to