This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ffd7f8f4280 [fix](partial update) fix some bugs about delete sign (#25712) (#25860)
ffd7f8f4280 is described below

commit ffd7f8f428066fc222aab2cb1dc3ef0fbf525d47
Author: zhannngchen <[email protected]>
AuthorDate: Thu Oct 26 10:43:31 2023 +0800

    [fix](partial update) fix some bugs about delete sign (#25712) (#25860)
---
 be/src/olap/calc_delete_bitmap_executor.cpp        | 21 +-----
 be/src/olap/calc_delete_bitmap_executor.h          |  7 +-
 be/src/olap/delta_writer.cpp                       |  1 -
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 20 +++--
 be/src/olap/tablet.cpp                             | 23 +++---
 be/src/service/backend_service.cpp                 |  1 -
 .../partial_update/delete_sign.csv                 |  3 +-
 ...elete_sign.csv => partial_update_parallel4.csv} |  2 +-
 .../test_partial_update_delete_sign.out            |  3 +
 .../test_partial_update_parallel.groovy            | 87 ++++++++++++++++++++++
 10 files changed, 119 insertions(+), 49 deletions(-)

diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp 
b/be/src/olap/calc_delete_bitmap_executor.cpp
index 51e77a36280..f2e3f8e3a00 100644
--- a/be/src/olap/calc_delete_bitmap_executor.cpp
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -33,20 +33,16 @@ using namespace ErrorCode;
 Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr 
cur_rowset,
                                      const segment_v2::SegmentSharedPtr& 
cur_segment,
                                      const std::vector<RowsetSharedPtr>& 
target_rowsets,
-                                     int64_t end_version, RowsetWriter* 
rowset_writer) {
+                                     int64_t end_version, DeleteBitmapPtr 
delete_bitmap,
+                                     RowsetWriter* rowset_writer) {
     {
         std::shared_lock rlock(_lock);
         RETURN_IF_ERROR(_status);
     }
 
-    DeleteBitmapPtr bitmap = 
std::make_shared<DeleteBitmap>(tablet->tablet_id());
-    {
-        std::lock_guard wlock(_lock);
-        _delete_bitmaps.push_back(bitmap);
-    }
     return _thread_token->submit_func([=, this]() {
         auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, 
target_rowsets,
-                                                     bitmap, end_version, 
rowset_writer);
+                                                     delete_bitmap, 
end_version, rowset_writer);
         if (!st.ok()) {
             LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
                          << tablet->tablet_id() << " rowset: " << 
cur_rowset->rowset_id()
@@ -66,17 +62,6 @@ Status CalcDeleteBitmapToken::wait() {
     return _status;
 }
 
-Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) {
-    std::lock_guard wlock(_lock);
-    RETURN_IF_ERROR(_status);
-
-    for (auto bitmap : _delete_bitmaps) {
-        res_bitmap->merge(*bitmap);
-    }
-    _delete_bitmaps.clear();
-    return Status::OK();
-}
-
 void CalcDeleteBitmapExecutor::init() {
     ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
             .set_min_threads(1)
diff --git a/be/src/olap/calc_delete_bitmap_executor.h 
b/be/src/olap/calc_delete_bitmap_executor.h
index d2c392a04d4..ec088373b38 100644
--- a/be/src/olap/calc_delete_bitmap_executor.h
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -43,7 +43,6 @@ using TabletSharedPtr = std::shared_ptr<Tablet>;
 // 1. create a token
 // 2. submit delete bitmap calculate tasks
 // 3. wait all tasks complete
-// 4. call `get_delete_bitmap()` to get the result of all tasks
 class CalcDeleteBitmapToken {
 public:
     explicit CalcDeleteBitmapToken(std::unique_ptr<ThreadPoolToken> 
thread_token)
@@ -52,21 +51,17 @@ public:
     Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
                   const segment_v2::SegmentSharedPtr& cur_segment,
                   const std::vector<RowsetSharedPtr>& target_rowsets, int64_t 
end_version,
-                  RowsetWriter* rowset_writer);
+                  DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer);
 
     // wait all tasks in token to be completed.
     Status wait();
 
     void cancel() { _thread_token->shutdown(); }
 
-    Status get_delete_bitmap(DeleteBitmapPtr res_bitmap);
-
 private:
     std::unique_ptr<ThreadPoolToken> _thread_token;
 
     std::shared_mutex _lock;
-    std::vector<DeleteBitmapPtr> _delete_bitmaps;
-
     // Records the current status of the calc delete bitmap job.
     // Note: Once its value is set to Failed, it cannot return to SUCCESS.
     Status _status;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3634e19c68c..e1a421f1416 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -493,7 +493,6 @@ Status DeltaWriter::wait_calc_delete_bitmap() {
     }
     std::lock_guard<std::mutex> l(_lock);
     RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
-    
RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
     LOG(INFO) << "Got result of calc delete bitmap task from executor, 
tablet_id: "
               << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
     return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 47387eceec2..add9e7f8697 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -427,6 +427,17 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
         }
         _maybe_invalid_row_cache(key);
 
+        // mark key with delete sign as deleted.
+        bool have_delete_sign =
+                (delete_sign_column_data != nullptr && 
delete_sign_column_data[block_pos] != 0);
+        if (have_delete_sign && !_tablet_schema->has_sequence_col() && 
!have_input_seq_column) {
+            // we can directly use delete bitmap to mark the rows with delete 
sign as deleted
+            // if sequence column doesn't exist to eliminate reading delete 
sign columns in later reads
+            _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, 
_segment_id,
+                                              
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
+                                             segment_pos);
+        }
+
         RowLocation loc;
         // save rowset shared ptr so this rowset wouldn't delete
         RowsetSharedPtr rowset;
