This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 64b69ed1baf [branch-2.1] Picks "[opt](merge-on-write) Skip the
alignment process of some rowsets in partial update #38487" (#38682)
64b69ed1baf is described below
commit 64b69ed1bafee30fd5f03e40a31f151cea8c4f66
Author: bobhan1 <[email protected]>
AuthorDate: Fri Aug 2 20:05:31 2024 +0800
[branch-2.1] Picks "[opt](merge-on-write) Skip the alignment process of
some rowsets in partial update #38487" (#38682)
## Proposed changes
picks https://github.com/apache/doris/pull/38487
---
be/src/olap/partial_update_info.h | 5 +-
be/src/olap/rowset/rowset.h | 1 +
be/src/olap/rowset/rowset_meta.h | 6 +
be/src/olap/rowset_builder.cpp | 11 +-
be/src/olap/rowset_builder.h | 1 +
be/src/olap/tablet.cpp | 35 +++-
be/src/olap/task/engine_publish_version_task.cpp | 14 ++
.../{ => partial_update}/concurrency_update1.csv | 0
.../{ => partial_update}/concurrency_update2.csv | 0
.../{ => partial_update}/concurrency_update3.csv | 0
...t_partial_update_column_num_fault_injection.out | 0
...tial_update_compaction_with_higher_version.out} | 10 +-
...st_partial_update_conflict_skip_compaction.out} | 10 +-
..._partial_update_publish_conflict_with_error.out | 0
.../test_partial_update_skip_compaction.out} | 10 +-
...artial_update_column_num_fault_injection.groovy | 0
...al_update_compaction_with_higher_version.groovy | 222 +++++++++++++++++++++
..._partial_update_conflict_skip_compaction.groovy | 212 ++++++++++++++++++++
...rtial_update_publish_conflict_with_error.groovy | 0
.../test_partial_update_skip_compaction.groovy | 193 ++++++++++++++++++
20 files changed, 707 insertions(+), 23 deletions(-)
diff --git a/be/src/olap/partial_update_info.h
b/be/src/olap/partial_update_info.h
index f20f9680b0b..4b62cb8f0ff 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -25,10 +25,10 @@ struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
- const std::string& auto_increment_column) {
+ const std::string& auto_increment_column, int64_t
cur_max_version = -1) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
-
+ max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
@@ -91,6 +91,7 @@ private:
public:
bool is_partial_update {false};
+ int64_t max_version_in_flush_phase {-1};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
std::vector<uint32_t> update_cids;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 72c6c2fa29b..7677015f2e0 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -166,6 +166,7 @@ public:
bool is_segments_overlapping() const { return
rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
RowsetStatePB rowset_meta_state() const { return
rowset_meta()->rowset_state(); }
+ bool produced_by_compaction() const { return
rowset_meta()->produced_by_compaction(); }
// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 30457d30bc6..5284deb461b 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -242,6 +242,12 @@ public:
return num_segments() > 1 && is_singleton_delta() &&
segments_overlap() != NONOVERLAPPING;
}
+ bool produced_by_compaction() const {
+ return has_version() &&
+ (start_version() < end_version() ||
+ (start_version() == end_version() && segments_overlap() ==
NONOVERLAPPING));
+ }
+
// get the compaction score of this rowset.
// if segments are overlapping, the score equals to the number of segments,
// otherwise, score is 1.
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index a1edc61e478..32bbdb246a3 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -118,7 +118,7 @@ void RowsetBuilder::_garbage_collection() {
Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>&
mow_context) {
std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
- int64_t cur_max_version = tablet()->max_version_unlocked().second;
+ _max_version_in_flush_phase = tablet()->max_version_unlocked().second;
std::vector<RowsetSharedPtr> rowset_ptrs;
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
if (tablet()->tablet_state() == TABLET_NOTREADY) {
@@ -130,12 +130,12 @@ Status
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
}
_rowset_ids.clear();
} else {
- RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids));
+ RETURN_IF_ERROR(tablet()->all_rs_id(_max_version_in_flush_phase,
&_rowset_ids));
rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids);
}
_delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
- mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id,
_rowset_ids,
- rowset_ptrs, _delete_bitmap);
+ mow_context = std::make_shared<MowContext>(_max_version_in_flush_phase,
_req.txn_id,
+ _rowset_ids, rowset_ptrs,
_delete_bitmap);
return Status::OK();
}
@@ -402,7 +402,8 @@ void
BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(),
table_schema_param->timezone(),
- table_schema_param->auto_increment_coulumn());
+ table_schema_param->auto_increment_coulumn(),
+ _max_version_in_flush_phase);
}
} // namespace doris
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 8f254074c37..362e976da71 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -107,6 +107,7 @@ protected:
std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
// current rowset_ids, used to do diff in publish_version
RowsetIdUnorderedSet _rowset_ids;
+ int64_t _max_version_in_flush_phase {-1};
std::shared_ptr<PartialUpdateInfo> _partial_update_info;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 7ff4b508c97..11cb7055c7f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3506,7 +3506,9 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo*
txn_info, int64_t txn_id) {
// When the new segment flush fails or the rowset build fails, the
deletion marker for the
// duplicate key of the original segment should not remain in
`txn_info->delete_bitmap`,
// so we need to make a copy of `txn_info->delete_bitmap` and make changes
on it.
- if (txn_info->partial_update_info &&
txn_info->partial_update_info->is_partial_update) {
+ bool is_partial_update =
+ txn_info->partial_update_info &&
txn_info->partial_update_info->is_partial_update;
+ if (is_partial_update) {
delete_bitmap =
std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
}
@@ -3540,6 +3542,37 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo*
txn_info, int64_t txn_id) {
}
auto t3 = watch.get_elapse_time_us();
+ // If a rowset is produced by compaction before the commit phase of the
partial update load
+ // and is not included in txn_info->rowset_ids, we can skip the alignment
process of that rowset
+ // because data remains the same before and after compaction. But we still
need to calculate the
+ // delete bitmap for that rowset.
+ std::vector<RowsetSharedPtr> rowsets_skip_alignment;
+ if (is_partial_update) {
+ int64_t max_version_in_flush_phase =
+ txn_info->partial_update_info->max_version_in_flush_phase;
+ DCHECK(max_version_in_flush_phase != -1);
+ std::vector<RowsetSharedPtr> remained_rowsets;
+ for (const auto& rowset : specified_rowsets) {
+ if (rowset->end_version() <= max_version_in_flush_phase &&
+ rowset->produced_by_compaction()) {
+ rowsets_skip_alignment.emplace_back(rowset);
+ } else {
+ remained_rowsets.emplace_back(rowset);
+ }
+ }
+ if (!rowsets_skip_alignment.empty()) {
+ specified_rowsets = std::move(remained_rowsets);
+ }
+ }
+
+ if (!rowsets_skip_alignment.empty()) {
+ auto token = _engine.calc_delete_bitmap_executor()->create_token();
+ // set rowset_writer to nullptr to skip the alignment process
+ RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments,
rowsets_skip_alignment, delete_bitmap,
+ cur_version - 1, token.get(),
nullptr));
+ RETURN_IF_ERROR(token->wait());
+ }
+
auto token = _engine.calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
cur_version - 1, token.get(),
rowset_writer.get()));
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 96cad7f934d..6108e81bae3 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -110,6 +110,20 @@ Status EnginePublishVersionTask::execute() {
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
+ DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", {
+ auto token = dp->param<std::string>("token", "invalid_token");
+ while
(DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block"))
{
+ auto block_dp = DebugPoints::instance()->get_debug_point(
+ "EnginePublishVersionTask::execute.block");
+ if (block_dp) {
+ auto pass_token = block_dp->param<std::string>("pass_token",
"");
+ if (pass_token == token) {
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ }
+ });
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv
b/regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv
similarity index 100%
rename from regression-test/data/fault_injection_p0/concurrency_update1.csv
rename to
regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv
diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv
b/regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv
similarity index 100%
rename from regression-test/data/fault_injection_p0/concurrency_update2.csv
rename to
regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv
diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv
b/regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv
similarity index 100%
rename from regression-test/data/fault_injection_p0/concurrency_update3.csv
rename to
regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv
diff --git
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out
similarity index 100%
copy from
regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
copy to
regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out
diff --git
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out
similarity index 63%
copy from
regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
copy to
regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out
index 3d9ecbcede0..df12f4b08e5 100644
---
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
+++
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out
@@ -1,11 +1,11 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !select_1 --
+-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
--- !select_2 --
-1 1 1 1 1
-2 1 1 2 2
-3 3 3 3 3
+-- !sql --
+1 999 999 666 666
+2 888 888 2 2
+3 777 777 555 555
diff --git
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out
similarity index 63%
copy from
regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
copy to
regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out
index 3d9ecbcede0..df12f4b08e5 100644
---
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
+++
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out
@@ -1,11 +1,11 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !select_1 --
+-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
--- !select_2 --
-1 1 1 1 1
-2 1 1 2 2
-3 3 3 3 3
+-- !sql --
+1 999 999 666 666
+2 888 888 2 2
+3 777 777 555 555
diff --git
a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out
similarity index 100%
rename from
regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
rename to
regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out
diff --git
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out
similarity index 66%
rename from
regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
rename to
regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out
index 3d9ecbcede0..6c7fe443a89 100644
---
a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out
+++
b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out
@@ -1,11 +1,11 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !select_1 --
+-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
--- !select_2 --
-1 1 1 1 1
-2 1 1 2 2
-3 3 3 3 3
+-- !sql --
+1 999 999 1 1
+2 888 888 2 2
+3 777 777 3 3
diff --git
a/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy
similarity index 100%
rename from
regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy
rename to
regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy
diff --git
a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy
new file mode 100644
index 00000000000..cd3d03330d5
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") {
+
+ def table1 = "test_partial_update_compaction_with_higher_version"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `c4` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1,1,1);"
+ sql "insert into ${table1} values(2,2,2,2,2);"
+ sql "insert into ${table1} values(3,3,3,3,3);"
+ sql "sync;"
+ order_qt_sql "select * from ${table1};"
+
+ def beNodes = sql_return_maparray("show backends;")
+ def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
+ def tabletBackendId = tabletStat.BackendId
+ def tabletId = tabletStat.TabletId
+ def tabletBackend;
+ for (def be : beNodes) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+
+ def check_rs_metas = { expected_rs_meta_size, check_func ->
+ if (isCloudMode()) {
+ return
+ }
+
+ def metaUrl = sql_return_maparray("show tablets from
${table1};").get(0).MetaUrl
+ def (code, out, err) = curl("GET", metaUrl)
+ Assert.assertEquals(code, 0)
+ def jsonMeta = parseJson(out.trim())
+
+ Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size)
+ for (def meta : jsonMeta.rs_metas) {
+ int startVersion = meta.start_version
+ int endVersion = meta.end_version
+ int numSegments = meta.num_segments
+ int numRows = meta.num_rows
+ String overlapPb = meta.segments_overlap_pb
+ logger.info("[${startVersion}-${endVersion}] ${overlapPb}
${meta.num_segments} ${numRows} ${meta.rowset_id_v2}")
+ check_func(startVersion, endVersion, numSegments, numRows,
overlapPb)
+ }
+ }
+
+ check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int
numRows, String overlapPb ->
+ if (startVersion == 0) {
+ // [0-1]
+ Assert.assertEquals(endVersion, 1)
+ Assert.assertEquals(numSegments, 0)
+ } else {
+ // [2-2], [3-3], [4-4]
+ Assert.assertEquals(startVersion, endVersion)
+ Assert.assertEquals(numSegments, 1)
+ }
+ })
+
+ def enable_publish_spin_wait = { tokenName ->
+ if (isCloudMode()) {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait",
[token: "${tokenName}"])
+ } else {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait",
[token: "${tokenName}"])
+ }
+ }
+
+ def disable_publish_spin_wait = {
+ if (isCloudMode()) {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+ } else {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+ }
+ }
+
+ def enable_block_in_publish = { passToken ->
+ if (isCloudMode()) {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block",
[pass_token: "${passToken}"])
+ } else {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block",
[pass_token: "${passToken}"])
+ }
+ }
+
+ def disable_block_in_publish = {
+ if (isCloudMode()) {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ } else {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+ }
+
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ // block the partial update in publish phase
+ enable_publish_spin_wait("token1")
+ enable_block_in_publish("-1")
+
+ // the first partial update load
+ def t1 = Thread.start {
+ sql "set enable_unique_key_partial_update=true;"
+ sql "sync;"
+ sql "insert into ${table1}(k1,c1,c2)
values(1,999,999),(2,888,888),(3,777,777);"
+ }
+
+ Thread.sleep(600)
+
+ // the second partial update load that conflicts with the first one
+ enable_publish_spin_wait("token2")
+ def t2 = Thread.start {
+ sql "set enable_unique_key_partial_update=true;"
+ sql "sync;"
+ sql "insert into ${table1}(k1,c3,c4)
values(1,666,666),(3,555,555);"
+ }
+
+ Thread.sleep(400)
+
+ // let the first partial update load finish
+ enable_block_in_publish("token1")
+ t1.join()
+ Thread.sleep(200)
+ check_rs_metas(5, {int startVersion, int endVersion, int numSegments,
int numRows, String overlapPb ->
+ if (startVersion == 0) {
+ // [0-1]
+ Assert.assertEquals(endVersion, 1)
+ Assert.assertEquals(numSegments, 0)
+ } else {
+ // [2-2], [3-3], [4-4], [5-5]
+ Assert.assertEquals(startVersion, endVersion)
+ Assert.assertEquals(numSegments, 1)
+ }
+ })
+
+ // trigger full compaction on tablet
+ logger.info("trigger compaction on another BE ${tabletBackend.Host}
with backendId=${tabletBackend.BackendId}")
+ def (code, out, err) = be_run_full_compaction(tabletBackend.Host,
tabletBackend.HttpPort, tabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
+ Assert.assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+ // wait for full compaction to complete
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200,
TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(
+ {
+ (code, out, err) =
be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ Assert.assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ Assert.assertEquals("success",
compactionStatus.status.toLowerCase())
+ return !compactionStatus.run_status
+ }
+ )
+
+ check_rs_metas(1, {int startVersion, int endVersion, int numSegments,
int numRows, String overlapPb ->
+ // check the rowset produced by full compaction
+ // [0-5]
+ Assert.assertEquals(startVersion, 0)
+ Assert.assertEquals(endVersion, 5)
+ Assert.assertEquals(numRows, 3)
+ Assert.assertEquals(overlapPb, "NONOVERLAPPING")
+ })
+
+ // let the second partial update load publish
+ disable_block_in_publish()
+ t1.join()
+ Thread.sleep(300)
+
+ order_qt_sql "select * from ${table1};"
+
+ check_rs_metas(2, {int startVersion, int endVersion, int numSegments,
int numRows, String overlapPb ->
+ if (startVersion == 6) {
+ // [6-6]
+ Assert.assertEquals(endVersion, 6)
+ // check that the partial update did not skip the alignment
process for the rowset produced by compaction, and
+ // that it generated a new segment in the publish phase
+ Assert.assertEquals(numSegments, 2)
+ Assert.assertEquals(numRows, 4) // 4 = 2 + 2
+ }
+ })
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ // sql "DROP TABLE IF EXISTS ${table1};"
+}
diff --git
a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy
new file mode 100644
index 00000000000..51018d38288
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") {
+
+ def table1 = "test_partial_update_conflict_skip_compaction"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `c4` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1,1,1);"
+ sql "insert into ${table1} values(2,2,2,2,2);"
+ sql "insert into ${table1} values(3,3,3,3,3);"
+ sql "sync;"
+ order_qt_sql "select * from ${table1};"
+
+ def beNodes = sql_return_maparray("show backends;")
+ def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
+ def tabletBackendId = tabletStat.BackendId
+ def tabletId = tabletStat.TabletId
+ def tabletBackend;
+ for (def be : beNodes) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+
+ def check_rs_metas = { expected_rs_meta_size, check_func ->
+ if (isCloudMode()) {
+ return
+ }
+
+ def metaUrl = sql_return_maparray("show tablets from
${table1};").get(0).MetaUrl
+ def (code, out, err) = curl("GET", metaUrl)
+ Assert.assertEquals(code, 0)
+ def jsonMeta = parseJson(out.trim())
+
+ Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size)
+ for (def meta : jsonMeta.rs_metas) {
+ int startVersion = meta.start_version
+ int endVersion = meta.end_version
+ int numSegments = meta.num_segments
+ int numRows = meta.num_rows
+ String overlapPb = meta.segments_overlap_pb
+ logger.info("[${startVersion}-${endVersion}] ${overlapPb}
${meta.num_segments} ${numRows} ${meta.rowset_id_v2}")
+ check_func(startVersion, endVersion, numSegments, numRows,
overlapPb)
+ }
+ }
+
+ check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int
numRows, String overlapPb ->
+ if (startVersion == 0) {
+ // [0-1]
+ Assert.assertEquals(endVersion, 1)
+ Assert.assertEquals(numSegments, 0)
+ } else {
+ // [2-2], [3-3], [4-4]
+ Assert.assertEquals(startVersion, endVersion)
+ Assert.assertEquals(numSegments, 1)
+ }
+ })
+
+ def enable_publish_spin_wait = {
+ if (isCloudMode()) {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+ } else {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+ }
+ }
+
+ def disable_publish_spin_wait = {
+ if (isCloudMode()) {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+ } else {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+ }
+ }
+
+ def enable_block_in_publish = {
+ if (isCloudMode()) {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ } else {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+ }
+
+ def disable_block_in_publish = {
+ if (isCloudMode()) {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ } else {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+ }
+
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ // block the partial update before publish phase
+ enable_publish_spin_wait()
+ enable_block_in_publish()
+
+ // the first partial update load
+ def t1 = Thread.start {
+ sql "set enable_unique_key_partial_update=true;"
+ sql "sync;"
+ sql "insert into ${table1}(k1,c1,c2)
values(1,999,999),(2,888,888),(3,777,777);"
+ }
+
+ Thread.sleep(300)
+
+ // the second partial update load that has conflict with the first one
+ def t2 = Thread.start {
+ sql "set enable_unique_key_partial_update=true;"
+ sql "sync;"
+ sql "insert into ${table1}(k1,c3,c4)
values(1,666,666),(3,555,555);"
+ }
+
+ Thread.sleep(300)
+
+ // trigger full compaction on tablet
+ logger.info("trigger compaction on another BE ${tabletBackend.Host}
with backendId=${tabletBackend.BackendId}")
+ def (code, out, err) = be_run_full_compaction(tabletBackend.Host,
tabletBackend.HttpPort, tabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
+ Assert.assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+ // wait for full compaction to complete
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200,
TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(
+ {
+ (code, out, err) =
be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ Assert.assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ Assert.assertEquals("success",
compactionStatus.status.toLowerCase())
+ return !compactionStatus.run_status
+ }
+ )
+
+ check_rs_metas(1, {int startVersion, int endVersion, int numSegments,
int numRows, String overlapPb ->
+ // check the rowset produced by full compaction
+ // [0-4]
+ Assert.assertEquals(startVersion, 0)
+ Assert.assertEquals(endVersion, 4)
+ Assert.assertEquals(numRows, 3)
+ Assert.assertEquals(overlapPb, "NONOVERLAPPING")
+ })
+
+ disable_block_in_publish()
+
+ t1.join()
+ t2.join()
+
+ order_qt_sql "select * from ${table1};"
+
+ check_rs_metas(3, {int startVersion, int endVersion, int numSegments,
int numRows, String overlapPb ->
+ if (startVersion == 5) {
+ // the first partial update load
+ // it should skip the alignment process of rowsets produced by
full compaction and
+ // should not generate new segment in publish phase
+ Assert.assertEquals(endVersion, 5)
+ Assert.assertEquals(numSegments, 1)
+ Assert.assertEquals(numRows, 3)
+ } else if (startVersion == 6) {
+ // the second partial update load
+ // it should skip the alignment process of rowsets produced by
full compaction and
+ // should generate new segment in publish phase for
conflicting rows with the first partial update load
+ Assert.assertEquals(endVersion, 6)
+ Assert.assertEquals(numSegments, 2)
+ Assert.assertEquals(numRows, 4) // 4 = 2 + 2
+ }
+ })
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ sql "DROP TABLE IF EXISTS ${table1};"
+}
diff --git
a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy
similarity index 100%
rename from
regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
rename to
regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy
diff --git
a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy
new file mode 100644
index 00000000000..b665ae4c19b
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_partial_update_skip_compaction", "nonConcurrent") {
+
+    // Table under test. Auto compaction is disabled through the table
+    // properties; the test triggers the full compaction explicitly later on.
+    def table1 = "test_partial_update_skip_compaction"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `c4` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+    // Three separate single-row loads; the rowset layout check further down
+    // expects one rowset per insert ([2-2], [3-3], [4-4]).
+    sql "insert into ${table1} values(1,1,1,1,1);"
+    sql "insert into ${table1} values(2,2,2,2,2);"
+    sql "insert into ${table1} values(3,3,3,3,3);"
+    sql "sync;"
+    order_qt_sql "select * from ${table1};"
+
+    // Resolve the backend row (from `show backends`) whose BackendId matches
+    // the first tablet reported by `show tablets`; its Host/HttpPort are
+    // used for the compaction HTTP calls later in the suite.
+    def beNodes = sql_return_maparray("show backends;")
+    def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
+    def tabletBackendId = tabletStat.BackendId
+    def tabletId = tabletStat.TabletId
+    def tabletBackend = beNodes.find { it.BackendId == tabletBackendId }
+    // Fail with a readable message instead of a NullPointerException below
+    // when no backend row matches.
+    Assert.assertNotNull("no backend found with BackendId=" + tabletBackendId, tabletBackend)
+    logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+    // Fetches the tablet meta through the MetaUrl reported by `show tablets`
+    // and validates the rowset metas it contains.
+    //   expected_rs_meta_size: required number of entries in `rs_metas`
+    //   check_func: invoked once per rowset meta as
+    //       check_func(startVersion, endVersion, numSegments, numRows, overlapPb)
+    // The whole check is skipped (returns immediately) in cloud mode.
+    def check_rs_metas = { expected_rs_meta_size, check_func ->
+        if (isCloudMode()) {
+            return
+        }
+
+        def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl
+        def (code, out, err) = curl("GET", metaUrl)
+        // JUnit's assertEquals takes (expected, actual); keeping that order
+        // makes the "expected:<..> but was:<..>" failure text truthful.
+        Assert.assertEquals(0, code)
+        def jsonMeta = parseJson(out.trim())
+
+        Assert.assertEquals(expected_rs_meta_size, jsonMeta.rs_metas.size())
+        for (def meta : jsonMeta.rs_metas) {
+            int startVersion = meta.start_version
+            int endVersion = meta.end_version
+            int numSegments = meta.num_segments
+            int numRows = meta.num_rows
+            String overlapPb = meta.segments_overlap_pb
+            logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}")
+            check_func(startVersion, endVersion, numSegments, numRows, overlapPb)
+        }
+    }
+
+    // Layout before any compaction: four rowset metas in total, i.e. the
+    // segment-less [0-1] rowset plus one single-segment rowset per insert.
+    check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+        if (startVersion == 0) {
+            // [0-1]
+            Assert.assertEquals(1, endVersion)
+            Assert.assertEquals(0, numSegments)
+        } else {
+            // [2-2], [3-3], [4-4]
+            Assert.assertEquals(startVersion, endVersion)
+            Assert.assertEquals(1, numSegments)
+        }
+    })
+
+    // Turns on the "enable_spin_wait" publish debug point: on all BEs
+    // (EnginePublishVersionTask::execute) by default, or on all FEs
+    // (CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock) in cloud mode.
+    def enable_publish_spin_wait = {
+        if (!isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        } else {
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+        }
+    }
+
+    // Turns off the "enable_spin_wait" publish debug point: on all BEs
+    // (EnginePublishVersionTask::execute) by default, or on all FEs
+    // (CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock) in cloud mode.
+    def disable_publish_spin_wait = {
+        if (!isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        } else {
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+        }
+    }
+
+    // Turns on the "block" publish debug point: on all BEs
+    // (EnginePublishVersionTask::execute) by default, or on all FEs
+    // (CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock) in cloud mode.
+    // The test uses it to hold a load in the publish phase.
+    def enable_block_in_publish = {
+        if (!isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        }
+    }
+
+    // Turns off the "block" publish debug point: on all BEs
+    // (EnginePublishVersionTask::execute) by default, or on all FEs
+    // (CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock) in cloud mode.
+    // The test calls it to let the held load go on publishing.
+    def disable_block_in_publish = {
+        if (!isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        }
+    }
+
+    // Scenario: hold a partial update load in the publish phase, run a full
+    // compaction meanwhile, release the load, then verify the rowset layout.
+    try {
+        // Reset every debug point before installing the ones this test needs.
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        // block the partial update in publish phase
+        enable_publish_spin_wait()
+        enable_block_in_publish()
+        def t1 = Thread.start {
+            sql "set enable_unique_key_partial_update=true;"
+            sql "sync;"
+            sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);"
+        }
+
+        // Fixed delay before compacting; presumably gives the load time to
+        // reach the blocked publish phase (there is no explicit handshake).
+        Thread.sleep(500)
+
+        // trigger full compaction on tablet
+        // The target is the backend hosting the tablet (resolved during setup).
+        logger.info("trigger compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+        def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+        // assertEquals takes (expected, actual)
+        Assert.assertEquals(0, code)
+        def compactJson = parseJson(out.trim())
+        Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+        // wait for full compaction to complete
+        // Polls the compaction status until run_status is falsy; gives up after 3 seconds.
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(
+            {
+                (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                Assert.assertEquals(0, code)
+                def compactionStatus = parseJson(out.trim())
+                Assert.assertEquals("success", compactionStatus.status.toLowerCase())
+                return !compactionStatus.run_status
+            }
+        )
+
+        check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+            // check the rowset produced by full compaction
+            // [0-4]
+            Assert.assertEquals(0, startVersion)
+            Assert.assertEquals(4, endVersion)
+            Assert.assertEquals(3, numRows)
+            Assert.assertEquals("NONOVERLAPPING", overlapPb)
+        })
+
+        // let the partial update load publish
+        disable_block_in_publish()
+        t1.join()
+
+        order_qt_sql "select * from ${table1};"
+
+        check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+            if (startVersion == 5) {
+                // [5-5]
+                Assert.assertEquals(5, endVersion)
+                // checks that partial update skips the alignment process of rowsets produced by compaction and
+                // doesn't generate new segment in publish phase
+                Assert.assertEquals(1, numSegments)
+                Assert.assertEquals(3, numRows)
+            }
+        })
+
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        // Always remove the debug points, whether the scenario passed or failed.
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    // Runs only when the scenario above did not throw.
+    sql "DROP TABLE IF EXISTS ${table1};"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]