This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6c76e9b6a0 [feat](group commit) add group_commit_mode in table 
property (#61242)
c6c76e9b6a0 is described below

commit c6c76e9b6a0692b1254a922a4033f8689b46400d
Author: meiyi <[email protected]>
AuthorDate: Mon Mar 16 15:09:19 2026 +0800

    [feat](group commit) add group_commit_mode in table property (#61242)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    1. Create table support set `group_commit_mode` table property
    ```
    CREATE TABLE ... PROPERTIES(
        "group_commit_mode" = "async_mode"
    );
    ```
    2. Support alter this property
    ```
    ALTER TABLE ... SET ("group_commit_mode" = "off_mode");
    ```
    3. Show create table shows this property if its value is not `off_mode`
    4. For stream load, if it not set `group_commit` header, use the table
    property as the group commit mode; if it set `group_commit` header, use
    the header value.
    5. doc: https://github.com/apache/doris-website/pull/3465
    
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/load/stream_load/stream_load_context.cpp    |   4 +
 be/src/load/stream_load/stream_load_context.h      |   1 +
 be/src/load/stream_load/stream_load_executor.cpp   |  12 ++
 be/src/service/http/action/stream_load.cpp         | 100 ++++++++----
 be/src/service/http/action/stream_load.h           |   3 +
 .../apache/doris/alter/SchemaChangeHandler.java    |   2 +
 .../main/java/org/apache/doris/catalog/Env.java    |   6 +
 .../java/org/apache/doris/catalog/OlapTable.java   |   8 +
 .../org/apache/doris/catalog/TableProperty.java    |   9 ++
 .../cloud/alter/CloudSchemaChangeHandler.java      |  21 +++
 .../apache/doris/common/util/PropertyAnalyzer.java |  24 +++
 .../apache/doris/datasource/InternalCatalog.java   |   7 +
 .../commands/info/ModifyTablePropertiesOp.java     |   4 +
 .../apache/doris/service/FrontendServiceImpl.java  |  17 +-
 gensrc/proto/cloud.proto                           |   1 +
 gensrc/thrift/FrontendService.thrift               |   5 +
 .../data/query_p0/system/test_table_properties.out |   8 +-
 .../insert_p0/insert_group_commit_into.groovy      |  63 ++++++++
 .../test_group_commit_stream_load.groovy           | 176 +++++++++++++++++++++
 19 files changed, 436 insertions(+), 35 deletions(-)

diff --git a/be/src/load/stream_load/stream_load_context.cpp 
b/be/src/load/stream_load/stream_load_context.cpp
index 12e5c39f0d7..4fee22ca057 100644
--- a/be/src/load/stream_load/stream_load_context.cpp
+++ b/be/src/load/stream_load/stream_load_context.cpp
@@ -58,6 +58,10 @@ std::string StreamLoadContext::to_json() const {
     } else {
         writer.Key("GroupCommit");
         writer.Bool(true);
+        writer.Key("GroupCommitMode");
+        writer.String(group_commit_mode.c_str());
+        writer.Key("LoadId");
+        writer.String(id.to_string().c_str());
     }
 
     // status
diff --git a/be/src/load/stream_load/stream_load_context.h 
b/be/src/load/stream_load/stream_load_context.h
index 85b4c4d1468..4e257c86e7d 100644
--- a/be/src/load/stream_load/stream_load_context.h
+++ b/be/src/load/stream_load/stream_load_context.h
@@ -196,6 +196,7 @@ public:
     TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
     TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
     bool group_commit = false;
+    std::string group_commit_mode = "";
 
     std::shared_ptr<MessageBodySink> body_sink;
     std::shared_ptr<io::StreamLoadPipe> pipe;
diff --git a/be/src/load/stream_load/stream_load_executor.cpp 
b/be/src/load/stream_load/stream_load_executor.cpp
index b24333941c2..08fc3bb34b1 100644
--- a/be/src/load/stream_load/stream_load_executor.cpp
+++ b/be/src/load/stream_load/stream_load_executor.cpp
@@ -189,6 +189,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
     }
     request.__set_request_id(ctx->id.to_thrift());
     request.__set_backend_id(_exec_env->cluster_info()->backend_id);
+    if (ctx->group_commit_mode.empty()) {
+        request.__set_use_table_group_commit_mode(true);
+    }
 
     TLoadTxnBeginResult result;
     Status status;
@@ -217,6 +220,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
         }
         return status;
     }
