This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e2f45225d6f [branch-2.1] Picks "[opt](merge-on-write) eliminate 
reading the old values of non-key columns for delete stmt in publish phase 
#38703" (#39074)
e2f45225d6f is described below

commit e2f45225d6fb1ab9f63e93020069e6b5b9ced0c1
Author: bobhan1 <[email protected]>
AuthorDate: Fri Aug 9 10:42:52 2024 +0800

    [branch-2.1] Picks "[opt](merge-on-write) eliminate reading the old values 
of non-key columns for delete stmt in publish phase #38703" (#39074)
    
    picks https://github.com/apache/doris/pull/38703
---
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  10 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  10 +-
 be/src/olap/tablet.cpp                             | 108 +++++++++++-------
 be/src/olap/tablet.h                               |   3 +-
 .../test_delete_publish_skip_read.out              |  19 ++++
 .../test_delete_publish_skip_read.groovy           | 122 +++++++++++++++++++++
 6 files changed, 222 insertions(+), 50 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 76a072644be..54c27205431 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -523,7 +523,7 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
 
         // 1. if the delete sign is marked, it means that the value columns of 
the row will not
         //    be read. So we don't need to read the missing values from the 
previous rows.
-        // 2. the one exception is when there are sequence columns in the 
table, we need to read
+        // 2. the one exception is when there is sequence column in the table, 
we need to read
         //    the sequence columns, otherwise it may cause the merge-on-read 
based compaction
         //    policy to produce incorrect results
         if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
@@ -642,9 +642,9 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
             auto rowset = _rsid_to_rowset[rs_it.first];
             CHECK(rowset);
             std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
+            for (auto [rid, pos] : seg_it.second) {
+                rids.emplace_back(rid);
+                read_index[pos] = read_idx++;
             }
             if (has_row_column) {
                 auto st = tablet->fetch_value_through_row_column(
@@ -698,7 +698,7 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
 
     // fill all missing value from mutable_old_columns, need to consider 
default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
-        // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
+        // `use_default_or_null_flag[idx] == false` doesn't mean that we 
should read values from the old row
         // for the missing columns. For example, if a table has sequence 
column, the rows with DELETE_SIGN column
         // marked will not be marked in delete bitmap(see 
https://github.com/apache/doris/pull/24011), so it will
         // be found in Tablet::lookup_row_key() and 
`use_default_or_null_flag[idx]` will be false. But we should not
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 6bc4445b626..d32e75fbebb 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -463,7 +463,7 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 
         // 1. if the delete sign is marked, it means that the value columns of 
the row will not
         //    be read. So we don't need to read the missing values from the 
previous rows.
-        // 2. the one exception is when there are sequence columns in the 
table, we need to read
+        // 2. the one exception is when there is sequence column in the table, 
we need to read
         //    the sequence columns, otherwise it may cause the merge-on-read 
based compaction
         //    policy to produce incorrect results
         if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
@@ -582,9 +582,9 @@ Status VerticalSegmentWriter::_fill_missing_columns(
             auto rowset = _rsid_to_rowset[rs_it.first];
             CHECK(rowset);
             std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
+            for (auto [rid, pos] : seg_it.second) {
+                rids.emplace_back(rid);
+                read_index[pos] = read_idx++;
             }
             if (has_row_column) {
                 auto st = tablet->fetch_value_through_row_column(
@@ -636,7 +636,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
 
     // fill all missing value from mutable_old_columns, need to consider 
default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
-        // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
+        // `use_default_or_null_flag[idx] == false` doesn't mean that we 
should read values from the old row
         // for the missing columns. For example, if a table has sequence 
column, the rows with DELETE_SIGN column
         // marked will not be marked in delete bitmap(see 
https://github.com/apache/doris/pull/24011), so it will
         // be found in Tablet::lookup_row_key() and 
`use_default_or_null_flag[idx]` will be false. But we should not
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 212b21a8174..79405b1fe0b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3235,27 +3235,56 @@ Status Tablet::generate_new_block_for_partial_update(
     auto old_block = rowset_schema->create_block_by_cids(missing_cids);
     auto update_block = rowset_schema->create_block_by_cids(update_cids);
 
-    std::map<uint32_t, uint32_t> read_index_old;
-    RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, 
read_plan_ori, rsid_to_rowset,
-                                         old_block, &read_index_old));
+    auto get_delete_sign_column_data = [](vectorized::Block& block,
+                                          size_t rows) -> const signed char* {
+        if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                    block.try_get_by_name(DELETE_SIGN);
+            delete_sign_column != nullptr) {
+            const auto& delete_sign_col =
+                    reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+            if (delete_sign_col.size() >= rows) {
+                return delete_sign_col.get_data().data();
+            }
+        }
+        return nullptr;
+    };
 
+    // rowid in the final block(start from 0, increase continuously) -> rowid 
to read in update_block
     std::map<uint32_t, uint32_t> read_index_update;
+
+    // read current rowset first, if a row in the current rowset has delete 
sign mark
+    // we don't need to read values from old block
     RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, 
read_plan_update,
                                          rsid_to_rowset, update_block, 
&read_index_update));
 
-    const vectorized::Int8* delete_sign_column_data = nullptr;
-    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
-                old_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr) {
-        auto& delete_sign_col =
-                reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
-        delete_sign_column_data = delete_sign_col.get_data().data();
+    size_t update_rows = read_index_update.size();
+    for (auto i = 0; i < update_cids.size(); ++i) {
+        for (auto idx = 0; idx < update_rows; ++idx) {
+            full_mutable_columns[update_cids[i]]->insert_from(
+                    
*update_block.get_columns_with_type_and_name()[i].column.get(),
+                    read_index_update[idx]);
+        }
     }
 
+    // if there is sequence column in the table, we need to read the sequence 
column,
+    // otherwise it may cause the merge-on-read based compaction policy to 
produce incorrect results
+    const auto* __restrict new_block_delete_signs =
+            rowset_schema->has_sequence_col()
+                    ? nullptr
+                    : get_delete_sign_column_data(update_block, update_rows);
+
+    // rowid in the final block(start from 0, increase, may not continuous 
becasue we skip to read some rows) -> rowid to read in old_block
+    std::map<uint32_t, uint32_t> read_index_old;
+    RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, 
read_plan_ori, rsid_to_rowset,
+                                         old_block, &read_index_old, 
new_block_delete_signs));
+    size_t old_rows = read_index_old.size();
+    const auto* __restrict old_block_delete_signs =
+            get_delete_sign_column_data(old_block, old_rows);
+
     // build default value block
     auto default_value_block = old_block.clone_empty();
     auto mutable_default_value_columns = default_value_block.mutate_columns();
