This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d60268d30c8 [fix](cloud) fix job status is empty when label reused in
cloud mode (#42878)
d60268d30c8 is described below
commit d60268d30c88da41e42450170e497279b0e284bc
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 31 16:57:26 2024 +0800
[fix](cloud) fix job status is empty when label reused in cloud mode
(#42878)
Cherry-picked from #42700
---
cloud/src/meta-service/meta_service_txn.cpp | 1 +
.../transaction/CloudGlobalTransactionMgr.java | 3 +-
.../doris/common/LabelAlreadyUsedException.java | 21 +++++
gensrc/proto/cloud.proto | 1 +
.../stream_load/test_stream_load_job_status.groovy | 90 ++++++++++++++++++++++
5 files changed, 115 insertions(+), 1 deletion(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 03cb7866e1a..ddd1f8cdf79 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -266,6 +266,7 @@ void
MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
}
// clang-format on
}
+ response->set_txn_status(cur_txn_info.status());
code = MetaServiceCode::TXN_LABEL_ALREADY_USED;
ss << "Label [" << label << "] has already been used, relate to txn ["
<< cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 81c5a3bfdba..131473470ab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -273,7 +273,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
throw new DuplicatedRequestException(DebugUtil.printId(requestId),
beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg());
case TXN_LABEL_ALREADY_USED:
- throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false);
+ throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false,
+ beginTxnResponse.getTxnStatus());
default:
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index 8c508809d59..f1789881cdc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -17,6 +17,7 @@
package org.apache.doris.common;
+import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Preconditions;
@@ -37,6 +38,26 @@ public class LabelAlreadyUsedException extends DdlException {
super(isLabel ? "Label [" + msg + "] has already been used." : msg);
}
+ public LabelAlreadyUsedException(String msg, boolean isLabel, TxnStatusPB txnStatus) {
+ super(isLabel ? "Label [" + msg + "] has already been used." : msg);
+ switch (txnStatus) {
+ case TXN_STATUS_UNKNOWN:
+ case TXN_STATUS_PREPARED:
+ jobStatus = "RUNNING";
+ break;
+ case TXN_STATUS_PRECOMMITTED:
+ jobStatus = "PRECOMMITTED";
+ break;
+ case TXN_STATUS_COMMITTED:
+ case TXN_STATUS_VISIBLE:
+ jobStatus = "FINISHED";
+ break;
+ default:
+ Preconditions.checkState(false, txnStatus);
+ break;
+ }
+ }
+
public LabelAlreadyUsedException(TransactionState txn) {
super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId()
+ "], status [" + txn.getTransactionStatus() + "].");
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 93420bddbf6..8ae48851601 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -642,6 +642,7 @@ message BeginTxnResponse {
optional MetaServiceResponseStatus status = 1;
optional int64 txn_id = 2;
optional int64 dup_txn_id = 3;
+ optional TxnStatusPB txn_status = 4;
// TODO: There may be more fields TBD
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy
new file mode 100644
index 00000000000..9cb38747e22
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_job_status", "p0") {
+ def tableName = "test_stream_load_job_status"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+ `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+ `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+ `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+ `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+ `v8` datetime REPLACE_IF_NOT_NULL NULL,
+ `v9` date REPLACE_IF_NOT_NULL NULL,
+ `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+ `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+ `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`k1`)
+ (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+ PARTITION partition_b VALUES [("100000"), ("1000000000")),
+ PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+ PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'label', 'test_stream_load_job_status'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+ set 'strict_mode', 'true'
+
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'label', 'test_stream_load_job_status'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+ set 'strict_mode', 'true'
+
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("label already exists", json.Status.toLowerCase())
+ assertEquals("finished", json.ExistingJobStatus.toLowerCase())
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org