+    if (ctx->group_commit_mode.empty() && 
result.__isset.table_group_commit_mode) {
+        auto table_group_commit_mode = result.table_group_commit_mode;
+        if (iequal(table_group_commit_mode, "async_mode") ||
+            iequal(table_group_commit_mode, "sync_mode")) {
+            ctx->group_commit = true;
+            ctx->group_commit_mode = table_group_commit_mode;
+            return Status::OK();
+        }
+    }
     ctx->txn_id = result.txnId;
     if (result.__isset.db_id) {
         ctx->db_id = result.db_id;
diff --git a/be/src/service/http/action/stream_load.cpp 
b/be/src/service/http/action/stream_load.cpp
index f483e11258a..8582ced02a7 100644
--- a/be/src/service/http/action/stream_load.cpp
+++ b/be/src/service/http/action/stream_load.cpp
@@ -84,6 +84,9 @@ bvar::LatencyRecorder 
g_stream_load_commit_and_publish_latency_ms("stream_load",
 
 static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
 static const std::string CHUNK = "chunked";
+static const std::string OFF_MODE = "off_mode";
+static const std::string SYNC_MODE = "sync_mode";
+static const std::string ASYNC_MODE = "async_mode";
 
 #ifdef BE_TEST
 TStreamLoadPutResult k_stream_load_put_result;
@@ -254,6 +257,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
 
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", 
db=" << ctx->db
               << ", tbl=" << ctx->table << ", group_commit=" << 
ctx->group_commit
+              << ", group_commit_mode=" << ctx->group_commit_mode
               << ", HTTP headers=" << req->get_all_headers();
     ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
 
@@ -366,6 +370,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
std::shared_ptr<Strea
         int64_t begin_txn_start_time = MonotonicNanos();
         
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
         ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+        if (ctx->group_commit) {
+            RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, 
ctx->body_bytes));
+        }
     }
 
     // process put file
