This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4d916401438 [fix](merge-cloud) fix inconsistency between cloud mode
and local mode for 2PC (#33917)
4d916401438 is described below
commit 4d916401438d5219aa86169dd9b315d60e2bfe98
Author: Xin Liao <[email protected]>
AuthorDate: Sun Apr 21 10:52:55 2024 +0800
[fix](merge-cloud) fix inconsistency between cloud mode and local mode for
2PC (#33917)
---
cloud/src/common/bvars.cpp | 1 +
cloud/src/common/bvars.h | 1 +
cloud/src/meta-service/meta_service.h | 8 ++
cloud/src/meta-service/meta_service_txn.cpp | 122 +++++++++++++++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 4 +
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 10 ++
.../transaction/CloudGlobalTransactionMgr.java | 43 +++++-
.../doris/common/LabelAlreadyUsedException.java | 6 +-
gensrc/proto/cloud.proto | 13 ++
.../load_p0/stream_load/test_stream_load_2pc.out | 5 +
.../stream_load/test_stream_load_2pc.groovy | 150 +++++++++++++++++++++
11 files changed, 360 insertions(+), 3 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index ab0b5934b50..1aa436bb603 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -70,6 +70,7 @@ BvarLatencyRecorderWithTag
g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms",
"get_rl_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms",
"start_tablet_job");
BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms",
"finish_tablet_job");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index dbdbfa834e9..b55e1051cd9 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -171,6 +171,7 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index 4b7ae829795..4dc4113f341 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -254,6 +254,9 @@ public:
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;
+ void get_txn_id(::google::protobuf::RpcController* controller, const
GetTxnIdRequest* request,
+ GetTxnIdResponse* response, ::google::protobuf::Closure*
done) override;
+
// ATTN: If you add a new method, please also add the corresponding
implementation in `MetaServiceProxy`.
std::pair<MetaServiceCode, std::string> get_instance_info(const
std::string& instance_id,
@@ -585,6 +588,11 @@ public:
done);
}
+ void get_txn_id(::google::protobuf::RpcController* controller, const
GetTxnIdRequest* request,
+ GetTxnIdResponse* response, ::google::protobuf::Closure*
done) override {
+ call_impl(&cloud::MetaService::get_txn_id, controller, request,
response, done);
+ }
+
private:
template <typename Request, typename Response>
using MetaServiceMethod = void
(cloud::MetaService::*)(::google::protobuf::RpcController*,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 139206ede20..0afdc31d10a 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1969,4 +1969,126 @@ void
MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
code = MetaServiceCode::OK;
}
+// get txn id by label
+// 1. When the requested status is not empty, return the txnid
+// corresponding to the status. There may be multiple
+// requested status, just match one.
+// 2. When the requested status is empty, return the latest txnid.
+void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
+ const GetTxnIdRequest* request,
GetTxnIdResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(get_txn_id);
+ if (!request->has_db_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "missing db id";
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ std::string cloud_unique_id = request->has_cloud_unique_id() ?
request->cloud_unique_id() : "";
+ instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "cannot find instance_id with cloud_unique_id="
+ << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ RPC_RATE_LIMIT(get_txn_id)
+ const int64_t db_id = request->db_id();
+ std::string label = request->label();
+ const std::string label_key = txn_label_key({instance_id, db_id, label});
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" <<
db_id
+ << " label=" << label;
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
+ << " db_id=" << db_id;
+ msg = ss.str();
+ return;
+ }
+
+ std::string label_val;
+ err = txn->get(label_key, &label_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "txn->get failed(), err=" << err << " label=" << label;
+ msg = ss.str();
+ return;
+ }
+
+ if (label_val.size() <= VERSION_STAMP_LEN) {
+ code = MetaServiceCode::TXN_ID_NOT_FOUND;
+ ss << "transaction not found, label=" << label;
+ return;
+ }
+
+ TxnLabelPB label_pb;
+ //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
+ if (!label_pb.ParseFromArray(label_val.data(), label_val.size() -
VERSION_STAMP_LEN)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "label_pb->ParseFromString() failed, label=" << label;
+ msg = ss.str();
+ return;
+ }
+ if (label_pb.txn_ids_size() == 0) {
+ code = MetaServiceCode::TXN_ID_NOT_FOUND;
+ ss << "transaction not found, label=" << label;
+ msg = ss.str();
+ return;
+ }
+
+ // find the latest txn
+ if (request->txn_status_size() == 0) {
+ response->set_txn_id(*label_pb.txn_ids().rbegin());
+ return;
+ }
+
+ for (auto& cur_txn_id : label_pb.txn_ids()) {
+ const std::string cur_info_key = txn_info_key({instance_id, db_id,
cur_txn_id});
+ std::string cur_info_val;
+ err = txn->get(cur_info_key, &cur_info_val);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label="
<< label
+ << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ //label_to_idx and txn info inconsistency.
+ code = MetaServiceCode::TXN_ID_NOT_FOUND;
+ ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label="
<< label
+ << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ TxnInfoPB cur_txn_info;
+ if (!cur_txn_info.ParseFromString(cur_info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" <<
cur_txn_id
+ << " label=" << label << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
+ for (auto txn_status : request->txn_status()) {
+ if (cur_txn_info.status() == txn_status) {
+ response->set_txn_id(cur_txn_id);
+ return;
+ }
+ }
+ }
+ code = MetaServiceCode::TXN_ID_NOT_FOUND;
+ ss << "transaction not found, label=" << label;
+ msg = ss.str();
+ return;
+}
+
} // namespace doris::cloud
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 847c90a73fd..7463a684680 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -192,6 +192,10 @@ public class MetaServiceClient {
return blockingStub.getTxn(request);
}
+ public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request) {
+ return blockingStub.getTxnId(request);
+ }
+
public Cloud.GetCurrentMaxTxnResponse
getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request) {
return blockingStub.getCurrentMaxTxnId(request);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 052abbcf4ef..117cfd71bd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -226,6 +226,16 @@ public class MetaServiceProxy {
}
}
+ public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request)
+ throws RpcException {
+ try {
+ final MetaServiceClient client = getProxy();
+ return client.getTxnId(request);
+ } catch (Exception e) {
+ throw new RpcException("", e.getMessage(), e);
+ }
+ }
+
public Cloud.GetCurrentMaxTxnResponse
getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request)
throws RpcException {
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index d325f1dac2b..35b3cd28529 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -43,6 +43,8 @@ import
org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
+import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest;
+import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnResponse;
import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
@@ -51,6 +53,7 @@ import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest;
import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse;
import org.apache.doris.cloud.proto.Cloud.TableStatsPB;
import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
+import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
@@ -90,6 +93,7 @@ import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
import org.apache.doris.transaction.TransactionIdGenerator;
+import org.apache.doris.transaction.TransactionNotFoundException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -248,7 +252,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
throw new
DuplicatedRequestException(DebugUtil.printId(requestId),
beginTxnResponse.getDupTxnId(),
beginTxnResponse.getStatus().getMsg());
case TXN_LABEL_ALREADY_USED:
- throw new LabelAlreadyUsedException(label);
+ throw new
LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false);
default:
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
@@ -483,6 +487,9 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
throw new UserException("internal error, " +
internalMsgBuilder.toString());
}
+ if (is2PC && commitTxnResponse.getStatus().getCode() ==
MetaServiceCode.TXN_ALREADY_VISIBLE) {
+ throw new UserException(commitTxnResponse.getStatus().getMsg());
+ }
TransactionState txnState =
TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
TxnStateChangeCallback cb =
callbackFactory.getCallback(txnState.getCallbackId());
@@ -1103,7 +1110,39 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
@Override
public Long getTransactionIdByLabel(Long dbId, String label,
List<TransactionStatus> statusList)
throws UserException {
- throw new UserException(NOT_SUPPORTED_MSG);
+ LOG.info("try to get transaction id by label, dbId:{}, label:{}",
dbId, label);
+ GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder();
+ builder.setDbId(dbId);
+ builder.setLabel(label);
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+ for (TransactionStatus status : statusList) {
+ if (status == TransactionStatus.PREPARE) {
+ builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED);
+ } else if (status == TransactionStatus.PRECOMMITTED) {
+ builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED);
+ } else if (status == TransactionStatus.COMMITTED) {
+ builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED);
+ }
+ }
+
+ final GetTxnIdRequest getTxnIdRequest = builder.build();
+ GetTxnIdResponse getTxnIdResponse = null;
+ try {
+ LOG.info("getTxnRequest:{}", getTxnIdRequest);
+ getTxnIdResponse = MetaServiceProxy
+ .getInstance().getTxnId(getTxnIdRequest);
+ LOG.info("getTxnIdReponse: {}", getTxnIdResponse);
+ } catch (RpcException e) {
+ LOG.info("getTransactionId exception: {}", e.getMessage());
+ throw new TransactionNotFoundException("transaction not found,
label=" + label);
+ }
+
+ if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) {
+ LOG.info("getTransactionState exception: {}, {}",
getTxnIdResponse.getStatus().getCode(),
+ getTxnIdResponse.getStatus().getMsg());
+ throw new TransactionNotFoundException("transaction not found,
label=" + label);
+ }
+ return getTxnIdResponse.getTxnId();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index 7c3e7e31c72..d739f2032f3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -30,7 +30,11 @@ public class LabelAlreadyUsedException extends DdlException {
private String jobStatus;
public LabelAlreadyUsedException(String label) {
- super("Label [" + label + "] has already been used.");
+ this(label, true);
+ }
+
+ public LabelAlreadyUsedException(String msg, boolean isLabel) {
+ super(isLabel ? "Label [" + msg + "] has already been used." : msg);
}
public LabelAlreadyUsedException(TransactionState txn) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 59324a41403..0ed9b693988 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -687,6 +687,18 @@ message GetTxnResponse {
optional TxnInfoPB txn_info = 2;
}
+message GetTxnIdRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional int64 db_id = 2;
+ optional string label = 3;
+ repeated TxnStatusPB txn_status = 4;
+}
+
+message GetTxnIdResponse {
+ optional MetaServiceResponseStatus status = 1;
+ optional int64 txn_id = 2;
+}
+
message GetCurrentMaxTxnRequest {
optional string cloud_unique_id = 1; // For auth
}
@@ -1346,6 +1358,7 @@ service MetaService {
rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns
(GetCurrentMaxTxnResponse);
rpc check_txn_conflict(CheckTxnConflictRequest) returns
(CheckTxnConflictResponse);
rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
+ rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);
rpc get_version(GetVersionRequest) returns (GetVersionResponse);
rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out
b/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out
new file mode 100644
index 00000000000..3e449544055
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_2pc_commit --
+1 -50 1 2 1 \N
+2 -50 1 44 1 \N
+
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy
new file mode 100644
index 00000000000..e05850f5e57
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_2pc", "p0") {
+ def tableName = "test_2pc_table"
+ InetSocketAddress address = context.config.feHttpInetSocketAddress
+ String user = context.config.feHttpUser
+ String password = context.config.feHttpPassword
+ String db = context.config.getDbNameByFile(context.file)
+
+ def do_streamload_2pc_commit_by_label = { label ->
+ def command = "curl -X PUT --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+ " -H label:${label}" +
+ " -H txn_operation:commit" +
+ "
http://${context.config.feHttpAddress}/api/${db}/${tableName}/_stream_load_2pc"
+ log.info("http_stream execute 2pc: ${command}")
+
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ log.info("http_stream 2pc result: ${out}".toString())
+ def json2pc = parseJson(out)
+ return json2pc
+ }
+
+ def do_streamload_2pc_commit_by_txn_id = { txnId ->
+ def command = "curl -X PUT --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+ " -H txn_id:${txnId}" +
+ " -H txn_operation:commit" +
+ "
http://${context.config.feHttpAddress}/api/${db}/${tableName}/_stream_load_2pc"
+ log.info("http_stream execute 2pc: ${command}")
+
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ log.info("http_stream 2pc result: ${out}".toString())
+ def json2pc = parseJson(out)
+ return json2pc
+ }
+
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL DEFAULT "1",
+ `k2` bigint(20) NULL ,
+ `v1` tinyint(4) NULL,
+ `v2` tinyint(4) NULL,
+ `v3` tinyint(4) NULL,
+ `v4` DATETIME NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def label = UUID.randomUUID().toString().replaceAll("-", "")
+ def txnId;
+ streamLoad {
+ table "${tableName}"
+
+ set 'label', "${label}"
+ set 'column_separator', '|'
+ set 'columns', 'k1, k2, v1, v2, v3'
+ set 'two_phase_commit', 'true'
+
+ file 'test_two_phase_commit.csv'
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ txnId = json.TxnId
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+
+ set 'label', "${label}"
+ set 'column_separator', '|'
+ set 'columns', 'k1, k2, v1, v2, v3'
+ set 'two_phase_commit', 'true'
+
+ file 'test_two_phase_commit.csv'
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("label already exists", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("has already been used,
relate to txn"))
+ }
+ }
+
+ def json2pc = do_streamload_2pc_commit_by_label.call(label)
+ assertEquals("success", json2pc.status.toLowerCase())
+
+ def count = 0
+ while (true) {
+ res = sql "select count(*) from ${tableName}"
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 60) {
+ log.error("stream load commit can not visible for long time")
+ assertEquals(2, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+
+ qt_sql_2pc_commit "select * from ${tableName} order by k1"
+
+ json2pc = do_streamload_2pc_commit_by_txn_id.call(txnId)
+ assertTrue(json2pc.msg.contains("is already visible, not
pre-committed"))
+ } finally {
+ sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+ }
+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]