-    if (delete_sign_column_data != nullptr) {
+    if (old_block_delete_signs != nullptr || new_block_delete_signs != 
nullptr) {
         for (auto i = 0; i < missing_cids.size(); ++i) {
             const auto& column = rowset_schema->column(missing_cids[i]);
             if (column.has_default_value()) {
@@ -3268,22 +3297,26 @@ Status Tablet::generate_new_block_for_partial_update(
         }
     }
 
-    // build full block
-    CHECK(read_index_old.size() == read_index_update.size());
+    CHECK(update_rows >= old_rows);
 
+    // build full block
     for (auto i = 0; i < missing_cids.size(); ++i) {
         const auto& rs_column = rowset_schema->column(missing_cids[i]);
-        for (auto idx = 0; idx < read_index_old.size(); ++idx) {
-            // if the conflict update is a delete sign, which means that the 
key is
-            // not exist now, we should not read old values from the deleted 
data,
-            // and should use default value instead.
-            // NOTE: since now we are in the publishing phase, all data is 
commited
-            // before, even the `strict_mode` is true (which requires partial 
update
-            // load job can't insert new keys), this "new" key MUST be written 
into
-            // the new generated segment file.
-            if (delete_sign_column_data != nullptr &&
-                delete_sign_column_data[read_index_old[idx]] != 0) {
-                auto& mutable_column = full_mutable_columns[missing_cids[i]];
+        auto& mutable_column = full_mutable_columns[missing_cids[i]];
+        for (auto idx = 0; idx < update_rows; ++idx) {
+            // There are two cases we don't need to read values from old data:
+            //     1. if the conflicting new row's delete sign is marked, 
which means the value columns
+            //     of the row will not be read. So we don't need to read the 
missing values from the previous rows.
+            //     2. if the conflicting old row's delete sign is marked, 
which means that the key is not exist now,
+            //     we should not read old values from the deleted data, and 
should use default value instead.
+            //     NOTE: since now we are in the publishing phase, all data is 
commited
+            //         before, even the `strict_mode` is true (which requires 
partial update
+            //         load job can't insert new keys), this "new" key MUST be 
written into
+            //         the new generated segment file.
+            if (new_block_delete_signs != nullptr && 
new_block_delete_signs[idx]) {
+                mutable_column->insert_default();
+            } else if (old_block_delete_signs != nullptr &&
+                       old_block_delete_signs[read_index_old[idx]] != 0) {
                 if (rs_column.has_default_value()) {
                     
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
                 } else if (rs_column.is_nullable()) {
@@ -3292,18 +3325,11 @@ Status Tablet::generate_new_block_for_partial_update(
                 } else {
                     mutable_column->insert_default();
                 }
-                continue;
+            } else {
+                mutable_column->insert_from(
+                        
*old_block.get_columns_with_type_and_name()[i].column.get(),
+                        read_index_old[idx]);
             }
-            full_mutable_columns[missing_cids[i]]->insert_from(
-                    
*old_block.get_columns_with_type_and_name()[i].column.get(),
-                    read_index_old[idx]);
-        }
-    }
-    for (auto i = 0; i < update_cids.size(); ++i) {
-        for (auto idx = 0; idx < read_index_update.size(); ++idx) {
-            full_mutable_columns[update_cids[i]]->insert_from(
-                    
*update_block.get_columns_with_type_and_name()[i].column.get(),
-                    read_index_update[idx]);
         }
     }
     output_block->set_columns(std::move(full_mutable_columns));
@@ -3318,7 +3344,8 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr 
tablet_schema,
                                     const PartialUpdateReadPlan& read_plan,
                                     const std::map<RowsetId, RowsetSharedPtr>& 
rsid_to_rowset,
                                     vectorized::Block& block,
-                                    std::map<uint32_t, uint32_t>* read_index) {
+                                    std::map<uint32_t, uint32_t>* read_index,
+                                    const signed char* __restrict skip_map) {
     bool has_row_column = tablet_schema->store_row_column();
     auto mutable_columns = block.mutate_columns();
     size_t read_idx = 0;
@@ -3327,9 +3354,12 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr 
tablet_schema,
             auto rowset_iter = rsid_to_rowset.find(rs_it.first);
             CHECK(rowset_iter != rsid_to_rowset.end());
             std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                (*read_index)[id_and_pos.pos] = read_idx++;
+            for (auto [rid, pos] : seg_it.second) {
+                if (skip_map && skip_map[pos]) {
+                    continue;
+                }
+                rids.emplace_back(rid);
+                (*read_index)[pos] = read_idx++;
             }
             if (has_row_column) {
                 auto st = fetch_value_through_row_column(rowset_iter->second, 
*tablet_schema,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a5330822154..b4aca7ba3cb 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -486,7 +486,8 @@ public:
                                 const std::vector<uint32_t> cids_to_read,
                                 const PartialUpdateReadPlan& read_plan,
                                 const std::map<RowsetId, RowsetSharedPtr>& 
rsid_to_rowset,
-                                vectorized::Block& block, std::map<uint32_t, 
uint32_t>* read_index);
+                                vectorized::Block& block, std::map<uint32_t, 
uint32_t>* read_index,
+                                const signed char* __restrict skip_map = 
nullptr);
     void prepare_to_read(const RowLocation& row_location, size_t pos,
                          PartialUpdateReadPlan* read_plan);
     Status generate_new_block_for_partial_update(
diff --git 
a/regression-test/data/fault_injection_p0/partial_update/test_delete_publish_skip_read.out
 
b/regression-test/data/fault_injection_p0/partial_update/test_delete_publish_skip_read.out
new file mode 100644
index 00000000000..b7ee85325a1
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/partial_update/test_delete_publish_skip_read.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+
+-- !sql --
+1      999     999     1       1
+3      777     777     3       3
+
+-- !sql --
+1      1       1       1       1       0       2
+1      999     999     1       1       0       3
+2      \N      \N      \N      \N      1       4
+2      2       2       2       2       0       2
+2      888     888     2       2       0       3
+3      3       3       3       3       0       2
+3      777     777     3       3       0       3
+
diff --git 
a/regression-test/suites/fault_injection_p0/partial_update/test_delete_publish_skip_read.groovy
 
b/regression-test/suites/fault_injection_p0/partial_update/test_delete_publish_skip_read.groovy
new file mode 100644
index 00000000000..65e30369f00
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/partial_update/test_delete_publish_skip_read.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_delete_publish_skip_read", "nonConcurrent") {
+
+    def table1 = "test_delete_publish_skip_read"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int,
+            `c3` int,
+            `c4` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_mow_light_delete" = "false",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
+    sql "sync;"
+    order_qt_sql "select * from ${table1};"
+
+
+    def enable_publish_spin_wait = {
+        if (isCloudMode()) {
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+        } else {
+            
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        }
+    }
+
+    def disable_publish_spin_wait = {
+        if (isCloudMode()) {
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+        } else {
+            
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        }
+    }
+
+    def enable_block_in_publish = {
+        if (isCloudMode()) {
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        } else {
+            
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+        }
+    }
+
+    def disable_block_in_publish = {
+        if (isCloudMode()) {
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        } else {
+            
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+        }
+    }
+
+    try {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        // block the partial update in publish phase
+        enable_publish_spin_wait()
+        enable_block_in_publish()
+        def t1 = Thread.start {
+            sql "set enable_unique_key_partial_update=true;"
+            sql "sync;"
+            sql "insert into ${table1}(k1,c1,c2) 
values(1,999,999),(2,888,888),(3,777,777);"
+        }
+
+        Thread.sleep(500)
+
+        def t2 = Thread.start {
+            sql "insert into ${table1}(k1,__DORIS_DELETE_SIGN__) values(2,1);"
+        }
+
+
+        // let the partial update load publish
+        disable_block_in_publish()
+        t1.join()
+        t2.join()
+
+        order_qt_sql "select * from ${table1};"
+
+        sql "set skip_delete_sign=true;"
+        sql "set skip_storage_engine_merge=true;"
+        sql "set skip_delete_bitmap=true;"
+        sql "set show_hidden_columns=true;"
+        sql "sync;"
+
+        order_qt_sql "select * from ${table1};"
+
+
+
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    sql "DROP TABLE IF EXISTS ${table1};"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to