Copilot commented on code in PR #63594:
URL: https://github.com/apache/doris/pull/63594#discussion_r3296634988
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -2779,13 +2784,86 @@ private void recordFinishedLoadJobRequestImpl(String
label, long txnId, String d
EtlJobType.INSERT, createTime, failMsg, trackingUrl,
firstErrorMsg, userIdentity, -1);
}
+ private static int nextGroupCommitFollowerIndex(int followerCount) {
+ return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(),
followerCount);
+ }
+
+ private TStreamLoadPutResult
forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
+ HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+ List<Frontend> followers =
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
+ .filter(fe -> fe.isAlive() &&
!(fe.getHost().equals(selfNode.getHost())
+ && fe.getEditLogPort() == selfNode.getPort())).collect(
+ Collectors.toList());
+ if (CollectionUtils.isEmpty(followers)) {
+ return null;
+ }
+
+ // check table enable light_schama_change and group commit does not
block for schema change
+ TStreamLoadPutResult result = new TStreamLoadPutResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(request.getDb());
+ OlapTable table = (OlapTable)
db.getTableOrDdlException(request.getTbl());
+ if (!table.getTableProperty().getUseSchemaLightChange()) {
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(
+ "table light_schema_change is false, can't do stream
load with group commit mode");
+ return result;
+ }
+ if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+ String msg = "insert table " + table.getId() +
GroupCommitPlanner.SCHEMA_CHANGE;
+ LOG.info(msg);
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(msg);
+ return result;
+ }
+ } catch (DdlException e) {
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(e.getMessage());
+ return result;
Review Comment:
The pre-check block only catches `DdlException`, but other runtime
exceptions in this block (e.g. unexpected metadata issues) will propagate and
prevent the intended “forward if possible, otherwise fall back to local”
behavior. Consider catching a broader exception (at least `Exception`) here and
returning `null` so `streamLoadPut` can safely proceed locally.
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -2779,13 +2784,86 @@ private void recordFinishedLoadJobRequestImpl(String
label, long txnId, String d
EtlJobType.INSERT, createTime, failMsg, trackingUrl,
firstErrorMsg, userIdentity, -1);
}
+ private static int nextGroupCommitFollowerIndex(int followerCount) {
+ return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(),
followerCount);
+ }
+
+ private TStreamLoadPutResult
forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
+ HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+ List<Frontend> followers =
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
+ .filter(fe -> fe.isAlive() &&
!(fe.getHost().equals(selfNode.getHost())
+ && fe.getEditLogPort() == selfNode.getPort())).collect(
+ Collectors.toList());
+ if (CollectionUtils.isEmpty(followers)) {
+ return null;
+ }
+
+ // check table enable light_schama_change and group commit does not
block for schema change
+ TStreamLoadPutResult result = new TStreamLoadPutResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(request.getDb());
+ OlapTable table = (OlapTable)
db.getTableOrDdlException(request.getTbl());
+ if (!table.getTableProperty().getUseSchemaLightChange()) {
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(
+ "table light_schema_change is false, can't do stream
load with group commit mode");
+ return result;
+ }
+ if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+ String msg = "insert table " + table.getId() +
GroupCommitPlanner.SCHEMA_CHANGE;
Review Comment:
`db.getTableOrDdlException()` returns a generic `Table` and the direct cast
to `OlapTable` can throw `ClassCastException` (e.g., if the request targets a
non-OLAP table). Since this happens before any fallback and is not caught, it
can fail the whole `streamLoadPut` RPC instead of returning a proper
`ANALYSIS_ERROR`/fallback. Consider fetching the table with an OLAP type check
(or `instanceof` guard) and returning an explicit error status when the table
is not OLAP.
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -2779,13 +2784,86 @@ private void recordFinishedLoadJobRequestImpl(String
label, long txnId, String d
EtlJobType.INSERT, createTime, failMsg, trackingUrl,
firstErrorMsg, userIdentity, -1);
}
+ private static int nextGroupCommitFollowerIndex(int followerCount) {
+ return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(),
followerCount);
+ }
+
+ private TStreamLoadPutResult
forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
+ HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+ List<Frontend> followers =
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
+ .filter(fe -> fe.isAlive() &&
!(fe.getHost().equals(selfNode.getHost())
+ && fe.getEditLogPort() == selfNode.getPort())).collect(
+ Collectors.toList());
+ if (CollectionUtils.isEmpty(followers)) {
+ return null;
+ }
+
+ // check table enable light_schama_change and group commit does not
block for schema change
Review Comment:
Spelling in comment: `light_schama_change` should be `light_schema_change`.
##########
regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_multi_follower.groovy:
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_group_commit_stream_load_multi_follower', 'docker') {
+ def databaseName = context.config.getDbNameByFile(context.file)
+ def tableName = "tbl"
+
+ def groupCommitStreamLoad = { fe ->
+ def feIp = fe.getHttpAddress()[0]
+ def fePort = fe.getHttpAddress()[1]
+ def command = """ curl -sS --location-trusted -u root:
+ -H group_commit:async_mode
+ -H column_separator:,
+ -H columns:id,name
+ -T
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+
http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
+ log.info("group commit command: ${command}")
+
+ def process = command.execute()
+ def code = process.waitFor()
+ def err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())))
+ def out = process.getText()
+ logger.info("load through fe {}:{} master={} code={}, out={}, err={}",
+ feIp, fePort, fe.isMaster, code, out, err)
+ assertEquals(code, 0)
Review Comment:
`assertEquals` parameter order appears reversed here. In this test suite the
common pattern is `assertEquals(expected, actual)` (e.g. `assertEquals(2,
json.NumberTotalRows)` below), so using `assertEquals(code, 0)` will produce
confusing failure output. Consider swapping to `assertEquals(0, code)`.
##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3331,7 +3336,7 @@ public static int metaServiceRpcRetryTimes() {
"Whether to enable group commit streamload BE forward feature in
cloud mode. "
+ "Solves the issue where LB random forwarding breaks
group commit batching "
+ "by implementing BE-level forwarding to ensure
same-table requests reach the same BE node."})
- public static boolean enable_group_commit_streamload_be_forward = false;
+ public static boolean enable_group_commit_streamload_be_forward = true;
Review Comment:
Changing `enable_group_commit_streamload_be_forward` default from `false` to
`true` flips the cloud-mode stream load routing/forwarding behavior by default.
This can be a breaking operational change for existing deployments (different
backend selection and additional forwarding hops). If this is intended,
consider calling it out in config/docs/release notes; otherwise keep the
default `false` and enable it explicitly where needed (tests, specific
environments).
##########
regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_multi_follower.groovy:
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_group_commit_stream_load_multi_follower', 'docker') {
+ def databaseName = context.config.getDbNameByFile(context.file)
+ def tableName = "tbl"
+
+ def groupCommitStreamLoad = { fe ->
+ def feIp = fe.getHttpAddress()[0]
+ def fePort = fe.getHttpAddress()[1]
+ def command = """ curl -sS --location-trusted -u root:
+ -H group_commit:async_mode
+ -H column_separator:,
+ -H columns:id,name
+ -T
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+
http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
+ log.info("group commit command: ${command}")
+
+ def process = command.execute()
+ def code = process.waitFor()
+ def err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())))
+ def out = process.getText()
+ logger.info("load through fe {}:{} master={} code={}, out={}, err={}",
+ feIp, fePort, fe.isMaster, code, out, err)
+ assertEquals(code, 0)
+
+ def json = parseJson(out)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(2, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+
+ def getRowCount = { expectedRowCount ->
+ def retry = 0
+ while (retry < 30) {
+ sleep(1000)
+ def rowCount = sql "select count(*) from
${databaseName}.${tableName}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ }
+
+ def options = new ClusterOptions()
+ options.feNum = 3
+ options.beNum = 1
+ options.cloudMode = true
+ options.useFollowersMode = true
+ options.beConfigs.add('enable_java_support=false')
+ options.feConfigs.add('enable_forward_group_commit_stream_load=true')
+ options.feConfigs.add('cloud_cluster_check_interval_second=1')
+ options.feConfigs.add('heartbeat_interval_second=1')
+ docker(options) {
+ awaitUntil(60) {
+ def ret = sql_return_maparray """SHOW FRONTENDS"""
+ ret.size() == 3
+ && ret.count { it.Role.contains("FOLLOWER") } == 3
+ && ret.count { it.IsMaster == "true" } == 1
+ && ret.count { it.Alive == "true" } == 3
+ }
+
+ def frontendRows = sql_return_maparray """SHOW FRONTENDS"""
+ logger.info("show frontends result {}", frontendRows)
+ frontendRows.each { row ->
+ assertTrue(row.Role.contains("FOLLOWER"))
+ assertEquals("true", row.Alive)
+ }
+
+ sql """ CREATE DATABASE IF NOT EXISTS ${databaseName} """
+ sql """ DROP TABLE IF EXISTS ${databaseName}.${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200"
+ );
+ """
+
+ def expectedRowCount = 0
+ def frontends = cluster.getAllFrontends(true).sort { it.index }
+ for (int i = 0; i < 2; i++) {
+ frontends.each { fe ->
+ groupCommitStreamLoad(fe)
+ expectedRowCount += 2
+ }
+ }
+ getRowCount(expectedRowCount)
+
+ def rowCount = sql "select count(*) from ${databaseName}.${tableName}"
+ logger.info("rowCount: " + rowCount)
+ assertEquals(expectedRowCount, rowCount[0][0])
Review Comment:
Trailing whitespace at end of line; please remove to keep diffs clean.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]