This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0-preview by this 
push:
     new 127b8776f48 [fix](merge-cloud) fix inconsistency between cloud mode 
and local mode for 2PC (#33917)
127b8776f48 is described below

commit 127b8776f4832e432ba8a172e462ceef2c351e38
Author: Xin Liao <[email protected]>
AuthorDate: Sun Apr 21 10:52:55 2024 +0800

    [fix](merge-cloud) fix inconsistency between cloud mode and local mode for 
2PC (#33917)
---
 cloud/src/common/bvars.cpp                         |   1 +
 cloud/src/common/bvars.h                           |   1 +
 cloud/src/meta-service/meta_service.h              |   8 ++
 cloud/src/meta-service/meta_service_txn.cpp        | 122 +++++++++++++++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |   4 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  10 ++
 .../transaction/CloudGlobalTransactionMgr.java     |  43 +++++-
 .../doris/common/LabelAlreadyUsedException.java    |   6 +-
 gensrc/proto/cloud.proto                           |  13 ++
 .../load_p0/stream_load/test_stream_load_2pc.out   |   5 +
 .../stream_load/test_stream_load_2pc.groovy        | 150 +++++++++++++++++++++
 11 files changed, 360 insertions(+), 3 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index ab0b5934b50..1aa436bb603 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -70,6 +70,7 @@ BvarLatencyRecorderWithTag 
g_bvar_ms_get_delete_bitmap_update_lock("ms",
                                                                    
"get_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", 
"get_rl_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", 
"start_tablet_job");
 BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", 
"finish_tablet_job");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index dbdbfa834e9..b55e1051cd9 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -171,6 +171,7 @@ extern BvarLatencyRecorderWithTag 
g_bvar_ms_get_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 4b7ae829795..4dc4113f341 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -254,6 +254,9 @@ public:
                                    GetRLTaskCommitAttachResponse* response,
                                    ::google::protobuf::Closure* done) override;
 
+    void get_txn_id(::google::protobuf::RpcController* controller, const 
GetTxnIdRequest* request,
+                    GetTxnIdResponse* response, ::google::protobuf::Closure* 
done) override;
+
     // ATTN: If you add a new method, please also add the corresponding 
implementation in `MetaServiceProxy`.
 
     std::pair<MetaServiceCode, std::string> get_instance_info(const 
std::string& instance_id,
@@ -585,6 +588,11 @@ public:
                   done);
     }
 
+    void get_txn_id(::google::protobuf::RpcController* controller, const 
GetTxnIdRequest* request,
+                    GetTxnIdResponse* response, ::google::protobuf::Closure* 
done) override {
+        call_impl(&cloud::MetaService::get_txn_id, controller, request, 
response, done);
+    }
+
 private:
     template <typename Request, typename Response>
     using MetaServiceMethod = void 
(cloud::MetaService::*)(::google::protobuf::RpcController*,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 139206ede20..0afdc31d10a 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1969,4 +1969,126 @@ void 
MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
     code = MetaServiceCode::OK;
 }
 
