Copilot commented on code in PR #61242:
URL: https://github.com/apache/doris/pull/61242#discussion_r2922095177


##########
regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy:
##########
@@ -31,6 +31,19 @@ suite("test_group_commit_stream_load") {
         }
     }
 
+    def getTableRowCount = { tableName1, expectedRowCount ->
+        def retry = 0
+        while (retry < 30) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${tableName1}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }

Review Comment:
   `getTableRowCount` doesn't assert that the expected row count was reached; 
if the condition is never met, the loop exits silently after 30 retries and the test can still 
pass. Consider returning the final row count and asserting it meets 
`expectedRowCount` (and/or reusing the existing `getRowCount` helper to avoid 
duplication).



##########
be/src/service/http/action/stream_load.cpp:
##########
@@ -933,8 +922,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* 
req,
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) 
{
             return Status::InvalidArgument("label and group_commit can't be 
set at the same time");
         }

Review Comment:
   In `_can_group_commit`, the `label and group_commit can't be set at the same 
time` check runs even when the client did not set the `group_commit` header 
(i.e., `group_commit_header` is empty). This can reject normal 
(non-group-commit) stream loads that specify a label on tables whose 
`group_commit_mode` is off, which is a backward-incompatible behavior change. 
Consider skipping the label conflict check when `group_commit_header` is empty, 
and instead enforce/clear label only after the table property lookup confirms 
group commit will be used (e.g., in `begin_txn` when `table_group_commit_mode` 
is returned).



##########
regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy:
##########
@@ -342,4 +355,164 @@ suite("test_group_commit_stream_load") {
     }
     qt_read_json_by_line "select 
k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} 
order by k;"
 