@@ -767,12 +774,7 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
         request.__set_stream_per_node(stream_per_node);
     }
     if (ctx->group_commit) {
-        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
-            
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
-        } else {
-            // used for wait_internal_group_commit_finish
-            request.__set_group_commit_mode("sync_mode");
-        }
+        request.__set_group_commit_mode(ctx->group_commit_mode);
     }
 
     if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) {
@@ -811,7 +813,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (config::is_cloud_mode() && ctx->two_phase_commit && 
ctx->is_mow_table()) {
         return Status::NotSupported("stream load 2pc is unsupported for mow 
table");
     }
-    if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
+    if (iequal(ctx->group_commit_mode, ASYNC_MODE)) {
         // FIXME find a way to avoid chunked stream load write large WALs
         size_t content_length = 0;
         if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
@@ -886,17 +888,24 @@ void 
StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContex
     }
 }
 
-Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
-                                              
std::shared_ptr<StreamLoadContext> ctx) {
-    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
-    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") 
&&
-        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, 
"off_mode")) {
-        return Status::InvalidArgument(
-                "group_commit can only be [async_mode, sync_mode, off_mode]");
-    }
-    if (config::wait_internal_group_commit_finish) {
-        group_commit_mode = "sync_mode";
+Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode,
+                                          int64_t content_length) {
+    if (iequal(group_commit_mode, ASYNC_MODE) &&
+        !load_size_smaller_than_wal_limit(content_length)) {
+        std::stringstream ss;
+        ss << "There is no space for group commit stream load async WAL. This 
stream load "
+              "size is "
+           << content_length
+           << ". WAL dir info: " << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
+        LOG(WARNING) << ss.str();
+        return Status::Error<EXCEEDED_LIMIT>(ss.str());
     }
+    return Status::OK();
+}
+
+Status StreamLoadAction::_can_group_commit(HttpRequest* req, 
std::shared_ptr<StreamLoadContext> ctx,
+                                           std::string& group_commit_header,
+                                           bool& can_group_commit) {
     int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
                                      ? 0
                                      : 
std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
@@ -907,13 +916,11 @@ Status 
StreamLoadAction::_handle_group_commit(HttpRequest* req,
         LOG(WARNING) << ss.str();
         return Status::InvalidArgument(ss.str());
     }
-    // allow chunked stream load in flink
     auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
                     req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != 
std::string::npos;
-    if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
-        (content_length == 0 && !is_chunk)) {
+    if (content_length == 0 && !is_chunk) {
         // off_mode and empty
-        ctx->group_commit = false;
+        can_group_commit = false;
         return Status::OK();
     }
     if (is_chunk) {
@@ -930,20 +937,47 @@ Status 
StreamLoadAction::_handle_group_commit(HttpRequest* req,
              iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), 
"UPDATE_FLEXIBLE_COLUMNS"));
     if (!partial_columns && !partitions && !temp_partitions && 
!ctx->two_phase_commit &&
         !update_mode) {
-        if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) 
{
+        if (!config::wait_internal_group_commit_finish && 
!group_commit_header.empty() &&
+            !ctx->label.empty()) {
             return Status::InvalidArgument("label and group_commit can't be 
set at the same time");
         }
-        ctx->group_commit = true;
-        if (iequal(group_commit_mode, "async_mode")) {
-            if (!load_size_smaller_than_wal_limit(content_length)) {
-                std::stringstream ss;
-                ss << "There is no space for group commit stream load async 
WAL. This stream load "
-                      "size is "
-                   << content_length << ". WAL dir info: "
-                   << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
-                LOG(WARNING) << ss.str();
-                return Status::Error<EXCEEDED_LIMIT>(ss.str());
-            }
+        RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length));
+        can_group_commit = true;
+    }
+    return Status::OK();
+}
+
+Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
+                                              
std::shared_ptr<StreamLoadContext> ctx) {
+    std::string group_commit_header = req->header(HTTP_GROUP_COMMIT);
+    if (!group_commit_header.empty() && !iequal(group_commit_header, 
SYNC_MODE) &&
+        !iequal(group_commit_header, ASYNC_MODE) && 
!iequal(group_commit_header, OFF_MODE)) {
+        return Status::InvalidArgument(
+                "group_commit can only be [async_mode, sync_mode, off_mode]");
+    }
+    if (config::wait_internal_group_commit_finish) {
+        group_commit_header = SYNC_MODE;
+    }
+
+    // if group_commit_header is off_mode, we will not use group commit
+    if (iequal(group_commit_header, OFF_MODE)) {
+        ctx->group_commit_mode = OFF_MODE;
+        ctx->group_commit = false;
+        return Status::OK();
+    }
+    bool can_group_commit = false;
+    RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header, 
can_group_commit));
+    if (!can_group_commit) {
+        ctx->group_commit_mode = OFF_MODE;
+        ctx->group_commit = false;
+    } else {
+        if (!group_commit_header.empty()) {
+            ctx->group_commit_mode = group_commit_header;
+            ctx->group_commit = true;
+        } else {
+            // use table property to decide group commit or not
+            ctx->group_commit_mode = "";
+            ctx->group_commit = false;
         }
     }
     return Status::OK();
diff --git a/be/src/service/http/action/stream_load.h 
b/be/src/service/http/action/stream_load.h
index e36b960f379..29fc92065ed 100644
--- a/be/src/service/http/action/stream_load.h
+++ b/be/src/service/http/action/stream_load.h
@@ -50,7 +50,10 @@ private:
     Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
     Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t 
file_bytes);
     Status _process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
+    Status _can_group_commit(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx,
+                             std::string& group_commit_header, bool& 
can_group_commit);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, 
const std::string& str);
+    Status _check_wal_space(const std::string& group_commit_mode, int64_t 
content_length);
     Status _handle_group_commit(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
     void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
     void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 113f32d7ae8..55ebf57ee78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2569,6 +2569,7 @@ public class SchemaChangeHandler extends AlterHandler {
                 
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
                 
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
                 
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+                add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE);
                 add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
                 add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
                 add(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE);
@@ -2682,6 +2683,7 @@ public class SchemaChangeHandler extends AlterHandler {
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
+                && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 789410a629c..1b49e786176 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4030,6 +4030,12 @@ public class Env {
         
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\"
 = \"");
         sb.append(olapTable.getGroupCommitDataBytes()).append("\"");
 
+        // group commit mode (only show when not off_mode)
+        if 
(!olapTable.getGroupCommitMode().equalsIgnoreCase(PropertyAnalyzer.GROUP_COMMIT_MODE_OFF))
 {
+            
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE).append("\"
 = \"");
+            sb.append(olapTable.getGroupCommitMode()).append("\"");
+        }
+
         // enable delete on delete predicate
         if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && 
olapTable.getEnableUniqueKeyMergeOnWrite()) {
             
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 5489c0669c0..5199ddbe4fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1698,6 +1698,14 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         return getOrCreatTableProperty().getGroupCommitDataBytes();
     }
 