+// get txn id by label
+// 1. When the requested status is not empty, return the txnid
+//    corresponding to the status. There may be multiple
+//    requested status, just match one.
+// 2. When the requested status is empty, return the latest txnid.
+void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
+                                 const GetTxnIdRequest* request, 
GetTxnIdResponse* response,
+                                 ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(get_txn_id);
+    if (!request->has_db_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "missing db id";
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    RPC_RATE_LIMIT(get_txn_id)
+    const int64_t db_id = request->db_id();
+    std::string label = request->label();
+    const std::string label_key = txn_label_key({instance_id, db_id, label});
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << 
db_id
+                     << " label=" << label;
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
+           << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string label_val;
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get failed(), err=" << err << " label=" << label;
+        msg = ss.str();
+        return;
+    }
+
+    if (label_val.size() <= VERSION_STAMP_LEN) {
+        code = MetaServiceCode::TXN_ID_NOT_FOUND;
+        ss << "transaction not found, label=" << label;
+        return;
+    }
+
+    TxnLabelPB label_pb;
+    //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
+    if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - 
VERSION_STAMP_LEN)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "label_pb->ParseFromString() failed, label=" << label;
+        msg = ss.str();
+        return;
+    }
+    if (label_pb.txn_ids_size() == 0) {
+        code = MetaServiceCode::TXN_ID_NOT_FOUND;
+        ss << "transaction not found, label=" << label;
+        msg = ss.str();
+        return;
+    }
+
+    // find the latest txn
+    if (request->txn_status_size() == 0) {
+        response->set_txn_id(*label_pb.txn_ids().rbegin());
+        return;
+    }
+
+    for (auto& cur_txn_id : label_pb.txn_ids()) {
+        const std::string cur_info_key = txn_info_key({instance_id, db_id, 
cur_txn_id});
+        std::string cur_info_val;
+        err = txn->get(cur_info_key, &cur_info_val);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" 
<< label
+               << " err=" << err;
+            msg = ss.str();
+            return;
+        }
+
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            //label_to_idx and txn info inconsistency.
+            code = MetaServiceCode::TXN_ID_NOT_FOUND;
+            ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" 
<< label
+               << " err=" << err;
+            msg = ss.str();
+            return;
+        }
+
+        TxnInfoPB cur_txn_info;
+        if (!cur_txn_info.ParseFromString(cur_info_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << 
cur_txn_id
+               << " label=" << label << " err=" << err;
+            msg = ss.str();
+            return;
+        }
+
+        VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
+        for (auto txn_status : request->txn_status()) {
+            if (cur_txn_info.status() == txn_status) {
+                response->set_txn_id(cur_txn_id);
+                return;
+            }
+        }
+    }
+    code = MetaServiceCode::TXN_ID_NOT_FOUND;
+    ss << "transaction not found, label=" << label;
+    msg = ss.str();
+    return;
+}
+
 } // namespace doris::cloud
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 847c90a73fd..7463a684680 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -192,6 +192,10 @@ public class MetaServiceClient {
         return blockingStub.getTxn(request);
     }
 
+    public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request) {
+        return blockingStub.getTxnId(request);
+    }
+
     public Cloud.GetCurrentMaxTxnResponse 
getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request) {
         return blockingStub.getCurrentMaxTxnId(request);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 052abbcf4ef..117cfd71bd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -226,6 +226,16 @@ public class MetaServiceProxy {
         }
     }
 
+    public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.getTxnId(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
     public Cloud.GetCurrentMaxTxnResponse 
getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request)
             throws RpcException {
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index d325f1dac2b..35b3cd28529 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -43,6 +43,8 @@ import 
org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
 import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
+import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest;
+import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse;
 import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.GetTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
@@ -51,6 +53,7 @@ import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.TableStatsPB;
 import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
+import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
 import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
@@ -90,6 +93,7 @@ import org.apache.doris.transaction.GlobalTransactionMgrIface;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionCommitFailedException;
 import org.apache.doris.transaction.TransactionIdGenerator;
+import org.apache.doris.transaction.TransactionNotFoundException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -248,7 +252,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     throw new 
DuplicatedRequestException(DebugUtil.printId(requestId),
                             beginTxnResponse.getDupTxnId(), 
beginTxnResponse.getStatus().getMsg());
                 case TXN_LABEL_ALREADY_USED:
-                    throw new LabelAlreadyUsedException(label);
+                    throw new 
LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false);
                 default:
                     if (MetricRepo.isInit) {
                         MetricRepo.COUNTER_TXN_REJECT.increase(1L);
@@ -483,6 +487,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
             throw new UserException("internal error, " + 
internalMsgBuilder.toString());
         }
+        if (is2PC && commitTxnResponse.getStatus().getCode() == 
MetaServiceCode.TXN_ALREADY_VISIBLE) {
+            throw new UserException(commitTxnResponse.getStatus().getMsg());
+        }
 
         TransactionState txnState = 
TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
         TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
