This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 65661ffff67 [opt](group-commit) Skip createLocation in group commit
stream load sink (#63561)
65661ffff67 is described below
commit 65661ffff6757d9e5254cfff8ebfb049542a0fc2
Author: Xin Liao <[email protected]>
AuthorDate: Tue May 26 18:11:33 2026 +0800
[opt](group-commit) Skip createLocation in group commit stream load sink
(#63561)
## Summary
The BE-side `GroupCommitBlockSinkOperatorX::init` does **not** consume
`TOlapTableSink.location` or `slave_location` (it only reads `tuple_id`
/ `schema` / `db_id` / `table_id` / `partition` / `group_commit_mode` /
`load_id` / `max_filter_ratio`). However, FE still runs
`createLocation`, which iterates `O(partitions * indexes * tablets *
replicas)` and, for every replica, takes the `CloudSystemInfoService` RW
read lock via `CloudReplica.getCurrentClusterId`.
Under high-concurrency group commit stream load on wide-partition tables
(3000+ partitions in a real production incident), CAS contention on the
RW lock's `state` cache line saturated all FE CPUs, and the cluster
could not recover even after scaling out (more cores = more CAS
contenders = worse contention).
## Change
- Introduce a `protected initLocationParams(TOlapTableSink)` hook on
`OlapTableSink`. Default behavior delegates to `createLocation`, so
non-group-commit sinks are unaffected.
- Route both `init(...)` overloads in `OlapTableSink` through the hook.
- `GroupCommitBlockSink` overrides the hook to return empty placeholder
`TOlapTableLocationParam` objects. `TOlapTableSink.location` is a
required thrift field, so we still set non-null placeholders, but no
tablet/replica enumeration happens.
Effect on the group-commit path:
- Per-request FE CPU: `O(partitions * indexes * tablets * replicas)` →
`O(1)`
- `CloudSystemInfoService` RW lock acquisitions: hundreds of concurrent
CAS spinners → 0
---
.../apache/doris/planner/GroupCommitBlockSink.java | 21 +++++++
.../org/apache/doris/planner/OlapTableSink.java | 13 +++-
.../doris/planner/GroupCommitBlockSinkTest.java | 69 ++++++++++++++++++++++
3 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index a45b65ca19a..0b1ef04496f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -20,12 +20,18 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TGroupCommitMode;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TOlapTableSink;
+import org.apache.doris.thrift.TTabletLocation;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -63,6 +69,21 @@ public class GroupCommitBlockSink extends OlapTableSink {
return tDataSink;
}
+ // BE-side GroupCommitBlockSinkOperatorX::init does not consume location/slave_location
+ // (it only reads tuple_id/schema/db_id/table_id/partition/group_commit_mode/load_id/
+ // max_filter_ratio). Skip the per-tablet replica enumeration in createLocation, which
+ // is the dominant FE CPU cost under high-concurrency group-commit stream load.
+ // We still return placeholder TOlapTableLocationParam objects because
+ // TOlapTableSink.location is a required thrift field.
+ @Override
+ protected List<TOlapTableLocationParam> initLocationParams(TOlapTableSink
tSink) throws UserException {
+ TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+ TOlapTableLocationParam slaveLocationParam = new
TOlapTableLocationParam();
+ locationParam.setTablets(Lists.<TTabletLocation>newArrayList());
+ slaveLocationParam.setTablets(Lists.<TTabletLocation>newArrayList());
+ return Arrays.asList(locationParam, slaveLocationParam);
+ }
+
public static TGroupCommitMode parseGroupCommit(String groupCommit) {
if (groupCommit == null) {
return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 4d95fababc1..e3a9954b627 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -236,7 +236,7 @@ public class OlapTableSink extends DataSink {
partition.setTabletVersionGapBackends(gapBackends);
}
}
- tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
+ tOlapTableLocationParams = initLocationParams(tSink);
tSink.setTableId(dstTable.getId());
tSink.setTupleId(tupleDescriptor.getId().asInt());
@@ -294,7 +294,7 @@ public class OlapTableSink extends DataSink {
partition.setTabletVersionGapBackends(gapBackends);
}
}
- tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
+ tOlapTableLocationParams = initLocationParams(tSink);
tSink.setTableId(dstTable.getId());
tSink.setTupleId(tupleDescriptor.getId().asInt());
@@ -737,6 +737,15 @@ public class OlapTableSink extends DataSink {
}
}
+ // Hook for subclasses to control how the tablet location params are populated.
+ // Default behavior computes the full tablet -> backend mapping via createLocation,
+ // which under high-concurrency stream load on large tables is the dominant FE CPU
+ // cost. Subclasses whose BE counterpart does not consume TOlapTableSink.location
+ // (e.g. GroupCommitBlockSink) can override this hook to skip that work.
+ protected List<TOlapTableLocationParam> initLocationParams(TOlapTableSink
tSink) throws UserException {
+ return createLocation(tSink.getDbId(), dstTable);
+ }
+
public List<TOlapTableLocationParam> createDummyLocation(OlapTable table)
throws UserException {
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
TOlapTableLocationParam slaveLocationParam = new
TOlapTableLocationParam();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java
new file mode 100644
index 00000000000..281afd9b59e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.thrift.TGroupCommitMode;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TOlapTableSink;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+public class GroupCommitBlockSinkTest {
+
+ @Test
+ public void testInitLocationParamsSkipsCreateLocation() throws
UserException {
+ OlapTable dstTable = Mockito.mock(OlapTable.class);
+ TupleDescriptor tuple = Mockito.mock(TupleDescriptor.class);
+ GroupCommitBlockSink sink = new GroupCommitBlockSink(
+ dstTable, tuple, Lists.newArrayList(1L), false, "async_mode",
0.0);
+
+ List<TOlapTableLocationParam> params = sink.initLocationParams(new
TOlapTableSink());
+
+ Assert.assertEquals(2, params.size());
+ Assert.assertNotNull(params.get(0).getTablets());
+ Assert.assertTrue("master location should be empty placeholder",
+ params.get(0).getTablets().isEmpty());
+ Assert.assertNotNull(params.get(1).getTablets());
+ Assert.assertTrue("slave location should be empty placeholder",
+ params.get(1).getTablets().isEmpty());
+ Mockito.verifyNoInteractions(dstTable);
+ Mockito.verifyNoInteractions(tuple);
+ }
+
+ @Test
+ public void testParseGroupCommit() {
+ Assert.assertEquals(TGroupCommitMode.ASYNC_MODE,
+ GroupCommitBlockSink.parseGroupCommit("async_mode"));
+ Assert.assertEquals(TGroupCommitMode.ASYNC_MODE,
+ GroupCommitBlockSink.parseGroupCommit("ASYNC_MODE"));
+ Assert.assertEquals(TGroupCommitMode.SYNC_MODE,
+ GroupCommitBlockSink.parseGroupCommit("sync_mode"));
+ Assert.assertEquals(TGroupCommitMode.OFF_MODE,
+ GroupCommitBlockSink.parseGroupCommit("off_mode"));
+ Assert.assertNull(GroupCommitBlockSink.parseGroupCommit(null));
+ Assert.assertNull(GroupCommitBlockSink.parseGroupCommit("invalid"));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]