This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e2f45225d6f [branch-2.1] Picks "[opt](merge-on-write) eliminate
reading the old values of non-key columns for delete stmt in publish phase
#38703" (#39074)
e2f45225d6f is described below
commit e2f45225d6fb1ab9f63e93020069e6b5b9ced0c1
Author: bobhan1 <[email protected]>
AuthorDate: Fri Aug 9 10:42:52 2024 +0800
[branch-2.1] Picks "[opt](merge-on-write) eliminate reading the old values
of non-key columns for delete stmt in publish phase #38703" (#39074)
picks https://github.com/apache/doris/pull/38703
---
be/src/olap/rowset/segment_v2/segment_writer.cpp | 10 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 10 +-
be/src/olap/tablet.cpp | 108 +++++++++++-------
be/src/olap/tablet.h | 3 +-
.../test_delete_publish_skip_read.out | 19 ++++
.../test_delete_publish_skip_read.groovy | 122 +++++++++++++++++++++
6 files changed, 222 insertions(+), 50 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 76a072644be..54c27205431 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -523,7 +523,7 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
// 1. if the delete sign is marked, it means that the value columns of
the row will not
// be read. So we don't need to read the missing values from the
previous rows.
- // 2. the one exception is when there are sequence columns in the
table, we need to read
+ // 2. the one exception is when there is a sequence column in the table,
we need to read
// the sequence columns, otherwise it may cause the merge-on-read
based compaction
// policy to produce incorrect results
if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
@@ -642,9 +642,9 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
auto rowset = _rsid_to_rowset[rs_it.first];
CHECK(rowset);
std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
+ for (auto [rid, pos] : seg_it.second) {
+ rids.emplace_back(rid);
+ read_index[pos] = read_idx++;
}
if (has_row_column) {
auto st = tablet->fetch_value_through_row_column(
@@ -698,7 +698,7 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
// fill all missing value from mutable_old_columns, need to consider
default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
- // `use_default_or_null_flag[idx] == true` doesn't mean that we should
read values from the old row
+ // `use_default_or_null_flag[idx] == false` doesn't mean that we
should read values from the old row
// for the missing columns. For example, if a table has sequence
column, the rows with DELETE_SIGN column
// marked will not be marked in delete bitmap(see
https://github.com/apache/doris/pull/24011), so it will
// be found in Tablet::lookup_row_key() and
`use_default_or_null_flag[idx]` will be false. But we should not
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 6bc4445b626..d32e75fbebb 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -463,7 +463,7 @@ Status
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
// 1. if the delete sign is marked, it means that the value columns of
the row will not
// be read. So we don't need to read the missing values from the
previous rows.
- // 2. the one exception is when there are sequence columns in the
table, we need to read
+ // 2. the one exception is when there is a sequence column in the table,
we need to read
// the sequence columns, otherwise it may cause the merge-on-read
based compaction
// policy to produce incorrect results
if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
@@ -582,9 +582,9 @@ Status VerticalSegmentWriter::_fill_missing_columns(
auto rowset = _rsid_to_rowset[rs_it.first];
CHECK(rowset);
std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
+ for (auto [rid, pos] : seg_it.second) {
+ rids.emplace_back(rid);
+ read_index[pos] = read_idx++;
}
if (has_row_column) {
auto st = tablet->fetch_value_through_row_column(
@@ -636,7 +636,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
// fill all missing value from mutable_old_columns, need to consider
default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
- // `use_default_or_null_flag[idx] == true` doesn't mean that we should
read values from the old row
+ // `use_default_or_null_flag[idx] == false` doesn't mean that we
should read values from the old row
// for the missing columns. For example, if a table has sequence
column, the rows with DELETE_SIGN column
// marked will not be marked in delete bitmap(see
https://github.com/apache/doris/pull/24011), so it will
// be found in Tablet::lookup_row_key() and
`use_default_or_null_flag[idx]` will be false. But we should not
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 212b21a8174..79405b1fe0b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3235,27 +3235,56 @@ Status Tablet::generate_new_block_for_partial_update(
auto old_block = rowset_schema->create_block_by_cids(missing_cids);
auto update_block = rowset_schema->create_block_by_cids(update_cids);
- std::map<uint32_t, uint32_t> read_index_old;
- RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids,
read_plan_ori, rsid_to_rowset,
- old_block, &read_index_old));
+ auto get_delete_sign_column_data = [](vectorized::Block& block,
+ size_t rows) -> const signed char* {
+ if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+ block.try_get_by_name(DELETE_SIGN);
+ delete_sign_column != nullptr) {
+ const auto& delete_sign_col =
+ reinterpret_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+ if (delete_sign_col.size() >= rows) {
+ return delete_sign_col.get_data().data();
+ }
+ }
+ return nullptr;
+ };
+ // rowid in the final block(start from 0, increase continuously) -> rowid
to read in update_block
std::map<uint32_t, uint32_t> read_index_update;
+
+ // read current rowset first, if a row in the current rowset has delete
sign mark
+ // we don't need to read values from old block
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids,
read_plan_update,
rsid_to_rowset, update_block,
&read_index_update));
- const vectorized::Int8* delete_sign_column_data = nullptr;
- if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
- old_block.try_get_by_name(DELETE_SIGN);
- delete_sign_column != nullptr) {
- auto& delete_sign_col =
- reinterpret_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column->column));
- delete_sign_column_data = delete_sign_col.get_data().data();
+ size_t update_rows = read_index_update.size();
+ for (auto i = 0; i < update_cids.size(); ++i) {
+ for (auto idx = 0; idx < update_rows; ++idx) {
+ full_mutable_columns[update_cids[i]]->insert_from(
+
*update_block.get_columns_with_type_and_name()[i].column.get(),
+ read_index_update[idx]);
+ }
}
+ // if there is a sequence column in the table, we need to read the sequence
column,
+ // otherwise it may cause the merge-on-read based compaction policy to
produce incorrect results
+ const auto* __restrict new_block_delete_signs =
+ rowset_schema->has_sequence_col()
+ ? nullptr
+ : get_delete_sign_column_data(update_block, update_rows);
+
+ // rowid in the final block(start from 0, increasing, may not be continuous
because we skip reading some rows) -> rowid to read in old_block
+ std::map<uint32_t, uint32_t> read_index_old;
+ RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids,
read_plan_ori, rsid_to_rowset,
+ old_block, &read_index_old,
new_block_delete_signs));
+ size_t old_rows = read_index_old.size();
+ const auto* __restrict old_block_delete_signs =
+ get_delete_sign_column_data(old_block, old_rows);
+
// build default value block
auto default_value_block = old_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
- if (delete_sign_column_data != nullptr) {
+ if (old_block_delete_signs != nullptr || new_block_delete_signs !=
nullptr) {
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& column = rowset_schema->column(missing_cids[i]);
if (column.has_default_value()) {
@@ -3268,22 +3297,26 @@ Status Tablet::generate_new_block_for_partial_update(
}
}
- // build full block
- CHECK(read_index_old.size() == read_index_update.size());
+ CHECK(update_rows >= old_rows);
+ // build full block
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& rs_column = rowset_schema->column(missing_cids[i]);
- for (auto idx = 0; idx < read_index_old.size(); ++idx) {
- // if the conflict update is a delete sign, which means that the
key is
- // not exist now, we should not read old values from the deleted
data,
- // and should use default value instead.
- // NOTE: since now we are in the publishing phase, all data is
commited
- // before, even the `strict_mode` is true (which requires partial
update
- // load job can't insert new keys), this "new" key MUST be written
into
- // the new generated segment file.
- if (delete_sign_column_data != nullptr &&
- delete_sign_column_data[read_index_old[idx]] != 0) {
- auto& mutable_column = full_mutable_columns[missing_cids[i]];
+ auto& mutable_column = full_mutable_columns[missing_cids[i]];
+ for (auto idx = 0; idx < update_rows; ++idx) {
+ // There are two cases we don't need to read values from old data:
+ // 1. if the conflicting new row's delete sign is marked,
which means the value columns
+ // of the row will not be read. So we don't need to read the
missing values from the previous rows.
+ // 2. if the conflicting old row's delete sign is marked,
which means that the key is not exist now,
+ // we should not read old values from the deleted data, and
should use default value instead.
+ // NOTE: since now we are in the publishing phase, all data is
committed
+ // before, even the `strict_mode` is true (which requires
partial update
+ // load job can't insert new keys), this "new" key MUST be
written into
+ // the new generated segment file.
+ if (new_block_delete_signs != nullptr &&
new_block_delete_signs[idx]) {
+ mutable_column->insert_default();
+ } else if (old_block_delete_signs != nullptr &&
+ old_block_delete_signs[read_index_old[idx]] != 0) {
if (rs_column.has_default_value()) {
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
} else if (rs_column.is_nullable()) {
@@ -3292,18 +3325,11 @@ Status Tablet::generate_new_block_for_partial_update(
} else {
mutable_column->insert_default();
}
- continue;
+ } else {
+ mutable_column->insert_from(
+
*old_block.get_columns_with_type_and_name()[i].column.get(),
+ read_index_old[idx]);
}
- full_mutable_columns[missing_cids[i]]->insert_from(
-
*old_block.get_columns_with_type_and_name()[i].column.get(),
- read_index_old[idx]);
- }
- }
- for (auto i = 0; i < update_cids.size(); ++i) {
- for (auto idx = 0; idx < read_index_update.size(); ++idx) {
- full_mutable_columns[update_cids[i]]->insert_from(
-
*update_block.get_columns_with_type_and_name()[i].column.get(),
- read_index_update[idx]);
}
}
output_block->set_columns(std::move(full_mutable_columns));
@@ -3318,7 +3344,8 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr
tablet_schema,
const PartialUpdateReadPlan& read_plan,
const std::map<RowsetId, RowsetSharedPtr>&
rsid_to_rowset,
vectorized::Block& block,
- std::map<uint32_t, uint32_t>* read_index) {
+ std::map<uint32_t, uint32_t>* read_index,
+ const signed char* __restrict skip_map) {
bool has_row_column = tablet_schema->store_row_column();
auto mutable_columns = block.mutate_columns();
size_t read_idx = 0;
@@ -3327,9 +3354,12 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr
tablet_schema,
auto rowset_iter = rsid_to_rowset.find(rs_it.first);
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- (*read_index)[id_and_pos.pos] = read_idx++;
+ for (auto [rid, pos] : seg_it.second) {
+ if (skip_map && skip_map[pos]) {
+ continue;
+ }
+ rids.emplace_back(rid);
+ (*read_index)[pos] = read_idx++;
}
if (has_row_column) {
auto st = fetch_value_through_row_column(rowset_iter->second,
*tablet_schema,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a5330822154..b4aca7ba3cb 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -486,7 +486,8 @@ public:
const std::vector<uint32_t> cids_to_read,
const PartialUpdateReadPlan& read_plan,
const std::map<RowsetId, RowsetSharedPtr>&
rsid_to_rowset,
- vectorized::Block& block, std::map<uint32_t,
uint32_t>* read_index);
+ vectorized::Block& block, std::map<uint32_t,
uint32_t>* read_index,
+ const signed char* __restrict skip_map =
nullptr);
void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);
Status generate_new_block_for_partial_update(
diff --git
a/regression-test/data/fault_injection_p0/partial_update/test_delete_publish_skip_read.out
b/regression-test/data/fault_injection_p0/partial_update/test_delete_publish_skip_read.out
new file mode 100644
index 00000000000..b7ee85325a1
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/partial_update/test_delete_publish_skip_read.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+
+-- !sql --
+1 999 999 1 1
+3 777 777 3 3
+
+-- !sql --
+1 1 1 1 1 0 2
+1 999 999 1 1 0 3
+2 \N \N \N \N 1 4
+2 2 2 2 2 0 2
+2 888 888 2 2 0 3
+3 3 3 3 3 0 2
+3 777 777 3 3 0 3
+
diff --git
a/regression-test/suites/fault_injection_p0/partial_update/test_delete_publish_skip_read.groovy
b/regression-test/suites/fault_injection_p0/partial_update/test_delete_publish_skip_read.groovy
new file mode 100644
index 00000000000..65e30369f00
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/partial_update/test_delete_publish_skip_read.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_delete_publish_skip_read", "nonConcurrent") {
+
+ def table1 = "test_delete_publish_skip_read"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `c4` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_mow_light_delete" = "false",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
+ sql "sync;"
+ order_qt_sql "select * from ${table1};"
+
+
+ def enable_publish_spin_wait = {
+ if (isCloudMode()) {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+ } else {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+ }
+ }
+
+ def disable_publish_spin_wait = {
+ if (isCloudMode()) {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+ } else {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+ }
+ }
+
+ def enable_block_in_publish = {
+ if (isCloudMode()) {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ } else {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+ }
+
+ def disable_block_in_publish = {
+ if (isCloudMode()) {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ } else {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+ }
+
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ // block the partial update in publish phase
+ enable_publish_spin_wait()
+ enable_block_in_publish()
+ def t1 = Thread.start {
+ sql "set enable_unique_key_partial_update=true;"
+ sql "sync;"
+ sql "insert into ${table1}(k1,c1,c2)
values(1,999,999),(2,888,888),(3,777,777);"
+ }
+
+ Thread.sleep(500)
+
+ def t2 = Thread.start {
+ sql "insert into ${table1}(k1,__DORIS_DELETE_SIGN__) values(2,1);"
+ }
+
+
+ // let the partial update load publish
+ disable_block_in_publish()
+ t1.join()
+ t2.join()
+
+ order_qt_sql "select * from ${table1};"
+
+ sql "set skip_delete_sign=true;"
+ sql "set skip_storage_engine_merge=true;"
+ sql "set skip_delete_bitmap=true;"
+ sql "set show_hidden_columns=true;"
+ sql "sync;"
+
+ order_qt_sql "select * from ${table1};"
+
+
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ sql "DROP TABLE IF EXISTS ${table1};"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]