@@ -1103,7 +1110,39 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     @Override
     public Long getTransactionIdByLabel(Long dbId, String label, 
List<TransactionStatus> statusList)
             throws UserException {
-        throw new UserException(NOT_SUPPORTED_MSG);
+        LOG.info("try to get transaction id by label, dbId:{}, label:{}", 
dbId, label);
+        GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder();
+        builder.setDbId(dbId);
+        builder.setLabel(label);
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        for (TransactionStatus status : statusList) {
+            if (status == TransactionStatus.PREPARE) {
+                builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED);
+            } else if (status == TransactionStatus.PRECOMMITTED) {
+                builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED);
+            } else if (status == TransactionStatus.COMMITTED) {
+                builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED);
+            }
+        }
+
+        final GetTxnIdRequest getTxnIdRequest = builder.build();
+        GetTxnIdResponse getTxnIdResponse = null;
+        try {
+            LOG.info("getTxnRequest:{}", getTxnIdRequest);
+            getTxnIdResponse = MetaServiceProxy
+                    .getInstance().getTxnId(getTxnIdRequest);
+            LOG.info("getTxnIdReponse: {}", getTxnIdResponse);
+        } catch (RpcException e) {
+            LOG.info("getTransactionId exception: {}", e.getMessage());
+            throw new TransactionNotFoundException("transaction not found, 
label=" + label);
+        }
+
+        if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) {
+            LOG.info("getTransactionState exception: {}, {}", 
getTxnIdResponse.getStatus().getCode(),
+                    getTxnIdResponse.getStatus().getMsg());
+            throw new TransactionNotFoundException("transaction not found, 
label=" + label);
+        }
+        return getTxnIdResponse.getTxnId();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index 7c3e7e31c72..d739f2032f3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -30,7 +30,11 @@ public class LabelAlreadyUsedException extends DdlException {
     private String jobStatus;
 
     public LabelAlreadyUsedException(String label) {
-        super("Label [" + label + "] has already been used.");
+        this(label, true);
+    }
+
+    public LabelAlreadyUsedException(String msg, boolean isLabel) {
+        super(isLabel ? "Label [" + msg + "] has already been used." : msg);
     }
 
     public LabelAlreadyUsedException(TransactionState txn) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 2bcc3dabfcd..9a5705bc781 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -687,6 +687,18 @@ message GetTxnResponse {
     optional TxnInfoPB txn_info = 2;
 }
 
+message GetTxnIdRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 db_id = 2;
+    optional string label = 3;
+    repeated TxnStatusPB txn_status = 4;
+}
+
+message GetTxnIdResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional int64 txn_id = 2;
+}
+
 message GetCurrentMaxTxnRequest {
     optional string cloud_unique_id = 1; // For auth
 }
@@ -1344,6 +1356,7 @@ service MetaService {
     rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns 
(GetCurrentMaxTxnResponse);
     rpc check_txn_conflict(CheckTxnConflictRequest) returns 
(CheckTxnConflictResponse);
     rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
+    rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);
 
     rpc get_version(GetVersionRequest) returns (GetVersionResponse);
     rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out 
b/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out
new file mode 100644
index 00000000000..3e449544055
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_2pc_commit --
+1      -50     1       2       1       \N
+2      -50     1       44      1       \N
+
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy
new file mode 100644
index 00000000000..e05850f5e57
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_2pc", "p0") {
+    def tableName = "test_2pc_table"
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    def do_streamload_2pc_commit_by_label = { label ->
+        def command = "curl -X PUT --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H label:${label}" +
+                " -H txn_operation:commit" +
+                " 
http://${context.config.feHttpAddress}/api/${db}/${tableName}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("http_stream 2pc result: ${out}".toString())
+        def json2pc = parseJson(out)
+        return json2pc
+    }
+
+    def do_streamload_2pc_commit_by_txn_id = { txnId ->
+        def command = "curl -X PUT --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " 
http://${context.config.feHttpAddress}/api/${db}/${tableName}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("http_stream 2pc result: ${out}".toString())
+        def json2pc = parseJson(out)
+        return json2pc
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def label = UUID.randomUUID().toString().replaceAll("-", "")
+        def txnId;
+        streamLoad {
+            table "${tableName}"
+
+            set 'label', "${label}"
+            set 'column_separator', '|'
+            set 'columns', 'k1, k2, v1, v2, v3'
+            set 'two_phase_commit', 'true'
+
+            file 'test_two_phase_commit.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                txnId = json.TxnId
+            }
+        }
+
+        streamLoad {
+            table "${tableName}"
+
+            set 'label', "${label}"
+            set 'column_separator', '|'
+            set 'columns', 'k1, k2, v1, v2, v3'
+            set 'two_phase_commit', 'true'
+
+            file 'test_two_phase_commit.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("label already exists", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("has already been used, 
relate to txn"))
+            }
+        }
+
+        def json2pc = do_streamload_2pc_commit_by_label.call(label)
+        assertEquals("success", json2pc.status.toLowerCase())
+
+        def count = 0
+        while (true) {
+            res = sql "select count(*) from ${tableName}"
+            if (res[0][0] > 0) {
+                break
+            }
+            if (count >= 60) {
+                log.error("stream load commit can not visible for long time")
+                assertEquals(2, res[0][0])
+                break
+            }
+            sleep(1000)
+            count++
+        }
+
+         qt_sql_2pc_commit "select * from ${tableName} order by k1"
+
+         json2pc = do_streamload_2pc_commit_by_txn_id.call(txnId)
+         assertTrue(json2pc.msg.contains("is already visible, not 
pre-committed"))
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    }
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to