This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c6c76e9b6a0 [feat](group commit) add group_commit_mode in table
property (#61242)
c6c76e9b6a0 is described below
commit c6c76e9b6a0692b1254a922a4033f8689b46400d
Author: meiyi <[email protected]>
AuthorDate: Mon Mar 16 15:09:19 2026 +0800
[feat](group commit) add group_commit_mode in table property (#61242)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
1. CREATE TABLE supports setting the `group_commit_mode` table property
```
CREATE TABLE ... PROPERTIES(
"group_commit_mode" = "async_mode"
);
```
2. Support altering this property via ALTER TABLE
```
ALTER TABLE ... SET ("group_commit_mode" = "off_mode");
```
3. SHOW CREATE TABLE displays this property only if its value is not `off_mode`
4. For stream load, if the `group_commit` header is not set, the table
property is used as the group commit mode; if the `group_commit` header is
set, the header value is used.
5. doc: https://github.com/apache/doris-website/pull/3465
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/load/stream_load/stream_load_context.cpp | 4 +
be/src/load/stream_load/stream_load_context.h | 1 +
be/src/load/stream_load/stream_load_executor.cpp | 12 ++
be/src/service/http/action/stream_load.cpp | 100 ++++++++----
be/src/service/http/action/stream_load.h | 3 +
.../apache/doris/alter/SchemaChangeHandler.java | 2 +
.../main/java/org/apache/doris/catalog/Env.java | 6 +
.../java/org/apache/doris/catalog/OlapTable.java | 8 +
.../org/apache/doris/catalog/TableProperty.java | 9 ++
.../cloud/alter/CloudSchemaChangeHandler.java | 21 +++
.../apache/doris/common/util/PropertyAnalyzer.java | 24 +++
.../apache/doris/datasource/InternalCatalog.java | 7 +
.../commands/info/ModifyTablePropertiesOp.java | 4 +
.../apache/doris/service/FrontendServiceImpl.java | 17 +-
gensrc/proto/cloud.proto | 1 +
gensrc/thrift/FrontendService.thrift | 5 +
.../data/query_p0/system/test_table_properties.out | 8 +-
.../insert_p0/insert_group_commit_into.groovy | 63 ++++++++
.../test_group_commit_stream_load.groovy | 176 +++++++++++++++++++++
19 files changed, 436 insertions(+), 35 deletions(-)
diff --git a/be/src/load/stream_load/stream_load_context.cpp
b/be/src/load/stream_load/stream_load_context.cpp
index 12e5c39f0d7..4fee22ca057 100644
--- a/be/src/load/stream_load/stream_load_context.cpp
+++ b/be/src/load/stream_load/stream_load_context.cpp
@@ -58,6 +58,10 @@ std::string StreamLoadContext::to_json() const {
} else {
writer.Key("GroupCommit");
writer.Bool(true);
+ writer.Key("GroupCommitMode");
+ writer.String(group_commit_mode.c_str());
+ writer.Key("LoadId");
+ writer.String(id.to_string().c_str());
}
// status
diff --git a/be/src/load/stream_load/stream_load_context.h
b/be/src/load/stream_load/stream_load_context.h
index 85b4c4d1468..4e257c86e7d 100644
--- a/be/src/load/stream_load/stream_load_context.h
+++ b/be/src/load/stream_load/stream_load_context.h
@@ -196,6 +196,7 @@ public:
TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
bool group_commit = false;
+ std::string group_commit_mode = "";
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;
diff --git a/be/src/load/stream_load/stream_load_executor.cpp
b/be/src/load/stream_load/stream_load_executor.cpp
index b24333941c2..08fc3bb34b1 100644
--- a/be/src/load/stream_load/stream_load_executor.cpp
+++ b/be/src/load/stream_load/stream_load_executor.cpp
@@ -189,6 +189,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
}
request.__set_request_id(ctx->id.to_thrift());
request.__set_backend_id(_exec_env->cluster_info()->backend_id);
+ if (ctx->group_commit_mode.empty()) {
+ request.__set_use_table_group_commit_mode(true);
+ }
TLoadTxnBeginResult result;
Status status;
@@ -217,6 +220,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
}
return status;
}
+ if (ctx->group_commit_mode.empty() &&
result.__isset.table_group_commit_mode) {
+ auto table_group_commit_mode = result.table_group_commit_mode;
+ if (iequal(table_group_commit_mode, "async_mode") ||
+ iequal(table_group_commit_mode, "sync_mode")) {
+ ctx->group_commit = true;
+ ctx->group_commit_mode = table_group_commit_mode;
+ return Status::OK();
+ }
+ }
ctx->txn_id = result.txnId;
if (result.__isset.db_id) {
ctx->db_id = result.db_id;
diff --git a/be/src/service/http/action/stream_load.cpp
b/be/src/service/http/action/stream_load.cpp
index f483e11258a..8582ced02a7 100644
--- a/be/src/service/http/action/stream_load.cpp
+++ b/be/src/service/http/action/stream_load.cpp
@@ -84,6 +84,9 @@ bvar::LatencyRecorder
g_stream_load_commit_and_publish_latency_ms("stream_load",
static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
static const std::string CHUNK = "chunked";
+static const std::string OFF_MODE = "off_mode";
+static const std::string SYNC_MODE = "sync_mode";
+static const std::string ASYNC_MODE = "async_mode";
#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
@@ -254,6 +257,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
LOG(INFO) << "new income streaming load request." << ctx->brief() << ",
db=" << ctx->db
<< ", tbl=" << ctx->table << ", group_commit=" <<
ctx->group_commit
+ << ", group_commit_mode=" << ctx->group_commit_mode
<< ", HTTP headers=" << req->get_all_headers();
ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
@@ -366,6 +370,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req,
std::shared_ptr<Strea
int64_t begin_txn_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+ if (ctx->group_commit) {
+ RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode,
ctx->body_bytes));
+ }
}
// process put file
@@ -767,12 +774,7 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_stream_per_node(stream_per_node);
}
if (ctx->group_commit) {
- if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
-
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
- } else {
- // used for wait_internal_group_commit_finish
- request.__set_group_commit_mode("sync_mode");
- }
+ request.__set_group_commit_mode(ctx->group_commit_mode);
}
if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) {
@@ -811,7 +813,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (config::is_cloud_mode() && ctx->two_phase_commit &&
ctx->is_mow_table()) {
return Status::NotSupported("stream load 2pc is unsupported for mow
table");
}
- if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
+ if (iequal(ctx->group_commit_mode, ASYNC_MODE)) {
// FIXME find a way to avoid chunked stream load write large WALs
size_t content_length = 0;
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
@@ -886,17 +888,24 @@ void
StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContex
}
}
-Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
-
std::shared_ptr<StreamLoadContext> ctx) {
- std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
- if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode")
&&
- !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode,
"off_mode")) {
- return Status::InvalidArgument(
- "group_commit can only be [async_mode, sync_mode, off_mode]");
- }
- if (config::wait_internal_group_commit_finish) {
- group_commit_mode = "sync_mode";
+Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode,
+ int64_t content_length) {
+ if (iequal(group_commit_mode, ASYNC_MODE) &&
+ !load_size_smaller_than_wal_limit(content_length)) {
+ std::stringstream ss;
+ ss << "There is no space for group commit stream load async WAL. This
stream load "
+ "size is "
+ << content_length
+ << ". WAL dir info: " <<
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
+ LOG(WARNING) << ss.str();
+ return Status::Error<EXCEEDED_LIMIT>(ss.str());
}
+ return Status::OK();
+}
+
+Status StreamLoadAction::_can_group_commit(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx,
+ std::string& group_commit_header,
+ bool& can_group_commit) {
int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
:
std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
@@ -907,13 +916,11 @@ Status
StreamLoadAction::_handle_group_commit(HttpRequest* req,
LOG(WARNING) << ss.str();
return Status::InvalidArgument(ss.str());
}
- // allow chunked stream load in flink
auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) !=
std::string::npos;
- if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
- (content_length == 0 && !is_chunk)) {
+ if (content_length == 0 && !is_chunk) {
// off_mode and empty
- ctx->group_commit = false;
+ can_group_commit = false;
return Status::OK();
}
if (is_chunk) {
@@ -930,20 +937,47 @@ Status
StreamLoadAction::_handle_group_commit(HttpRequest* req,
iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE),
"UPDATE_FLEXIBLE_COLUMNS"));
if (!partial_columns && !partitions && !temp_partitions &&
!ctx->two_phase_commit &&
!update_mode) {
- if (!config::wait_internal_group_commit_finish && !ctx->label.empty())
{
+ if (!config::wait_internal_group_commit_finish &&
!group_commit_header.empty() &&
+ !ctx->label.empty()) {
return Status::InvalidArgument("label and group_commit can't be
set at the same time");
}
- ctx->group_commit = true;
- if (iequal(group_commit_mode, "async_mode")) {
- if (!load_size_smaller_than_wal_limit(content_length)) {
- std::stringstream ss;
- ss << "There is no space for group commit stream load async
WAL. This stream load "
- "size is "
- << content_length << ". WAL dir info: "
- <<
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
- LOG(WARNING) << ss.str();
- return Status::Error<EXCEEDED_LIMIT>(ss.str());
- }
+ RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length));
+ can_group_commit = true;
+ }
+ return Status::OK();
+}
+
+Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
+
std::shared_ptr<StreamLoadContext> ctx) {
+ std::string group_commit_header = req->header(HTTP_GROUP_COMMIT);
+ if (!group_commit_header.empty() && !iequal(group_commit_header,
SYNC_MODE) &&
+ !iequal(group_commit_header, ASYNC_MODE) &&
!iequal(group_commit_header, OFF_MODE)) {
+ return Status::InvalidArgument(
+ "group_commit can only be [async_mode, sync_mode, off_mode]");
+ }
+ if (config::wait_internal_group_commit_finish) {
+ group_commit_header = SYNC_MODE;
+ }
+
+ // if group_commit_header is off_mode, we will not use group commit
+ if (iequal(group_commit_header, OFF_MODE)) {
+ ctx->group_commit_mode = OFF_MODE;
+ ctx->group_commit = false;
+ return Status::OK();
+ }
+ bool can_group_commit = false;
+ RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header,
can_group_commit));
+ if (!can_group_commit) {
+ ctx->group_commit_mode = OFF_MODE;
+ ctx->group_commit = false;
+ } else {
+ if (!group_commit_header.empty()) {
+ ctx->group_commit_mode = group_commit_header;
+ ctx->group_commit = true;
+ } else {
+ // use table property to decide group commit or not
+ ctx->group_commit_mode = "";
+ ctx->group_commit = false;
}
}
return Status::OK();
diff --git a/be/src/service/http/action/stream_load.h
b/be/src/service/http/action/stream_load.h
index e36b960f379..29fc92065ed 100644
--- a/be/src/service/http/action/stream_load.h
+++ b/be/src/service/http/action/stream_load.h
@@ -50,7 +50,10 @@ private:
Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t
file_bytes);
Status _process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
+ Status _can_group_commit(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx,
+ std::string& group_commit_header, bool&
can_group_commit);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str);
+ Status _check_wal_space(const std::string& group_commit_mode, int64_t
content_length);
Status _handle_group_commit(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 113f32d7ae8..55ebf57ee78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2569,6 +2569,7 @@ public class SchemaChangeHandler extends AlterHandler {
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+ add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
add(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE);
@@ -2682,6 +2683,7 @@ public class SchemaChangeHandler extends AlterHandler {
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
+ &&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 789410a629c..1b49e786176 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4030,6 +4030,12 @@ public class Env {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\"
= \"");
sb.append(olapTable.getGroupCommitDataBytes()).append("\"");
+ // group commit mode (only show when not off_mode)
+ if
(!olapTable.getGroupCommitMode().equalsIgnoreCase(PropertyAnalyzer.GROUP_COMMIT_MODE_OFF))
{
+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE).append("\"
= \"");
+ sb.append(olapTable.getGroupCommitMode()).append("\"");
+ }
+
// enable delete on delete predicate
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS &&
olapTable.getEnableUniqueKeyMergeOnWrite()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 5489c0669c0..5199ddbe4fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1698,6 +1698,14 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return getOrCreatTableProperty().getGroupCommitDataBytes();
}
+ public void setGroupCommitMode(String groupCommitMode) {
+ getOrCreatTableProperty().setGroupCommitMode(groupCommitMode);
+ }
+
+ public String getGroupCommitMode() {
+ return getOrCreatTableProperty().getGroupCommitMode();
+ }
+
public Boolean hasSequenceCol() {
return getSequenceCol() != null;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index f8544de47cf..c260fbb99c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -764,6 +764,15 @@ public class TableProperty implements GsonPostProcessable {
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE)));
}
+ public void setGroupCommitMode(String groupCommitMode) {
+ properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE,
groupCommitMode);
+ }
+
+ public String getGroupCommitMode() {
+ return
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE,
+ PropertyAnalyzer.GROUP_COMMIT_MODE_OFF);
+ }
+
public void setRowStoreColumns(List<String> rowStoreColumns) {
if (rowStoreColumns != null && !rowStoreColumns.isEmpty()) {
modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, "true");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
index caf5f93f772..c2155963273 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
@@ -100,6 +100,7 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
throws UserException {
final Set<String> allowedProps = new HashSet<String>() {
{
+ add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
add(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS);
@@ -152,6 +153,21 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
}
param.ttlSeconds = ttlSeconds;
param.type = UpdatePartitionMetaParam.TabletMetaType.TTL_SECONDS;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) {
+ String groupCommitMode =
PropertyAnalyzer.analyzeGroupCommitMode(properties, false);
+ olapTable.readLock();
+ try {
+ if
(groupCommitMode.equalsIgnoreCase(olapTable.getGroupCommitMode())) {
+ LOG.info("groupCommitMode:{} is equal with
olapTable.groupCommitMode():{}",
+ groupCommitMode, olapTable.getGroupCommitMode());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.groupCommitMode = groupCommitMode;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_MODE;
} else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) {
int groupCommitIntervalMs =
PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties, false);
olapTable.readLock();
@@ -384,6 +400,7 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
INMEMORY,
PERSISTENT,
TTL_SECONDS,
+ GROUP_COMMIT_MODE,
GROUP_COMMIT_INTERVAL_MS,
GROUP_COMMIT_DATA_BYTES,
COMPACTION_POLICY,
@@ -401,6 +418,7 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
boolean isPersistent = false;
boolean isInMemory = false;
long ttlSeconds = 0;
+ String groupCommitMode;
long groupCommitIntervalMs = 0;
long groupCommitDataBytes = 0;
String compactionPolicy;
@@ -460,6 +478,9 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
case GROUP_COMMIT_DATA_BYTES:
infoBuilder.setGroupCommitDataBytes(param.groupCommitDataBytes);
break;
+ case GROUP_COMMIT_MODE:
+ infoBuilder.setGroupCommitMode(param.groupCommitMode);
+ break;
case COMPACTION_POLICY:
infoBuilder.setCompactionPolicy(param.compactionPolicy);
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 93dc9756f2a..c6a8f0df8db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -240,6 +240,11 @@ public class PropertyAnalyzer {
public static final int PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE
= Config.group_commit_data_bytes_default_value;
+ public static final String PROPERTIES_GROUP_COMMIT_MODE =
"group_commit_mode";
+ public static final String GROUP_COMMIT_MODE_OFF = "off_mode";
+ public static final String GROUP_COMMIT_MODE_ASYNC = "async_mode";
+ public static final String GROUP_COMMIT_MODE_SYNC = "sync_mode";
+
public static final String PROPERTIES_ENABLE_MOW_LIGHT_DELETE =
"enable_mow_light_delete";
public static final boolean
PROPERTIES_ENABLE_MOW_LIGHT_DELETE_DEFAULT_VALUE
@@ -1908,6 +1913,25 @@ public class PropertyAnalyzer {
return groupCommitDataBytes;
}
+ public static String analyzeGroupCommitMode(Map<String, String>
properties, boolean removeProperty)
+ throws AnalysisException {
+ String groupCommitMode = GROUP_COMMIT_MODE_OFF;
+ if (properties != null &&
properties.containsKey(PROPERTIES_GROUP_COMMIT_MODE)) {
+ groupCommitMode = properties.get(PROPERTIES_GROUP_COMMIT_MODE);
+ if (!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_OFF)
+ &&
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_ASYNC)
+ &&
!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_SYNC)) {
+ throw new AnalysisException("Invalid group_commit_mode: " +
groupCommitMode
+ + ". Valid values: " + GROUP_COMMIT_MODE_OFF + ", " +
GROUP_COMMIT_MODE_ASYNC
+ + ", " + GROUP_COMMIT_MODE_SYNC);
+ }
+ if (removeProperty) {
+ properties.remove(PROPERTIES_GROUP_COMMIT_MODE);
+ }
+ }
+ return groupCommitMode.toLowerCase();
+ }
+
/**
* Check the type property of the catalog props.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 06b3a551dea..d353520ae63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -2963,6 +2963,13 @@ public class InternalCatalog implements
CatalogIf<Database> {
throw new DdlException(e.getMessage());
}
+ try {
+ String groupCommitMode =
PropertyAnalyzer.analyzeGroupCommitMode(properties, true);
+ olapTable.setGroupCommitMode(groupCommitMode);
+ } catch (Exception e) {
+ throw new DdlException(e.getMessage());
+ }
+
try {
TEncryptionAlgorithm tdeAlgorithm =
PropertyAnalyzer.analyzeTDEAlgorithm(properties);
olapTable.setEncryptionAlgorithm(tdeAlgorithm);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
index e2189ed0696..8ae93531a0f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java
@@ -334,6 +334,10 @@ public class ModifyTablePropertiesOp extends AlterTableOp {
PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, false);
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) {
+ PropertyAnalyzer.analyzeGroupCommitMode(properties, false);
+ this.needTableStable = false;
+ this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) {
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 63d1f19dac5..c537c42c4c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1239,7 +1239,12 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
try {
TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr);
- result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+ if (tmpRes.isSetTableGroupCommitMode()) {
+ // if use table group commit mode, just return the mode info,
no need to begin txn
+
result.setTableGroupCommitMode(tmpRes.getTableGroupCommitMode()).setDbId(tmpRes.getDbId());
+ } else {
+ result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+ }
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.warn("duplicate request for stream load. request id: {}, txn:
{}", e.getDuplicatedRequestId(),
@@ -1296,6 +1301,16 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl,
TableType.OLAP);
+ // check if use table group_commit_mode property
+ if (request.isUseTableGroupCommitMode()) {
+ String tableGroupCommitMode = table.getGroupCommitMode();
+ if (tableGroupCommitMode != null &&
!tableGroupCommitMode.equalsIgnoreCase(
+ PropertyAnalyzer.GROUP_COMMIT_MODE_OFF)) {
+ TLoadTxnBeginResult result = new TLoadTxnBeginResult();
+
result.setTableGroupCommitMode(tableGroupCommitMode).setDbId(db.getId());
+ return result;
+ }
+ }
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
Backend backend =
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 60596a58828..34422d6c1e3 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -664,6 +664,7 @@ message TabletMetaInfoPB { // For update tablet meta
optional bool disable_auto_compaction = 13;
optional bool enable_mow_light_delete = 14;
optional int32 vertical_compaction_num_columns_per_group = 15;
+ optional string group_commit_mode = 16;
}
message TabletCompactionJobPB {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 7e520751ee1..ccd082e7f52 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -466,6 +466,8 @@ struct TLoadTxnBeginRequest {
14: optional i64 table_id
15: optional i64 backend_id
16: optional TCertBasedAuth cert_based_auth
+ // If set to true: use table group_commit_mode property
+ 17: optional bool use_table_group_commit_mode
}
struct TLoadTxnBeginResult {
@@ -473,6 +475,9 @@ struct TLoadTxnBeginResult {
2: optional i64 txnId
3: optional string job_status // if label already used, set status of
existing job
4: optional i64 db_id
+ // If use_table_group_commit_mode is true in TLoadTxnBeginRequest, and
table group_commit_mode property is
+ // async_mode or sync_mode, return table group_commit_mode (begin_txn is
skipped)
+ 5: optional string table_group_commit_mode
}
struct TBeginTxnRequest {
diff --git a/regression-test/data/query_p0/system/test_table_properties.out
b/regression-test/data/query_p0/system/test_table_properties.out
index 0f8b1a2acaf..9f6e334cc9a 100644
--- a/regression-test/data/query_p0/system/test_table_properties.out
+++ b/regression-test/data/query_p0/system/test_table_properties.out
@@ -1,6 +1,6 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
-- !select_check_1 --
-111
+114
-- !select_check_2 --
internal test_table_properties_db duplicate_table _auto_bucket
false
@@ -19,6 +19,7 @@ internal test_table_properties_db duplicate_table
enable_unique_key_merge_on_wri
internal test_table_properties_db duplicate_table
file_cache_ttl_seconds 0
internal test_table_properties_db duplicate_table
group_commit_data_bytes 134217728
internal test_table_properties_db duplicate_table
group_commit_interval_ms 10000
+internal test_table_properties_db duplicate_table
group_commit_mode off_mode
internal test_table_properties_db duplicate_table in_memory
false
internal test_table_properties_db duplicate_table
inverted_index_storage_format V3
internal test_table_properties_db duplicate_table is_being_synced
false
@@ -55,6 +56,7 @@ internal test_table_properties_db listtable
enable_unique_key_merge_on_write fal
internal test_table_properties_db listtable
file_cache_ttl_seconds 0
internal test_table_properties_db listtable
group_commit_data_bytes 134217728
internal test_table_properties_db listtable
group_commit_interval_ms 10000
+internal test_table_properties_db listtable
group_commit_mode off_mode
internal test_table_properties_db listtable in_memory
false
internal test_table_properties_db listtable
inverted_index_storage_format V3
internal test_table_properties_db listtable is_being_synced
false
@@ -91,6 +93,7 @@ internal test_table_properties_db unique_table
enable_unique_key_merge_on_write
internal test_table_properties_db unique_table
file_cache_ttl_seconds 0
internal test_table_properties_db unique_table
group_commit_data_bytes 134217728
internal test_table_properties_db unique_table
group_commit_interval_ms 10000
+internal test_table_properties_db unique_table
group_commit_mode off_mode
internal test_table_properties_db unique_table in_memory
false
internal test_table_properties_db unique_table
inverted_index_storage_format V3
internal test_table_properties_db unique_table is_being_synced
false
@@ -129,6 +132,7 @@ internal test_table_properties_db duplicate_table
enable_unique_key_merge_on_wri
internal test_table_properties_db duplicate_table
file_cache_ttl_seconds 0
internal test_table_properties_db duplicate_table
group_commit_data_bytes 134217728
internal test_table_properties_db duplicate_table
group_commit_interval_ms 10000
+internal test_table_properties_db duplicate_table
group_commit_mode off_mode
internal test_table_properties_db duplicate_table in_memory
false
internal test_table_properties_db duplicate_table
inverted_index_storage_format V3
internal test_table_properties_db duplicate_table is_being_synced
false
@@ -165,6 +169,7 @@ internal test_table_properties_db unique_table
enable_unique_key_merge_on_write
internal test_table_properties_db unique_table
file_cache_ttl_seconds 0
internal test_table_properties_db unique_table
group_commit_data_bytes 134217728
internal test_table_properties_db unique_table
group_commit_interval_ms 10000
+internal test_table_properties_db unique_table
group_commit_mode off_mode
internal test_table_properties_db unique_table in_memory
false
internal test_table_properties_db unique_table
inverted_index_storage_format V3
internal test_table_properties_db unique_table is_being_synced
false
@@ -205,6 +210,7 @@ internal test_table_properties_db duplicate_table
enable_unique_key_merge_on_wri
internal test_table_properties_db duplicate_table
file_cache_ttl_seconds 0
internal test_table_properties_db duplicate_table
group_commit_data_bytes 134217728
internal test_table_properties_db duplicate_table
group_commit_interval_ms 10000
+internal test_table_properties_db duplicate_table
group_commit_mode off_mode
internal test_table_properties_db duplicate_table in_memory
false
internal test_table_properties_db duplicate_table
inverted_index_storage_format V3
internal test_table_properties_db duplicate_table is_being_synced
false
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index ff8aaeeefc7..9f18e9a813c 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -513,6 +513,69 @@ suite("insert_group_commit_into") {
"""
exception """group_commit_data_bytes must be greater than 0"""
}
+
+ // Test group_commit_mode property
+ sql """ drop table if exists ${table}_mode; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table}_mode (
+ k1 INT not null,
+ k2 varchar(50)
+ )
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`)
+ BUCKETS 1 PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "group_commit_interval_ms" = "200",
+ "group_commit_mode" = "async_mode"
+ );
+ """
+
+ // Verify SHOW CREATE TABLE displays group_commit_mode
+ def createStmt = sql """ SHOW CREATE TABLE ${table}_mode """
+ logger.info("SHOW CREATE TABLE result: " + createStmt)
+ assertTrue(createStmt.toString().contains('group_commit_mode'), "SHOW
CREATE TABLE should contain group_commit_mode")
+ assertTrue(createStmt.toString().contains('async_mode'), "SHOW CREATE
TABLE should contain async_mode")
+
+ // Test ALTER TABLE to change group_commit_mode
+ sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" =
"SYNC_MODE"); """
+
+ def createStmt2 = sql """ SHOW CREATE TABLE ${table}_mode """
+ logger.info("SHOW CREATE TABLE result after alter: " + createStmt2)
+ assertTrue(createStmt2.toString().contains('SYNC_MODE'), "SHOW CREATE
TABLE should contain sync_mode after alter")
+
+ // Test ALTER TABLE to change back to off_mode - should NOT show in
SHOW CREATE TABLE
+ sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" =
"OFF_MODE"); """
+
+ def createStmt3 = sql """ SHOW CREATE TABLE ${table}_mode """
+ logger.info("SHOW CREATE TABLE result after alter to off_mode: " +
createStmt3)
+ // off_mode should NOT be shown in SHOW CREATE TABLE
+ assertTrue(!createStmt3.toString().contains('group_commit_mode'),
+ "off_mode should NOT be shown in SHOW CREATE TABLE")
+
+ // Test invalid group_commit_mode value
+ test {
+ sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" =
"invalid_mode"); """
+ exception """Invalid group_commit_mode"""
+ }
+
+ // Test default value (off_mode) - should NOT show group_commit_mode
in SHOW CREATE TABLE
+ sql """ drop table if exists ${table}_mode_default; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table}_mode_default (
+ k1 INT not null
+ )
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`)
+ BUCKETS 1 PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ def createStmt4 = sql """ SHOW CREATE TABLE ${table}_mode_default """
+ logger.info("SHOW CREATE TABLE result for default: " + createStmt4)
+ // When default is off_mode, it should NOT be shown in SHOW CREATE
TABLE
+ assertTrue(!createStmt4.toString().contains('group_commit_mode'),
+ "Default off_mode should NOT be shown in SHOW CREATE TABLE")
+
} finally {
}
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index a2de2a04b98..342b87f1396 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -31,6 +31,22 @@ suite("test_group_commit_stream_load") {
}
}
+ def getTableRowCount = { tableName1, expectedRowCount ->
+ def retry = 0
+ def row = 0
+ while (retry < 30) {
+ sleep(2000)
+ def rowCount = sql "select count(*) from ${tableName1}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ row = rowCount[0][0]
+ if (row >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ assertTrue(row >= expectedRowCount, "Expected at least
${expectedRowCount} rows, but got ${row}")
+ }
+
def getAlterTableState = {
waitForSchemaChangeDone {
sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}'
ORDER BY createtime DESC LIMIT 1 """
@@ -342,4 +358,164 @@ suite("test_group_commit_stream_load") {
}
qt_read_json_by_line "select
k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName}
order by k;"
+ // Test: stream load using table property group_commit_mode (async_mode)
+ // When HTTP header 'group_commit' is not set, should use table's
group_commit_mode property
+ def tableNameAsync = "test_group_commit_stream_load_table_property_async"
+ try {
+ sql """ drop table if exists ${tableNameAsync}; """
+
+ sql """
+ CREATE TABLE `${tableNameAsync}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200",
+ "group_commit_mode" = "async_mode"
+ );
+ """
+
+ // Verify SHOW CREATE TABLE contains group_commit_mode
+ def createStmt1 = sql """ SHOW CREATE TABLE ${tableNameAsync} """
+ logger.info("SHOW CREATE TABLE for async: " + createStmt1)
+ assertTrue(createStmt1.toString().contains('async_mode'), "Table
should have async_mode")
+
+ // Stream load WITHOUT setting group_commit header - should use table
property
+ streamLoad {
+ table "${tableNameAsync}"
+ set 'column_separator', ','
+ // NOT setting 'group_commit' header - should use table property
+ set 'columns', 'id, name'
+ file "test_stream_load1.csv"
+ unset 'label'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
+ }
+ // Check data is loaded
+ getTableRowCount(tableNameAsync, 2)
+
+ streamLoad {
+ table "${tableNameAsync}"
+ set 'column_separator', ','
+            // NOT setting 'group_commit' header, so the table property (async_mode) would apply;
+            // however, specifying 'partitions' disables group commit, so a normal (non-group-commit) load is expected
+ set 'partitions', "${tableNameAsync}"
+ set 'columns', 'id, name'
+ file "test_stream_load1.csv"
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result (header override):
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ def label = json.Label
+ assertTrue(!label.startsWith("group_commit"))
+ }
+ }
+ } finally {
+ }
+
+ // Test: stream load using table property group_commit_mode (sync_mode)
+ def tableNameSync = "test_group_commit_stream_load_table_property_sync"
+ try {
+ sql """ drop table if exists ${tableNameSync}; """
+
+ sql """
+ CREATE TABLE `${tableNameSync}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200",
+ "group_commit_mode" = "SYNC_MODE"
+ );
+ """
+
+ // Verify SHOW CREATE TABLE contains group_commit_mode
+ def createStmt2 = sql """ SHOW CREATE TABLE ${tableNameSync} """
+ logger.info("SHOW CREATE TABLE for sync: " + createStmt2)
+ assertTrue(createStmt2.toString().contains('sync_mode'), "Table should
have sync_mode")
+
+ // Stream load WITHOUT setting group_commit header - should use table
property (sync_mode)
+ streamLoad {
+ table "${tableNameSync}"
+ set 'column_separator', ','
+ // NOT setting 'group_commit' header - should use table property
+ set 'columns', 'id, name'
+ file "test_stream_load1.csv"
+ unset 'label'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
+ }
+
+ // Check data is loaded
+ def rowCount2 = sql "select count(*) from ${tableNameSync}"
+ logger.info("Row count for sync table: " + rowCount2)
+ assertTrue(rowCount2[0][0] > 0, "Data should be loaded")
+
+ } finally {
+ }
+
+ // Test: stream load with header override table property
+ // Table has async_mode, but header sets off_mode
+ def tableNameOverride = "test_group_commit_stream_load_override"
+ try {
+ sql """ drop table if exists ${tableNameOverride}; """
+
+ sql """
+ CREATE TABLE `${tableNameOverride}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200",
+ "group_commit_mode" = "async_mode"
+ );
+ """
+
+ // Stream load with header setting group_commit=off_mode - should
override table property
+ streamLoad {
+ table "${tableNameOverride}"
+ set 'column_separator', ','
+ set 'columns', 'id, name'
+ set 'group_commit', 'off_mode' // Override table property
+ file "test_stream_load1.csv"
+ set 'label', 'test_override_' + System.currentTimeMillis()
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result (header override):
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ def label = json.Label
+ assertTrue(label.startsWith("test_override_"), "Label should
start with test_override_")
+ // When off_mode, GroupCommit should be false or label should
not start with group_commit_
+ // Note: GroupCommit field behavior may vary, but label should
NOT be group_commit_ when off_mode
+ }
+ }
+ } finally {
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]