This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6936b9b068f [fix](partial update) mishandling of exceptions in the
publish phase may result in data loss (#30366)
6936b9b068f is described below
commit 6936b9b068fc89d873e7841befb7c0829761c62e
Author: zhannngchen <[email protected]>
AuthorDate: Fri Jan 26 15:58:43 2024 +0800
[fix](partial update) mishandling of exceptions in the publish phase may
result in data loss (#30366)
---
be/src/olap/tablet.cpp | 44 +++++-
be/src/olap/tablet.h | 6 +-
be/src/olap/txn_manager.cpp | 63 +-------
be/src/olap/txn_manager.h | 40 ++++-
.../fault_injection_p0/concurrency_update1.csv | 21 +++
.../fault_injection_p0/concurrency_update2.csv | 21 +++
.../fault_injection_p0/concurrency_update3.csv | 21 +++
..._partial_update_publish_conflict_with_error.out | 47 ++++++
...rtial_update_publish_conflict_with_error.groovy | 171 +++++++++++++++++++++
9 files changed, 362 insertions(+), 72 deletions(-)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8243b4a420d..203e91eac45 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3110,16 +3110,28 @@ Status Tablet::commit_phase_update_delete_bitmap(
return Status::OK();
}
-Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
- const RowsetIdUnorderedSet& pre_rowset_ids,
- DeleteBitmapPtr delete_bitmap, int64_t
txn_id,
- RowsetWriter* rowset_writer) {
+Status Tablet::update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id) {
SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
+ RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
+ std::unique_ptr<RowsetWriter> rowset_writer;
+ RETURN_IF_ERROR(
+ create_transient_rowset_writer(rowset, &rowset_writer,
txn_info->partial_update_info));
+
+ DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
+ // Partial update might generate new segments when there are conflicts
during publish, and mark
+ // the same key in original segments as delete.
+ // When the new segment flush fails or the rowset build fails, the
deletion marker for the
+ // duplicate key of the original segment should not remain in
`txn_info->delete_bitmap`,
+ // so we need to make a copy of `txn_info->delete_bitmap` and make changes
on it.
+ if (txn_info->partial_update_info &&
txn_info->partial_update_info->is_partial_update) {
+ delete_bitmap =
std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
+ }
+
OlapStopWatch watch;
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
@@ -3137,7 +3149,8 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
}
auto t2 = watch.get_elapse_time_us();
- _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add,
&rowset_ids_to_del);
+ _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids,
&rowset_ids_to_add,
+ &rowset_ids_to_del);
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
@@ -3151,7 +3164,7 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
auto token = _engine.calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
- cur_version - 1, token.get(),
rowset_writer));
+ cur_version - 1, token.get(),
rowset_writer.get()));
RETURN_IF_ERROR(token->wait());
std::stringstream ss;
@@ -3183,6 +3196,25 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
}
+ if (txn_info->partial_update_info &&
txn_info->partial_update_info->is_partial_update) {
+
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail",
{
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+
LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random
failed")
+ .tag("txn_id", txn_id);
+ return Status::InternalError(
+ "debug update_delete_bitmap partial update write
rowset random failed");
+ }
+ });
+ // build rowset writer and merge transient rowset
+ RETURN_IF_ERROR(rowset_writer->flush());
+ RowsetSharedPtr transient_rowset;
+ RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
+ rowset->merge_rowset_meta(transient_rowset->rowset_meta());
+
+ // erase segment cache because we will add a segment to rowset
+ SegmentLoader::instance()->erase_segments(rowset->rowset_id(),
rowset->num_segments());
+ }
+
// update version without write lock, compaction and publish_txn
// will update delete bitmap, handle compaction with _rowset_update_lock
// and publish_txn runs sequential so no need to lock here
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d0de962c568..46f8ccfd5a5 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -70,6 +70,7 @@ class TupleDescriptor;
class CalcDeleteBitmapToken;
enum CompressKind : int;
class RowsetBinlogMetasPB;
+struct TabletTxnInfo;
namespace io {
class RemoteFileSystem;
@@ -451,10 +452,7 @@ public:
const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t
txn_id,
CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer =
nullptr);
- Status update_delete_bitmap(const RowsetSharedPtr& rowset,
- const RowsetIdUnorderedSet& pre_rowset_ids,
- DeleteBitmapPtr delete_bitmap, int64_t txn_id,
- RowsetWriter* rowset_writer = nullptr);
+ Status update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id);
void calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets,
const RowIdConversion& rowid_conversion, uint64_t start_version,
uint64_t end_version,
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 7a82ef4c6ae..8e0a2439712 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -63,46 +63,6 @@ using std::vector;
namespace doris {
using namespace ErrorCode;
-struct TabletTxnInfo {
- PUniqueId load_id;
- RowsetSharedPtr rowset;
- PendingRowsetGuard pending_rs_guard;
- bool unique_key_merge_on_write {false};
- DeleteBitmapPtr delete_bitmap;
- // records rowsets calc in commit txn
- RowsetIdUnorderedSet rowset_ids;
- int64_t creation_time;
- bool ingest {false};
- std::shared_ptr<PartialUpdateInfo> partial_update_info;
- TxnState state {TxnState::PREPARED};
-
- TabletTxnInfo() = default;
-
- TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
- : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
-
- TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
- : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()),
ingest(ingest_arg) {}
-
- TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool
merge_on_write,
- DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet&
ids)
- : load_id(load_id),
- rowset(rowset),
- unique_key_merge_on_write(merge_on_write),
- delete_bitmap(delete_bitmap),
- rowset_ids(ids),
- creation_time(UnixSeconds()) {}
-
- void prepare() { state = TxnState::PREPARED; }
- void commit() { state = TxnState::COMMITTED; }
- void rollback() { state = TxnState::ROLLEDBACK; }
- void abort() {
- if (state == TxnState::PREPARED) {
- state = TxnState::ABORTED;
- }
- }
-};
-
TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size,
int32_t txn_shard_size)
: _engine(engine),
_txn_map_shard_size(txn_map_shard_size),
@@ -521,33 +481,14 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
});
// update delete_bitmap
if (tablet_txn_info->unique_key_merge_on_write) {
- std::unique_ptr<RowsetWriter> rowset_writer;
- RETURN_IF_ERROR(tablet->create_transient_rowset_writer(
- rowset, &rowset_writer, tablet_txn_info->partial_update_info));
-
int64_t t2 = MonotonicMicros();
- RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset,
tablet_txn_info->rowset_ids,
-
tablet_txn_info->delete_bitmap, transaction_id,
- rowset_writer.get()));
+ RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(),
transaction_id));
int64_t t3 = MonotonicMicros();
stats->calc_delete_bitmap_time_us = t3 - t2;
- if (tablet_txn_info->partial_update_info &&
- tablet_txn_info->partial_update_info->is_partial_update) {
- // build rowset writer and merge transient rowset
- RETURN_IF_ERROR(rowset_writer->flush());
- RowsetSharedPtr transient_rowset;
- RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
- rowset->merge_rowset_meta(transient_rowset->rowset_meta());
-
- // erase segment cache cause we will add a segment to rowset
- SegmentLoader::instance()->erase_segments(rowset->rowset_id(),
rowset->num_segments());
- }
- stats->partial_update_write_segment_us = MonotonicMicros() - t3;
- int64_t t4 = MonotonicMicros();
RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
tablet->data_dir(), tablet->tablet_id(),
tablet_txn_info->delete_bitmap,
version.second));
- stats->save_meta_time_us = MonotonicMicros() - t4;
+ stats->save_meta_time_us = MonotonicMicros() - t3;
}
/// Step 3: add to binlog
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index f7e67d0a462..e33958a66aa 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -62,7 +62,45 @@ enum class TxnState {
DELETED = 5,
};
-struct TabletTxnInfo;
+struct TabletTxnInfo {
+ PUniqueId load_id;
+ RowsetSharedPtr rowset;
+ PendingRowsetGuard pending_rs_guard;
+ bool unique_key_merge_on_write {false};
+ DeleteBitmapPtr delete_bitmap;
+ // records rowsets calc in commit txn
+ RowsetIdUnorderedSet rowset_ids;
+ int64_t creation_time;
+ bool ingest {false};
+ std::shared_ptr<PartialUpdateInfo> partial_update_info;
+ TxnState state {TxnState::PREPARED};
+
+ TabletTxnInfo() = default;
+
+ TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
+ : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
+
+ TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
+ : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()),
ingest(ingest_arg) {}
+
+ TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool
merge_on_write,
+ DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet&
ids)
+ : load_id(load_id),
+ rowset(rowset),
+ unique_key_merge_on_write(merge_on_write),
+ delete_bitmap(delete_bitmap),
+ rowset_ids(ids),
+ creation_time(UnixSeconds()) {}
+
+ void prepare() { state = TxnState::PREPARED; }
+ void commit() { state = TxnState::COMMITTED; }
+ void rollback() { state = TxnState::ROLLEDBACK; }
+ void abort() {
+ if (state == TxnState::PREPARED) {
+ state = TxnState::ABORTED;
+ }
+ }
+};
struct CommitTabletTxnInfo {
TTransactionId transaction_id {0};
diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv
b/regression-test/data/fault_injection_p0/concurrency_update1.csv
new file mode 100644
index 00000000000..5bc6c8de802
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/concurrency_update1.csv
@@ -0,0 +1,21 @@
+0
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv
b/regression-test/data/fault_injection_p0/concurrency_update2.csv
new file mode 100644
index 00000000000..23f43edc585
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/concurrency_update2.csv
@@ -0,0 +1,21 @@
+0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv
b/regression-test/data/fault_injection_p0/concurrency_update3.csv
new file mode 100644
index 00000000000..e2dd51a2dcc
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/concurrency_update3.csv
@@ -0,0 +1,21 @@
+0,b0
+1,b1
+2,b2
+3,b3
+4,b4
+5,b5
+6,b6
+7,b7
+8,b8
+9,b9
+10,b10
+11,b11
+12,b12
+13,b13
+14,b14
+15,b15
+16,b16
+17,b17
+18,b18
+19,b19
+20,b20
diff --git
a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
b/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
new file mode 100644
index 00000000000..4e4d3f19f6e
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
@@ -0,0 +1,47 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+0 \N \N \N \N \N
+1 \N \N \N \N \N
+10 \N \N \N \N \N
+11 \N \N \N \N \N
+12 \N \N \N \N \N
+13 \N \N \N \N \N
+14 \N \N \N \N \N
+15 \N \N \N \N \N
+16 \N \N \N \N \N
+17 \N \N \N \N \N
+18 \N \N \N \N \N
+19 \N \N \N \N \N
+2 \N \N \N \N \N
+20 \N \N \N \N \N
+3 \N \N \N \N \N
+4 \N \N \N \N \N
+5 \N \N \N \N \N
+6 \N \N \N \N \N
+7 \N \N \N \N \N
+8 \N \N \N \N \N
+9 \N \N \N \N \N
+
+-- !sql --
+0 aaaaaaaaaa b0 \N \N \N
+1 aaaaaaaaaa b1 \N \N \N
+10 aaaaaaaaaa b10 \N \N \N
+11 aaaaaaaaaa b11 \N \N \N
+12 aaaaaaaaaa b12 \N \N \N
+13 aaaaaaaaaa b13 \N \N \N
+14 aaaaaaaaaa b14 \N \N \N
+15 aaaaaaaaaa b15 \N \N \N
+16 aaaaaaaaaa b16 \N \N \N
+17 aaaaaaaaaa b17 \N \N \N
+18 aaaaaaaaaa b18 \N \N \N
+19 aaaaaaaaaa b19 \N \N \N
+2 aaaaaaaaaa b2 \N \N \N
+20 aaaaaaaaaa b20 \N \N \N
+3 aaaaaaaaaa b3 \N \N \N
+4 aaaaaaaaaa b4 \N \N \N
+5 aaaaaaaaaa b5 \N \N \N
+6 aaaaaaaaaa b6 \N \N \N
+7 aaaaaaaaaa b7 \N \N \N
+8 aaaaaaaaaa b8 \N \N \N
+9 aaaaaaaaaa b9 \N \N \N
+
diff --git
a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
b/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
new file mode 100644
index 00000000000..29b5bcaaed6
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
@@ -0,0 +1,171 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_publish_conflict_with_error", "nonConcurrent") {
+ def dbName = context.config.getDbNameByFile(context.file)
+ def tableName = "test_partial_update_publish_conflict_with_error"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName} (
+ k1 varchar(20) not null,
+ v1 varchar(20),
+ v2 varchar(20),
+ v3 varchar(20),
+ v4 varchar(20),
+ v5 varchar(20))
+ UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES(
+ "replication_num" = "1",
+ "light_schema_change" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true")"""
+
+ // base data
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'columns', "k1"
+
+ file 'concurrency_update1.csv'
+ time 10000 // limit inflight 10s
+ }
+ sql "sync;"
+ qt_sql """ select * from ${tableName} order by k1;"""
+
+ // NOTE: use streamload 2pc to construct the conflict of publish
+ def do_streamload_2pc_commit = { txnId ->
+ def command = "curl -X PUT --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+ " -H txn_id:${txnId}" +
+ " -H txn_operation:commit" +
+ "
http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc"
+ log.info("http_stream execute 2pc: ${command}")
+
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ json2pc = parseJson(out)
+ log.info("http_stream 2pc result: ${out}".toString())
+ assertEquals(code, 0)
+ assertEquals("success", json2pc.status.toLowerCase())
+ }
+
+ def wait_for_publish = {txnId, waitSecond ->
+ String st = "PREPARE"
+ while (!st.equalsIgnoreCase("VISIBLE") &&
!st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+ Thread.sleep(1000)
+ waitSecond -= 1
+ def result = sql_return_maparray "show transaction from ${dbName}
where id = ${txnId}"
+ assertNotNull(result)
+ st = result[0].TransactionStatus
+ }
+ log.info("Stream load with txn ${txnId} is ${st}")
+ assertEquals(st, "VISIBLE")
+ }
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ def dbug_point =
'Tablet.update_delete_bitmap.partial_update_write_rowset_fail'
+
+ // concurrent load 1
+ String txnId1
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'k1,tmp,v1=substr(tmp,1,10)'
+ set 'strict_mode', "false"
+ set 'two_phase_commit', 'true'
+ file 'concurrency_update2.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ txnId1 = json.TxnId
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+
+ String txnId2
+ // concurrent load 2
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'k1,v2'
+ set 'strict_mode', "false"
+ set 'two_phase_commit', 'true'
+ file 'concurrency_update3.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ txnId2 = json.TxnId
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ sql "sync;"
+
+ // complete load 1 first
+ do_streamload_2pc_commit(txnId1)
+ wait_for_publish(txnId1, 10)
+
+ // inject failure on publish
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(dbug_point, [percent : 1.0])
+ do_streamload_2pc_commit(txnId2)
+ sleep(3000)
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(dbug_point)
+ }
+ // publish will retry until success
+ // FE retry may take longer time, wait for 20 secs
+ wait_for_publish(txnId2, 20)
+
+ sql "sync;"
+ qt_sql """ select * from ${tableName} order by k1;"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]