This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ffd7f8f4280 [fix](partial update) fix some bugs about delete sign (#25712) (#25860)
ffd7f8f4280 is described below
commit ffd7f8f428066fc222aab2cb1dc3ef0fbf525d47
Author: zhannngchen <[email protected]>
AuthorDate: Thu Oct 26 10:43:31 2023 +0800
[fix](partial update) fix some bugs about delete sign (#25712) (#25860)
---
be/src/olap/calc_delete_bitmap_executor.cpp | 21 +-----
be/src/olap/calc_delete_bitmap_executor.h | 7 +-
be/src/olap/delta_writer.cpp | 1 -
be/src/olap/rowset/segment_v2/segment_writer.cpp | 20 +++--
be/src/olap/tablet.cpp | 23 +++---
be/src/service/backend_service.cpp | 1 -
.../partial_update/delete_sign.csv | 3 +-
...elete_sign.csv => partial_update_parallel4.csv} | 2 +-
.../test_partial_update_delete_sign.out | 3 +
.../test_partial_update_parallel.groovy | 87 ++++++++++++++++++++++
10 files changed, 119 insertions(+), 49 deletions(-)
diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp
b/be/src/olap/calc_delete_bitmap_executor.cpp
index 51e77a36280..f2e3f8e3a00 100644
--- a/be/src/olap/calc_delete_bitmap_executor.cpp
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -33,20 +33,16 @@ using namespace ErrorCode;
Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr
cur_rowset,
const segment_v2::SegmentSharedPtr&
cur_segment,
const std::vector<RowsetSharedPtr>&
target_rowsets,
- int64_t end_version, RowsetWriter*
rowset_writer) {
+ int64_t end_version, DeleteBitmapPtr
delete_bitmap,
+ RowsetWriter* rowset_writer) {
{
std::shared_lock rlock(_lock);
RETURN_IF_ERROR(_status);
}
- DeleteBitmapPtr bitmap =
std::make_shared<DeleteBitmap>(tablet->tablet_id());
- {
- std::lock_guard wlock(_lock);
- _delete_bitmaps.push_back(bitmap);
- }
return _thread_token->submit_func([=, this]() {
auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment,
target_rowsets,
- bitmap, end_version,
rowset_writer);
+ delete_bitmap,
end_version, rowset_writer);
if (!st.ok()) {
LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
<< tablet->tablet_id() << " rowset: " <<
cur_rowset->rowset_id()
@@ -66,17 +62,6 @@ Status CalcDeleteBitmapToken::wait() {
return _status;
}
-Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) {
- std::lock_guard wlock(_lock);
- RETURN_IF_ERROR(_status);
-
- for (auto bitmap : _delete_bitmaps) {
- res_bitmap->merge(*bitmap);
- }
- _delete_bitmaps.clear();
- return Status::OK();
-}
-
void CalcDeleteBitmapExecutor::init() {
ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
.set_min_threads(1)
diff --git a/be/src/olap/calc_delete_bitmap_executor.h
b/be/src/olap/calc_delete_bitmap_executor.h
index d2c392a04d4..ec088373b38 100644
--- a/be/src/olap/calc_delete_bitmap_executor.h
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -43,7 +43,6 @@ using TabletSharedPtr = std::shared_ptr<Tablet>;
// 1. create a token
// 2. submit delete bitmap calculate tasks
// 3. wait all tasks complete
-// 4. call `get_delete_bitmap()` to get the result of all tasks
class CalcDeleteBitmapToken {
public:
explicit CalcDeleteBitmapToken(std::unique_ptr<ThreadPoolToken>
thread_token)
@@ -52,21 +51,17 @@ public:
Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
const segment_v2::SegmentSharedPtr& cur_segment,
const std::vector<RowsetSharedPtr>& target_rowsets, int64_t
end_version,
- RowsetWriter* rowset_writer);
+ DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer);
// wait all tasks in token to be completed.
Status wait();
void cancel() { _thread_token->shutdown(); }
- Status get_delete_bitmap(DeleteBitmapPtr res_bitmap);
-
private:
std::unique_ptr<ThreadPoolToken> _thread_token;
std::shared_mutex _lock;
- std::vector<DeleteBitmapPtr> _delete_bitmaps;
-
// Records the current status of the calc delete bitmap job.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
Status _status;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3634e19c68c..e1a421f1416 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -493,7 +493,6 @@ Status DeltaWriter::wait_calc_delete_bitmap() {
}
std::lock_guard<std::mutex> l(_lock);
RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
-
RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
LOG(INFO) << "Got result of calc delete bitmap task from executor,
tablet_id: "
<< _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 47387eceec2..add9e7f8697 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -427,6 +427,17 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
_maybe_invalid_row_cache(key);
+ // mark key with delete sign as deleted.
+ bool have_delete_sign =
+ (delete_sign_column_data != nullptr &&
delete_sign_column_data[block_pos] != 0);
+ if (have_delete_sign && !_tablet_schema->has_sequence_col() &&
!have_input_seq_column) {
+ // we can directly use delete bitmap to mark the rows with delete
sign as deleted
+ // if sequence column doesn't exist to eliminate reading delete
sign columns in later reads
+ _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id,
_segment_id,
+
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
+ segment_pos);
+ }
+
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
@@ -458,16 +469,9 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
// if the delete sign is marked, it means that the value columns of
the row
// will not be read. So we don't need to read the missing values from
the previous rows.
// But we still need to mark the previous row on delete bitmap
- if (delete_sign_column_data != nullptr &&
delete_sign_column_data[block_pos] != 0) {
+ if (have_delete_sign) {
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
- if (!_tablet_schema->has_sequence_col() && !have_input_seq_column)
{
- // we can directly use delete bitmap to mark the rows with
delete sign as deleted
- // if sequence column doesn't exist to eliminate reading
delete sign columns in later reads
- _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id,
_segment_id,
-
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
- segment_pos);
- }
} else {
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8a18f0d1d1b..bb9f2dd1916 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3060,6 +3060,14 @@ Status
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
continue;
}
if (is_partial_update && rowset_writer != nullptr) {
+ if (delete_bitmap->contains(
+ {rowset_id, seg->id(),
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
+ row_id)) {
+ LOG(INFO)
+ << "DEBUG: skip a delete sign column while
calc_segment_delete_bitmap "
+ << "processing confict for partial update";
+ continue;
+ }
// In publish version, record rows to be deleted for
concurrent update
// For example, if version 5 and 6 update a row, but version 6
only see
// version 4 when write, and when publish version, version 5's
value will
@@ -3146,26 +3154,17 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr
rowset,
return Status::InternalError("Can't find tablet id: {}, maybe already
dropped.",
tablet_id());
}
- std::vector<DeleteBitmapPtr> seg_delete_bitmaps;
for (size_t i = 0; i < segments.size(); i++) {
auto& seg = segments[i];
if (token != nullptr) {
RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg,
specified_rowsets, end_version,
- rowset_writer));
+ delete_bitmap, rowset_writer));
} else {
- DeleteBitmapPtr seg_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
- seg_delete_bitmaps.push_back(seg_delete_bitmap);
RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i],
specified_rowsets,
- seg_delete_bitmap,
end_version,
- rowset_writer));
+ delete_bitmap,
end_version, rowset_writer));
}
}
- if (token == nullptr) {
- for (auto seg_delete_bitmap : seg_delete_bitmaps) {
- delete_bitmap->merge(*seg_delete_bitmap);
- }
- }
return Status::OK();
}
@@ -3331,7 +3330,6 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
cur_version - 1, token.get()));
RETURN_IF_ERROR(token->wait());
- RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
@@ -3437,7 +3435,6 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
cur_version - 1, token.get(),
rowset_writer));
RETURN_IF_ERROR(token->wait());
- RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
std::stringstream ss;
if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 9fc43559980..f7e8321bfca 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -692,7 +692,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
segments, txn_id,
calc_delete_bitmap_token.get(), nullptr);
calc_delete_bitmap_token->wait();
- calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap);
}
// Step 6.3: commit txn
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
b/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
index 712d86b9aba..332000d696f 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
+++ b/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
@@ -1,3 +1,4 @@
1,1
3,1
-5,1
\ No newline at end of file
+5,1
+6,1
\ No newline at end of file
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv
similarity index 66%
copy from regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
copy to
regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv
index 712d86b9aba..0a7cbd412fa 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv
+++
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv
@@ -1,3 +1,3 @@
1,1
3,1
-5,1
\ No newline at end of file
+5,1
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index eb0a501090a..baf484fd3d8 100644
---
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -19,6 +19,7 @@
4 4 4 4 4 0
5 \N \N \N \N 1
5 5 5 5 5 0
+6 \N \N \N \N 1
-- !2 --
2 2 2 2 2 0
@@ -44,6 +45,7 @@
4 4 4 4 4 0
5 \N \N \N \N 1
5 5 5 5 5 0
+6 \N \N \N \N 1
-- !2 --
1 \N \N \N \N 1
@@ -51,6 +53,7 @@
3 \N \N \N \N 1
4 4 4 4 4 0
5 \N \N \N \N 1
+6 \N \N \N \N 1
-- !1 --
1 1 1
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
index ba0c1766aa1..9a96c3358e1 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
@@ -331,5 +331,92 @@ suite("test_primary_key_partial_update_parallel", "p0") {
qt_sql """ select
id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__
from ${tableName} order by id;"""
sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+ // case 5: partial update with delete sign in parallel
+ tableName = "test_primary_key_partial_update_delete_sign"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NULL COMMENT "用户姓名",
+ `score` int(11) NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321")
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true")
+ """
+
+ sql """insert into ${tableName} values
+ (2, "doris2", 2000, 223, 2),
+ (1, "doris", 1000, 123, 1),
+ (5, "doris5", 5000, 523, 5),
+ (4, "doris4", 4000, 423, 4),
+ (3, "doris3", 3000, 323, 3);"""
+
+ t1 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,name'
+
+ file 'partial_update_parallel1.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t2 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,__DORIS_DELETE_SIGN__'
+
+ file 'partial_update_parallel4.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t3 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,dft'
+
+ file 'partial_update_parallel3.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t1.join()
+ t2.join()
+ t3.join()
+
+ // Delete again, to make sure the result is right
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,__DORIS_DELETE_SIGN__'
+
+ file 'partial_update_parallel4.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+
+ // don't check result here, since the different finish order of t1/t2/t3 will
+ // generate different result, it's hard to check.
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]