Copilot commented on code in PR #61242:
URL: https://github.com/apache/doris/pull/61242#discussion_r2922095177
##########
regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy:
##########
@@ -31,6 +31,19 @@ suite("test_group_commit_stream_load") {
}
}
+ def getTableRowCount = { tableName1, expectedRowCount ->
+ def retry = 0
+ while (retry < 30) {
+ sleep(2000)
+ def rowCount = sql "select count(*) from ${tableName1}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ }
Review Comment:
`getTableRowCount` doesn't assert that the expected row count was reached;
if the condition is never met, the loop exits silently and the test can still
pass. Consider returning the final row count and asserting it meets
`expectedRowCount` (and/or reusing the existing `getRowCount` helper to avoid
duplication).
##########
be/src/service/http/action/stream_load.cpp:
##########
@@ -933,8 +922,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest*
req,
if (!config::wait_internal_group_commit_finish && !ctx->label.empty())
{
return Status::InvalidArgument("label and group_commit can't be
set at the same time");
}
Review Comment:
In `_can_group_commit`, the `label and group_commit can't be set at the same
time` check runs even when the client did not set the `group_commit` header
(i.e., `group_commit_header` is empty). This can reject normal
(non-group-commit) stream loads that specify a label on tables whose
`group_commit_mode` is off, which is a backward-incompatible behavior change.
Consider skipping the label conflict check when `group_commit_header` is empty,
and instead enforcing/clearing the label only after the table property lookup
confirms group commit will be used (e.g., in `begin_txn` when
`table_group_commit_mode` is returned).
##########
regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy:
##########
@@ -342,4 +355,164 @@ suite("test_group_commit_stream_load") {
}
qt_read_json_by_line "select
k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName}
order by k;"
+ // Test: stream load using table property group_commit_mode (async_mode)
+ // When HTTP header 'group_commit' is not set, should use table's
group_commit_mode property
+ def tableNameAsync = "test_group_commit_stream_load_table_property_async"
+ try {
+ sql """ drop table if exists ${tableNameAsync}; """
+
+ sql """
+ CREATE TABLE `${tableNameAsync}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200",
+ "group_commit_mode" = "async_mode"
+ );
+ """
+
+ // Verify SHOW CREATE TABLE contains group_commit_mode
+ def createStmt1 = sql """ SHOW CREATE TABLE ${tableNameAsync} """
+ logger.info("SHOW CREATE TABLE for async: " + createStmt1)
+ assertTrue(createStmt1.toString().contains('async_mode'), "Table
should have async_mode")
+
+ // Stream load WITHOUT setting group_commit header - should use table
property
+ streamLoad {
+ table "${tableNameAsync}"
+ set 'column_separator', ','
+ // NOT setting 'group_commit' header - should use table property
+ set 'columns', 'id, name'
+ file "test_stream_load1.csv"
+ unset 'label'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
+ }
+ // Check data is loaded
+ getTableRowCount(tableNameAsync, 2)
+
+ streamLoad {
+ table "${tableNameAsync}"
+ set 'column_separator', ','
+ // NOT setting 'group_commit' header - should use table property,
but set partitions
+ set 'partitions', "${tableNameAsync}"
Review Comment:
This table is created without explicit partitions, but the stream load sets
the `partitions` header to the table name. The `partitions` header expects
existing partition names; using the table name here will likely fail the load
or make the test meaningless. Either create the table with partitions and use a
real partition name, or remove this `partitions` header if the intent is to
test table-property-driven group commit.
```suggestion
// NOT setting 'group_commit' header - should use table property
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3942,6 +3942,12 @@ private static void addOlapTablePropertyInfo(OlapTable
olapTable, StringBuilder
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\"
= \"");
sb.append(olapTable.getGroupCommitDataBytes()).append("\"");
+ // group commit mode (only show when not off_mode)
+ if
(!olapTable.getGroupCommitMode().equals(PropertyAnalyzer.GROUP_COMMIT_MODE_OFF))
{
Review Comment:
This `off_mode` check is case-sensitive (`equals`). If `group_commit_mode`
is stored in a different case (e.g. "OFF_MODE"), SHOW CREATE TABLE will
incorrectly print the property even though it is logically off. Use
`equalsIgnoreCase` here, or ensure `group_commit_mode` is normalized to a
canonical value when parsed/stored.
```suggestion
if
(!PropertyAnalyzer.GROUP_COMMIT_MODE_OFF.equalsIgnoreCase(olapTable.getGroupCommitMode()))
{
```
##########
fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java:
##########
@@ -1904,6 +1909,25 @@ public static int
analyzeGroupCommitDataBytes(Map<String, String> properties, bo
return groupCommitDataBytes;
}
+ public static String analyzeGroupCommitMode(Map<String, String>
properties, boolean removeProperty)
+ throws AnalysisException {
+ String groupCommitMode = GROUP_COMMIT_MODE_OFF;
+ if (properties != null &&
properties.containsKey(PROPERTIES_GROUP_COMMIT_MODE)) {
+ groupCommitMode = properties.get(PROPERTIES_GROUP_COMMIT_MODE);
+ if (!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_OFF)
+ &&
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_ASYNC)
+ &&
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_SYNC)) {
+ throw new AnalysisException("Invalid group_commit_mode: " +
groupCommitMode
+ + ". Valid values: " + GROUP_COMMIT_MODE_OFF + ", " +
GROUP_COMMIT_MODE_ASYNC
+ + ", " + GROUP_COMMIT_MODE_SYNC);
+ }
+ if (removeProperty) {
+ properties.remove(PROPERTIES_GROUP_COMMIT_MODE);
+ }
+ }
+ return groupCommitMode;
+ }
Review Comment:
`analyzeGroupCommitMode` returns the user-provided string as-is after
validation. Because other code paths sometimes do case-sensitive
comparisons/serialization (e.g. SHOW CREATE TABLE filtering, BE-side string
compares), this can cause inconsistent behavior when users set values like
"OFF_MODE". Consider normalizing the stored/returned value to a canonical form
(e.g. lower-case constants) before returning (and before persisting into table
properties).
##########
be/src/load/stream_load/stream_load_executor.cpp:
##########
@@ -217,6 +220,14 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
}
return status;
}
+ if (ctx->group_commit_mode.empty()) {
+ auto table_group_commit_mode = result.table_group_commit_mode;
+ if (!table_group_commit_mode.empty() && table_group_commit_mode !=
"off_mode") {
+ ctx->group_commit = true;
+ ctx->group_commit_mode = table_group_commit_mode;
+ return Status::OK();
Review Comment:
`table_group_commit_mode` is compared to the literal "off_mode" using a
case-sensitive comparison. Since FE-side parsing/serialization allows
case-insensitive values, this can incorrectly treat values like "OFF_MODE" as
enabling group commit. Use a case-insensitive comparison (or normalize to a
canonical value when storing/parsing) before deciding to enable group commit.
##########
be/src/service/http/action/stream_load.cpp:
##########
@@ -945,6 +933,43 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest*
req,
return Status::Error<EXCEEDED_LIMIT>(ss.str());
}
}
+ can_group_commit = true;
+ }
Review Comment:
The async WAL space check (`load_size_smaller_than_wal_limit`) only runs
when the HTTP `group_commit` header explicitly equals `async_mode`. If
`group_commit_mode` is resolved from the table property (header is empty) and
ends up being async, this check is skipped, which can lead to unexpected
failures later or WAL pressure without an early, clear error. Consider
performing the WAL limit check after the table-property lookup determines async
mode (e.g., in `begin_txn` when it sets `ctx->group_commit_mode`, using the
already-parsed content length).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]