@@ -458,16 +469,9 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
         // if the delete sign is marked, it means that the value columns of 
the row
         // will not be read. So we don't need to read the missing values from 
the previous rows.
         // But we still need to mark the previous row on delete bitmap
-        if (delete_sign_column_data != nullptr && 
delete_sign_column_data[block_pos] != 0) {
+        if (have_delete_sign) {
             has_default_or_nullable = true;
             use_default_or_null_flag.emplace_back(true);
-            if (!_tablet_schema->has_sequence_col() && !have_input_seq_column) 
{
-                // we can directly use delete bitmap to mark the rows with 
delete sign as deleted
-                // if sequence column doesn't exist to eliminate reading 
delete sign columns in later reads
-                _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, 
_segment_id,
-                                                  
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
-                                                 segment_pos);
-            }
         } else {
             // partial update should not contain invisible columns
             use_default_or_null_flag.emplace_back(false);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8a18f0d1d1b..bb9f2dd1916 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3060,6 +3060,14 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 continue;
             }
             if (is_partial_update && rowset_writer != nullptr) {
+                if (delete_bitmap->contains(
+                            {rowset_id, seg->id(), 
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
+                            row_id)) {
+                    LOG(INFO)
+                            << "DEBUG: skip a delete sign column while 
calc_segment_delete_bitmap "
+                            << "processing confict for partial update";
+                    continue;
+                }
                 // In publish version, record rows to be deleted for 
concurrent update
                 // For example, if version 5 and 6 update a row, but version 6 
only see
                 // version 4 when write, and when publish version, version 5's 
value will
@@ -3146,26 +3154,17 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr 
rowset,
         return Status::InternalError("Can't find tablet id: {}, maybe already 
dropped.",
                                      tablet_id());
     }
-    std::vector<DeleteBitmapPtr> seg_delete_bitmaps;
     for (size_t i = 0; i < segments.size(); i++) {
         auto& seg = segments[i];
         if (token != nullptr) {
             RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, 
specified_rowsets, end_version,
-                                          rowset_writer));
+                                          delete_bitmap, rowset_writer));
         } else {
-            DeleteBitmapPtr seg_delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
-            seg_delete_bitmaps.push_back(seg_delete_bitmap);
             RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], 
specified_rowsets,
-                                                       seg_delete_bitmap, 
end_version,
-                                                       rowset_writer));
+                                                       delete_bitmap, 
end_version, rowset_writer));
         }
     }
 
-    if (token == nullptr) {
-        for (auto seg_delete_bitmap : seg_delete_bitmaps) {
-            delete_bitmap->merge(*seg_delete_bitmap);
-        }
-    }
     return Status::OK();
 }
 
@@ -3331,7 +3330,6 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
                                        cur_version - 1, token.get()));
     RETURN_IF_ERROR(token->wait());
-    RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
@@ -3437,7 +3435,6 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
                                        cur_version - 1, token.get(), 
rowset_writer));
     RETURN_IF_ERROR(token->wait());
-    RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
 
     std::stringstream ss;
     if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 9fc43559980..f7e8321bfca 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -692,7 +692,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult& 
result,
                                                         segments, txn_id,
                                                         
calc_delete_bitmap_token.get(), nullptr);
         calc_delete_bitmap_token->wait();
-        calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap);
     }
 
     // Step 6.3: commit txn
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv 
b/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
index 712d86b9aba..332000d696f 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
+++ b/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
@@ -1,3 +1,4 @@
 1,1
 3,1
-5,1
\ No newline at end of file
+5,1
+6,1
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv
similarity index 66%
copy from regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
copy to 
regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv
index 712d86b9aba..0a7cbd412fa 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv
@@ -1,3 +1,3 @@
 1,1
 3,1
-5,1
\ No newline at end of file
+5,1
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index eb0a501090a..baf484fd3d8 100644
--- 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -19,6 +19,7 @@
 4      4       4       4       4       0
 5      \N      \N      \N      \N      1
 5      5       5       5       5       0
+6      \N      \N      \N      \N      1
 
 -- !2 --
 2      2       2       2       2       0
@@ -44,6 +45,7 @@
 4      4       4       4       4       0
 5      \N      \N      \N      \N      1
 5      5       5       5       5       0
+6      \N      \N      \N      \N      1
 
 -- !2 --
 1      \N      \N      \N      \N      1
@@ -51,6 +53,7 @@
 3      \N      \N      \N      \N      1
 4      4       4       4       4       0
 5      \N      \N      \N      \N      1
+6      \N      \N      \N      \N      1
 
 -- !1 --
 1      1       1
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
index ba0c1766aa1..9a96c3358e1 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
@@ -331,5 +331,92 @@ suite("test_primary_key_partial_update_parallel", "p0") {
     qt_sql """ select 
id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__
 from ${tableName} order by id;"""
 
     sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+    // case 5: partial update with delete sign in parallel
+    tableName = "test_primary_key_partial_update_delete_sign"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NULL COMMENT "用户姓名",
+                `score` int(11) NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,__DORIS_DELETE_SIGN__'
+
+            file 'partial_update_parallel4.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    // Delete again, to make sure the result is right
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'id,__DORIS_DELETE_SIGN__'
+
+        file 'partial_update_parallel4.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+
+    // don't check result here, since the different finish order of t1/t2/t3 
will
+    // generate different result, it's hard to check.
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to