+    public void setGroupCommitMode(String groupCommitMode) {
+        getOrCreatTableProperty().setGroupCommitMode(groupCommitMode);
+    }
+
+    public String getGroupCommitMode() {
+        return getOrCreatTableProperty().getGroupCommitMode();
+    }
+
     public Boolean hasSequenceCol() {
         return getSequenceCol() != null;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index f8544de47cf..c260fbb99c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -764,6 +764,15 @@ public class TableProperty implements GsonPostProcessable {
             
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE)));
     }
 
+    public void setGroupCommitMode(String groupCommitMode) {
+        properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE, 
groupCommitMode);
+    }
+
+    public String getGroupCommitMode() {
+        return 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE,
+                PropertyAnalyzer.GROUP_COMMIT_MODE_OFF);
+    }
+
     public void setRowStoreColumns(List<String> rowStoreColumns) {
         if (rowStoreColumns != null && !rowStoreColumns.isEmpty()) {
             
modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, "true");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
index caf5f93f772..c2155963273 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
@@ -100,6 +100,7 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
             throws UserException {
         final Set<String> allowedProps = new HashSet<String>() {
             {
+                add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE);
                 add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
                 add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
                 add(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS);
@@ -152,6 +153,21 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
             }
             param.ttlSeconds = ttlSeconds;
             param.type = UpdatePartitionMetaParam.TabletMetaType.TTL_SECONDS;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) {
+            String groupCommitMode = 
PropertyAnalyzer.analyzeGroupCommitMode(properties, false);
+            olapTable.readLock();
+            try {
+                if 
(groupCommitMode.equalsIgnoreCase(olapTable.getGroupCommitMode())) {
+                    LOG.info("groupCommitMode:{} is equal with 
olapTable.groupCommitMode():{}",
+                            groupCommitMode, olapTable.getGroupCommitMode());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.groupCommitMode = groupCommitMode;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_MODE;
         } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) {
             int groupCommitIntervalMs = 
PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties, false);
             olapTable.readLock();
@@ -384,6 +400,7 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
             INMEMORY,
             PERSISTENT,
             TTL_SECONDS,
+            GROUP_COMMIT_MODE,
             GROUP_COMMIT_INTERVAL_MS,
             GROUP_COMMIT_DATA_BYTES,
             COMPACTION_POLICY,
@@ -401,6 +418,7 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
         boolean isPersistent = false;
         boolean isInMemory = false;
         long ttlSeconds = 0;
+        String groupCommitMode;
         long groupCommitIntervalMs = 0;
         long groupCommitDataBytes = 0;
         String compactionPolicy;
@@ -460,6 +478,9 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
                     case GROUP_COMMIT_DATA_BYTES:
                         
infoBuilder.setGroupCommitDataBytes(param.groupCommitDataBytes);
                         break;
+                    case GROUP_COMMIT_MODE:
+                        infoBuilder.setGroupCommitMode(param.groupCommitMode);
+                        break;
                     case COMPACTION_POLICY:
                         
infoBuilder.setCompactionPolicy(param.compactionPolicy);
                         break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 93dc9756f2a..c6a8f0df8db 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -240,6 +240,11 @@ public class PropertyAnalyzer {
     public static final int PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE
             = Config.group_commit_data_bytes_default_value;
 
+    public static final String PROPERTIES_GROUP_COMMIT_MODE = 
"group_commit_mode";
+    public static final String GROUP_COMMIT_MODE_OFF = "off_mode";
+    public static final String GROUP_COMMIT_MODE_ASYNC = "async_mode";
+    public static final String GROUP_COMMIT_MODE_SYNC = "sync_mode";
+
     public static final String PROPERTIES_ENABLE_MOW_LIGHT_DELETE =
             "enable_mow_light_delete";
     public static final boolean 
PROPERTIES_ENABLE_MOW_LIGHT_DELETE_DEFAULT_VALUE
@@ -1908,6 +1913,25 @@ public class PropertyAnalyzer {
         return groupCommitDataBytes;
     }
 
+    public static String analyzeGroupCommitMode(Map<String, String> 
properties, boolean removeProperty)
+            throws AnalysisException {
+        String groupCommitMode = GROUP_COMMIT_MODE_OFF;
+        if (properties != null && 
properties.containsKey(PROPERTIES_GROUP_COMMIT_MODE)) {
+            groupCommitMode = properties.get(PROPERTIES_GROUP_COMMIT_MODE);
+            if (!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_OFF)
+                    && 
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_ASYNC)
+                    && 
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_SYNC)) {
+                throw new AnalysisException("Invalid group_commit_mode: " + 
groupCommitMode
+                        + ". Valid values: " + GROUP_COMMIT_MODE_OFF + ", " + 
GROUP_COMMIT_MODE_ASYNC
+                        + ", " + GROUP_COMMIT_MODE_SYNC);
+            }
+            if (removeProperty) {
+                properties.remove(PROPERTIES_GROUP_COMMIT_MODE);
+            }
+        }
+        return groupCommitMode.toLowerCase();
+    }
+
     /**
      * Check the type property of the catalog props.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 06b3a551dea..d353520ae63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -2963,6 +2963,13 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             throw new DdlException(e.getMessage());
         }
 
+        try {
+            String groupCommitMode = 
PropertyAnalyzer.analyzeGroupCommitMode(properties, true);
+            olapTable.setGroupCommitMode(groupCommitMode);
+        } catch (Exception e) {
+            throw new DdlException(e.getMessage());
+        }
+
         try {
             TEncryptionAlgorithm tdeAlgorithm = 
PropertyAnalyzer.analyzeTDEAlgorithm(properties);
             olapTable.setEncryptionAlgorithm(tdeAlgorithm);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
index e2189ed0696..8ae93531a0f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
@@ -334,6 +334,10 @@ public class ModifyTablePropertiesOp extends AlterTableOp {
             PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, false);
             this.needTableStable = false;
             this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) {
+            PropertyAnalyzer.analyzeGroupCommitMode(properties, false);
+            this.needTableStable = false;
+            this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
         } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) {
             this.needTableStable = false;
             this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 63d1f19dac5..c537c42c4c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1239,7 +1239,12 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         try {
             TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr);
-            result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+            if (tmpRes.isSetTableGroupCommitMode()) {
+                // if use table group commit mode, just return the mode info, 
no need to begin txn
+                
result.setTableGroupCommitMode(tmpRes.getTableGroupCommitMode()).setDbId(tmpRes.getDbId());
+            } else {
+                result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+            }
         } catch (DuplicatedRequestException e) {
             // this is a duplicate request, just return previous txn id
             LOG.warn("duplicate request for stream load. request id: {}, txn: 
{}", e.getDuplicatedRequestId(),
@@ -1296,6 +1301,16 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         }
 
         OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, 
TableType.OLAP);
+        // check if use table group_commit_mode property
+        if (request.isUseTableGroupCommitMode()) {
+            String tableGroupCommitMode = table.getGroupCommitMode();
+            if (tableGroupCommitMode != null && 
!tableGroupCommitMode.equalsIgnoreCase(
+                    PropertyAnalyzer.GROUP_COMMIT_MODE_OFF)) {
+                TLoadTxnBeginResult result = new TLoadTxnBeginResult();
+                
result.setTableGroupCommitMode(tableGroupCommitMode).setDbId(db.getId());
+                return result;
+            }
+        }
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
         Backend backend = 
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 60596a58828..34422d6c1e3 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -664,6 +664,7 @@ message TabletMetaInfoPB { // For update tablet meta
     optional bool disable_auto_compaction = 13;
     optional bool enable_mow_light_delete = 14;
     optional int32 vertical_compaction_num_columns_per_group = 15;
+    optional string group_commit_mode = 16;
 }
 
 message TabletCompactionJobPB {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 7e520751ee1..ccd082e7f52 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -466,6 +466,8 @@ struct TLoadTxnBeginRequest {
     14: optional i64 table_id
     15: optional i64 backend_id
     16: optional TCertBasedAuth cert_based_auth
+    // If set to true: use table group_commit_mode property
+    17: optional bool use_table_group_commit_mode
 }
 
 struct TLoadTxnBeginResult {
@@ -473,6 +475,9 @@ struct TLoadTxnBeginResult {
     2: optional i64 txnId
     3: optional string job_status // if label already used, set status of 
existing job
     4: optional i64 db_id
+    // If use_table_group_commit_mode is true in TLoadTxnBeginRequest, and 
table group_commit_mode property is
+    // async_mode or sync_mode, return table group_commit_mode (begin_txn is 
skipped)
+    5: optional string table_group_commit_mode
 }
 
 struct TBeginTxnRequest {
diff --git a/regression-test/data/query_p0/system/test_table_properties.out 
b/regression-test/data/query_p0/system/test_table_properties.out
index 0f8b1a2acaf..9f6e334cc9a 100644
--- a/regression-test/data/query_p0/system/test_table_properties.out
+++ b/regression-test/data/query_p0/system/test_table_properties.out
@@ -1,6 +1,6 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !select_check_1 --
-111
+114
 
 -- !select_check_2 --
 internal       test_table_properties_db        duplicate_table _auto_bucket    
false
@@ -19,6 +19,7 @@ internal      test_table_properties_db        duplicate_table 
enable_unique_key_merge_on_wri
 internal       test_table_properties_db        duplicate_table 
file_cache_ttl_seconds  0
 internal       test_table_properties_db        duplicate_table 
group_commit_data_bytes 134217728
 internal       test_table_properties_db        duplicate_table 
group_commit_interval_ms        10000
+internal       test_table_properties_db        duplicate_table 
group_commit_mode       off_mode
 internal       test_table_properties_db        duplicate_table in_memory       
false
 internal       test_table_properties_db        duplicate_table 
inverted_index_storage_format   V3
 internal       test_table_properties_db        duplicate_table is_being_synced 
false
@@ -55,6 +56,7 @@ internal      test_table_properties_db        listtable       
enable_unique_key_merge_on_write        fal
 internal       test_table_properties_db        listtable       
file_cache_ttl_seconds  0
 internal       test_table_properties_db        listtable       
group_commit_data_bytes 134217728
 internal       test_table_properties_db        listtable       
group_commit_interval_ms        10000
+internal       test_table_properties_db        listtable       
group_commit_mode       off_mode
 internal       test_table_properties_db        listtable       in_memory       
false
 internal       test_table_properties_db        listtable       
inverted_index_storage_format   V3
 internal       test_table_properties_db        listtable       is_being_synced 
false
@@ -91,6 +93,7 @@ internal      test_table_properties_db        unique_table    
enable_unique_key_merge_on_write
 internal       test_table_properties_db        unique_table    
file_cache_ttl_seconds  0
 internal       test_table_properties_db        unique_table    
group_commit_data_bytes 134217728
 internal       test_table_properties_db        unique_table    
group_commit_interval_ms        10000
+internal       test_table_properties_db        unique_table    
group_commit_mode       off_mode
 internal       test_table_properties_db        unique_table    in_memory       
false
 internal       test_table_properties_db        unique_table    
inverted_index_storage_format   V3
 internal       test_table_properties_db        unique_table    is_being_synced 
false
@@ -129,6 +132,7 @@ internal    test_table_properties_db        duplicate_table 
enable_unique_key_merge_on_wri
 internal       test_table_properties_db        duplicate_table 
file_cache_ttl_seconds  0
 internal       test_table_properties_db        duplicate_table 
group_commit_data_bytes 134217728
 internal       test_table_properties_db        duplicate_table 
group_commit_interval_ms        10000
+internal       test_table_properties_db        duplicate_table 
group_commit_mode       off_mode
 internal       test_table_properties_db        duplicate_table in_memory       
false
 internal       test_table_properties_db        duplicate_table 
inverted_index_storage_format   V3
 internal       test_table_properties_db        duplicate_table is_being_synced 
false
@@ -165,6 +169,7 @@ internal    test_table_properties_db        unique_table    
enable_unique_key_merge_on_write
 internal       test_table_properties_db        unique_table    
file_cache_ttl_seconds  0
 internal       test_table_properties_db        unique_table    
group_commit_data_bytes 134217728
 internal       test_table_properties_db        unique_table    
group_commit_interval_ms        10000
+internal       test_table_properties_db        unique_table    
group_commit_mode       off_mode
 internal       test_table_properties_db        unique_table    in_memory       
false
 internal       test_table_properties_db        unique_table    
inverted_index_storage_format   V3
 internal       test_table_properties_db        unique_table    is_being_synced 
false
@@ -205,6 +210,7 @@ internal    test_table_properties_db        duplicate_table 
enable_unique_key_merge_on_wri
 internal       test_table_properties_db        duplicate_table 
file_cache_ttl_seconds  0
 internal       test_table_properties_db        duplicate_table 
group_commit_data_bytes 134217728
 internal       test_table_properties_db        duplicate_table 
group_commit_interval_ms        10000
+internal       test_table_properties_db        duplicate_table 
group_commit_mode       off_mode
 internal       test_table_properties_db        duplicate_table in_memory       
false
 internal       test_table_properties_db        duplicate_table 
inverted_index_storage_format   V3
 internal       test_table_properties_db        duplicate_table is_being_synced 
false
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index ff8aaeeefc7..9f18e9a813c 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -513,6 +513,69 @@ suite("insert_group_commit_into") {
                 """
             exception """group_commit_data_bytes must be greater than 0"""
         }
+
+        // Test group_commit_mode property
+        sql """ drop table if exists ${table}_mode; """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table}_mode (
+                k1 INT not null,
+                k2 varchar(50)
+            )
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) 
+            BUCKETS 1 PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "group_commit_interval_ms" = "200",
+                "group_commit_mode" = "async_mode"
+            ); 
+        """
+        
+        // Verify SHOW CREATE TABLE displays group_commit_mode
+        def createStmt = sql """ SHOW CREATE TABLE ${table}_mode """
+        logger.info("SHOW CREATE TABLE result: " + createStmt)
+        assertTrue(createStmt.toString().contains('group_commit_mode'), "SHOW 
CREATE TABLE should contain group_commit_mode")
+        assertTrue(createStmt.toString().contains('async_mode'), "SHOW CREATE 
TABLE should contain async_mode")
+        
+        // Test ALTER TABLE to change group_commit_mode
+        sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = 
"SYNC_MODE"); """
+        
+        def createStmt2 = sql """ SHOW CREATE TABLE ${table}_mode """
+        logger.info("SHOW CREATE TABLE result after alter: " + createStmt2)
+        assertTrue(createStmt2.toString().contains('SYNC_MODE'), "SHOW CREATE 
TABLE should contain sync_mode after alter")
+        
+        // Test ALTER TABLE to change back to off_mode - should NOT show in 
SHOW CREATE TABLE
+        sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = 
"OFF_MODE"); """
+        
+        def createStmt3 = sql """ SHOW CREATE TABLE ${table}_mode """
+        logger.info("SHOW CREATE TABLE result after alter to off_mode: " + 
createStmt3)
+        // off_mode should NOT be shown in SHOW CREATE TABLE
+        assertTrue(!createStmt3.toString().contains('group_commit_mode'), 
+            "off_mode should NOT be shown in SHOW CREATE TABLE")
+        
+        // Test invalid group_commit_mode value
+        test {
+            sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = 
"invalid_mode"); """
+            exception """Invalid group_commit_mode"""
+        }
+        
+        // Test default value (off_mode) - should NOT show group_commit_mode 
in SHOW CREATE TABLE
+        sql """ drop table if exists ${table}_mode_default; """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table}_mode_default (
+                k1 INT not null
+            )
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) 
+            BUCKETS 1 PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+            ); 
+        """
+        def createStmt4 = sql """ SHOW CREATE TABLE ${table}_mode_default """
+        logger.info("SHOW CREATE TABLE result for default: " + createStmt4)
+        // When default is off_mode, it should NOT be shown in SHOW CREATE 
TABLE
+        assertTrue(!createStmt4.toString().contains('group_commit_mode'), 
+            "Default off_mode should NOT be shown in SHOW CREATE TABLE")
+        
     } finally {
     }
 }
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index a2de2a04b98..342b87f1396 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -31,6 +31,22 @@ suite("test_group_commit_stream_load") {
         }
     }
 
+    def getTableRowCount = { tableName1, expectedRowCount ->
+        def retry = 0
+        def row = 0
+        while (retry < 30) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${tableName1}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            row = rowCount[0][0]
+            if (row >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+        assertTrue(row >= expectedRowCount, "Expected at least 
${expectedRowCount} rows, but got ${row}")
+    }
+
     def getAlterTableState = {
         waitForSchemaChangeDone {
             sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}' 
ORDER BY createtime DESC LIMIT 1 """
@@ -342,4 +358,164 @@ suite("test_group_commit_stream_load") {
     }
     qt_read_json_by_line "select 
k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} 
order by k;"
 
+    // Test: stream load using table property group_commit_mode (async_mode)
+    // When HTTP header 'group_commit' is not set, should use table's 
group_commit_mode property
+    def tableNameAsync = "test_group_commit_stream_load_table_property_async"
+    try {
+        sql """ drop table if exists ${tableNameAsync}; """
+        
+        sql """
+        CREATE TABLE `${tableNameAsync}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "200",
+            "group_commit_mode" = "async_mode"
+        );
+        """
+
+        // Verify SHOW CREATE TABLE contains group_commit_mode
+        def createStmt1 = sql """ SHOW CREATE TABLE ${tableNameAsync} """
+        logger.info("SHOW CREATE TABLE for async: " + createStmt1)
+        assertTrue(createStmt1.toString().contains('async_mode'), "Table 
should have async_mode")
+        
+        // Stream load WITHOUT setting group_commit header - should use table 
property
+        streamLoad {
+            table "${tableNameAsync}"
+            set 'column_separator', ','
+            // NOT setting 'group_commit' header - should use table property
+            set 'columns', 'id, name'
+            file "test_stream_load1.csv"
+            unset 'label'
+            time 10000
+            
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
+        }
+        // Check data is loaded
+        getTableRowCount(tableNameAsync, 2)
+
+        streamLoad {
+            table "${tableNameAsync}"
+            set 'column_separator', ','
+            // NOT setting 'group_commit' header - should use table property, 
but set partitions
+            set 'partitions', "${tableNameAsync}"
+            set 'columns', 'id, name'
+            file "test_stream_load1.csv"
+            time 10000
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result (header override): 
${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                def label = json.Label
+                assertTrue(!label.startsWith("group_commit"))
+            }
+        }
+    } finally {
+    }
+
+    // Test: stream load using table property group_commit_mode (sync_mode)
+    def tableNameSync = "test_group_commit_stream_load_table_property_sync"
+    try {
+        sql """ drop table if exists ${tableNameSync}; """
+        
+        sql """
+        CREATE TABLE `${tableNameSync}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "200",
+            "group_commit_mode" = "SYNC_MODE"
+        );
+        """
+        
+        // Verify SHOW CREATE TABLE contains group_commit_mode
+        def createStmt2 = sql """ SHOW CREATE TABLE ${tableNameSync} """
+        logger.info("SHOW CREATE TABLE for sync: " + createStmt2)
+        assertTrue(createStmt2.toString().contains('sync_mode'), "Table should 
have sync_mode")
+        
+        // Stream load WITHOUT setting group_commit header - should use table 
property (sync_mode)
+        streamLoad {
+            table "${tableNameSync}"
+            set 'column_separator', ','
+            // NOT setting 'group_commit' header - should use table property
+            set 'columns', 'id, name'
+            file "test_stream_load1.csv"
+            unset 'label'
+            time 10000
+            
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
+        }
+        
+        // Check data is loaded
+        def rowCount2 = sql "select count(*) from ${tableNameSync}"
+        logger.info("Row count for sync table: " + rowCount2)
+        assertTrue(rowCount2[0][0] > 0, "Data should be loaded")
+        
+    } finally {
+    }
+
+    // Test: stream load with header override table property
+    // Table has async_mode, but header sets off_mode
+    def tableNameOverride = "test_group_commit_stream_load_override"
+    try {
+        sql """ drop table if exists ${tableNameOverride}; """
+        
+        sql """
+        CREATE TABLE `${tableNameOverride}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "200",
+            "group_commit_mode" = "async_mode"
+        );
+        """
+        
+        // Stream load with header setting group_commit=off_mode - should 
override table property
+        streamLoad {
+            table "${tableNameOverride}"
+            set 'column_separator', ','
+            set 'columns', 'id, name'
+            set 'group_commit', 'off_mode'  // Override table property
+            file "test_stream_load1.csv"
+            set 'label', 'test_override_' + System.currentTimeMillis()
+            time 10000
+            
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result (header override): 
${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                def label = json.Label
+                assertTrue(label.startsWith("test_override_"), "Label should 
start with test_override_")
+                // When off_mode, GroupCommit should be false or label should 
not start with group_commit_
+                // Note: GroupCommit field behavior may vary, but label should 
NOT be group_commit_ when off_mode
+            }
+        }
+    } finally {
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to