+    // Test: stream load using table property group_commit_mode (async_mode)
+    // When HTTP header 'group_commit' is not set, should use table's 
group_commit_mode property
+    def tableNameAsync = "test_group_commit_stream_load_table_property_async"
+    try {
+        sql """ drop table if exists ${tableNameAsync}; """
+        
+        sql """
+        CREATE TABLE `${tableNameAsync}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "200",
+            "group_commit_mode" = "async_mode"
+        );
+        """
+
+        // Verify SHOW CREATE TABLE contains group_commit_mode
+        def createStmt1 = sql """ SHOW CREATE TABLE ${tableNameAsync} """
+        logger.info("SHOW CREATE TABLE for async: " + createStmt1)
+        assertTrue(createStmt1.toString().contains('async_mode'), "Table 
should have async_mode")
+        
+        // Stream load WITHOUT setting group_commit header - should use table 
property
+        streamLoad {
+            table "${tableNameAsync}"
+            set 'column_separator', ','
+            // NOT setting 'group_commit' header - should use table property
+            set 'columns', 'id, name'
+            file "test_stream_load1.csv"
+            unset 'label'
+            time 10000
+            
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
+        }
+        // Check data is loaded
+        getTableRowCount(tableNameAsync, 2)
+
+        streamLoad {
+            table "${tableNameAsync}"
+            set 'column_separator', ','
+            // NOT setting 'group_commit' header - should use table property, 
but set partitions
+            set 'partitions', "${tableNameAsync}"

Review Comment:
   This table is created without explicit partitions, but the stream load sets 
the `partitions` header to the table name. The `partitions` header expects 
existing partition names; using the table name here will likely fail the load 
or make the test meaningless. Either create the table with partitions and use a 
real partition name, or remove this `partitions` header if the intent is to 
test table-property-driven group commit.
   ```suggestion
               // NOT setting 'group_commit' header - should use table property
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3942,6 +3942,12 @@ private static void addOlapTablePropertyInfo(OlapTable 
olapTable, StringBuilder
         
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\"
 = \"");
         sb.append(olapTable.getGroupCommitDataBytes()).append("\"");
 
+        // group commit mode (only show when not off_mode)
+        if 
(!olapTable.getGroupCommitMode().equals(PropertyAnalyzer.GROUP_COMMIT_MODE_OFF))
 {

Review Comment:
   This `off_mode` check is case-sensitive (`equals`). If `group_commit_mode` 
is stored in a different case (e.g. "OFF_MODE"), SHOW CREATE TABLE will 
incorrectly print the property even though it's logically off. Use 
`equalsIgnoreCase` here, or ensure `group_commit_mode` is normalized to a 
canonical value when parsed/stored.
   ```suggestion
           if 
(!PropertyAnalyzer.GROUP_COMMIT_MODE_OFF.equalsIgnoreCase(olapTable.getGroupCommitMode()))
 {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java:
##########
@@ -1904,6 +1909,25 @@ public static int 
analyzeGroupCommitDataBytes(Map<String, String> properties, bo
         return groupCommitDataBytes;
     }
 
+    public static String analyzeGroupCommitMode(Map<String, String> 
properties, boolean removeProperty)
+            throws AnalysisException {
+        String groupCommitMode = GROUP_COMMIT_MODE_OFF;
+        if (properties != null && 
properties.containsKey(PROPERTIES_GROUP_COMMIT_MODE)) {
+            groupCommitMode = properties.get(PROPERTIES_GROUP_COMMIT_MODE);
+            if (!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_OFF)
+                    && 
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_ASYNC)
+                    && 
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_SYNC)) {
+                throw new AnalysisException("Invalid group_commit_mode: " + 
groupCommitMode
+                        + ". Valid values: " + GROUP_COMMIT_MODE_OFF + ", " + 
GROUP_COMMIT_MODE_ASYNC
+                        + ", " + GROUP_COMMIT_MODE_SYNC);
+            }
+            if (removeProperty) {
+                properties.remove(PROPERTIES_GROUP_COMMIT_MODE);
+            }
+        }
+        return groupCommitMode;
+    }

Review Comment:
   `analyzeGroupCommitMode` returns the user-provided string as-is after 
validation. Because other code paths sometimes do case-sensitive 
comparisons/serialization (e.g. SHOW CREATE TABLE filtering, BE-side string 
compares), this can cause inconsistent behavior when users set values like 
"OFF_MODE". Consider normalizing the stored/returned value to a canonical form 
(e.g. lower-case constants) before returning (and before persisting into table 
properties).



##########
be/src/load/stream_load/stream_load_executor.cpp:
##########
@@ -217,6 +220,14 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
         }
         return status;
     }
+    if (ctx->group_commit_mode.empty()) {
+        auto table_group_commit_mode = result.table_group_commit_mode;
+        if (!table_group_commit_mode.empty() && table_group_commit_mode != 
"off_mode") {
+            ctx->group_commit = true;
+            ctx->group_commit_mode = table_group_commit_mode;
+            return Status::OK();

Review Comment:
   `table_group_commit_mode` is compared to the literal "off_mode" using a 
case-sensitive comparison. Since FE-side parsing/serialization allows 
case-insensitive values, this can incorrectly treat values like "OFF_MODE" as 
enabling group commit. Use a case-insensitive comparison (or normalize to a 
canonical value when storing/parsing) before deciding to enable group commit.



##########
be/src/service/http/action/stream_load.cpp:
##########
@@ -945,6 +933,43 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* 
req,
                 return Status::Error<EXCEEDED_LIMIT>(ss.str());
             }
         }
+        can_group_commit = true;
+    }

Review Comment:
   The async WAL space check (`load_size_smaller_than_wal_limit`) only runs 
when the HTTP `group_commit` header explicitly equals `async_mode`. If 
`group_commit_mode` is resolved from the table property (header is empty) and 
ends up being async, this check is skipped, which can lead to unexpected 
failures later or WAL pressure without an early, clear error. Consider 
performing the WAL limit check after the table-property lookup determines async 
mode (e.g., in `begin_txn` when it sets `ctx->group_commit_mode`, using the 
already-parsed content length).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to