This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6ec9a731e81 [branch-2.1](cherry-pick) partial update should not read
old fields from rows with delete sign (#36210) (#36755)
6ec9a731e81 is described below
commit 6ec9a731e81fc7487092ca54066b658b8c04cd58
Author: zhannngchen <[email protected]>
AuthorDate: Mon Jun 24 21:13:24 2024 +0800
[branch-2.1](cherry-pick) partial update should not read old fields from
rows with delete sign (#36210) (#36755)
cherry-pick #36210
---
be/src/olap/partial_update_info.h | 41 ++++++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 27 +---
.../rowset/segment_v2/vertical_segment_writer.cpp | 27 +---
be/src/olap/tablet.cpp | 58 +++++++-
be/src/olap/tablet.h | 4 +-
.../partial_update_parallel_with_delete_sign.csv | 5 +
.../test_partial_update_delete_sign.out | 82 ++++++++++-
...st_partial_update_delete_sign_with_conflict.out | 19 +++
.../test_partial_update_delete_sign.groovy | 117 ++++++++--------
...partial_update_delete_sign_with_conflict.groovy | 151 +++++++++++++++++++++
10 files changed, 415 insertions(+), 116 deletions(-)
diff --git a/be/src/olap/partial_update_info.h
b/be/src/olap/partial_update_info.h
index e08021b4f38..f20f9680b0b 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -28,6 +28,7 @@ struct PartialUpdateInfo {
const std::string& auto_increment_column) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
+
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
@@ -50,8 +51,45 @@ struct PartialUpdateInfo {
this->is_strict_mode = is_strict_mode;
is_input_columns_contains_auto_inc_column =
is_partial_update &&
partial_update_input_columns.contains(auto_increment_column);
+ _generate_default_values_for_missing_cids(tablet_schema);
+ }
+
+private:
+ void _generate_default_values_for_missing_cids(const TabletSchema&
tablet_schema) {
+ for (auto i = 0; i < missing_cids.size(); ++i) {
+ auto cur_cid = missing_cids[i];
+ const auto& column = tablet_schema.column(cur_cid);
+ if (column.has_default_value()) {
+ std::string default_value;
+ if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+ FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+
to_lower(tablet_schema.column(cur_cid).default_value())
+
.find(to_lower("CURRENT_TIMESTAMP")) !=
+ std::string::npos)) {
+ DateV2Value<DateTimeV2ValueType> dtv;
+ dtv.from_unixtime(timestamp_ms / 1000, timezone);
+ default_value = dtv.debug_string();
+ } else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+ FieldType::OLAP_FIELD_TYPE_DATEV2
&&
+
to_lower(tablet_schema.column(cur_cid).default_value())
+
.find(to_lower("CURRENT_DATE")) !=
+ std::string::npos)) {
+ DateV2Value<DateV2ValueType> dv;
+ dv.from_unixtime(timestamp_ms / 1000, timezone);
+ default_value = dv.debug_string();
+ } else {
+ default_value =
tablet_schema.column(cur_cid).default_value();
+ }
+ default_values.emplace_back(default_value);
+ } else {
+ // place an empty string here
+ default_values.emplace_back();
+ }
+ }
+ CHECK_EQ(missing_cids.size(), default_values.size());
}
+public:
bool is_partial_update {false};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
@@ -64,5 +102,8 @@ struct PartialUpdateInfo {
std::string timezone;
bool is_input_columns_contains_auto_inc_column = false;
bool is_schema_contains_auto_inc_column = false;
+
+ // default values for missing cids
+ std::vector<std::string> default_values;
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 7665aec1372..38b79f47f10 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -673,7 +673,7 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
const vectorized::Int8* delete_sign_column_data = nullptr;
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
old_value_block.try_get_by_name(DELETE_SIGN);
- delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+ delete_sign_column != nullptr) {
auto& delete_sign_col =
reinterpret_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column->column));
delete_sign_column_data = delete_sign_col.get_data().data();
@@ -683,29 +683,8 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
for (auto i = 0; i < cids_missing.size(); ++i) {
const auto& column = _tablet_schema->column(cids_missing[i]);
if (column.has_default_value()) {
- std::string default_value;
- if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
- FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-
to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-
.find(to_lower("CURRENT_TIMESTAMP")) !=
- std::string::npos)) {
- DateV2Value<DateTimeV2ValueType> dtv;
-
dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-
_opts.rowset_ctx->partial_update_info->timezone);
- default_value = dtv.debug_string();
- } else if (UNLIKELY(
-
_tablet_schema->column(cids_missing[i]).type() ==
- FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-
to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-
.find(to_lower("CURRENT_DATE")) !=
- std::string::npos)) {
- DateV2Value<DateV2ValueType> dv;
-
dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-
_opts.rowset_ctx->partial_update_info->timezone);
- default_value = dv.debug_string();
- } else {
- default_value =
_tablet_schema->column(cids_missing[i]).default_value();
- }
+ const auto& default_value =
+
_opts.rowset_ctx->partial_update_info->default_values[i];
vectorized::ReadBuffer
rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 15b3688585c..5eadac2abde 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -607,7 +607,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
const vectorized::Int8* delete_sign_column_data = nullptr;
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
old_value_block.try_get_by_name(DELETE_SIGN);
- delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+ delete_sign_column != nullptr) {
auto& delete_sign_col =
reinterpret_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column->column));
delete_sign_column_data = delete_sign_col.get_data().data();
@@ -617,29 +617,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& column = _tablet_schema->column(missing_cids[i]);
if (column.has_default_value()) {
- std::string default_value;
- if (UNLIKELY(_tablet_schema->column(missing_cids[i]).type() ==
- FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-
to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-
.find(to_lower("CURRENT_TIMESTAMP")) !=
- std::string::npos)) {
- DateV2Value<DateTimeV2ValueType> dtv;
-
dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-
_opts.rowset_ctx->partial_update_info->timezone);
- default_value = dtv.debug_string();
- } else if (UNLIKELY(
-
_tablet_schema->column(missing_cids[i]).type() ==
- FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-
to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-
.find(to_lower("CURRENT_DATE")) !=
- std::string::npos)) {
- DateV2Value<DateV2ValueType> dv;
-
dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-
_opts.rowset_ctx->partial_update_info->timezone);
- default_value = dv.debug_string();
- } else {
- default_value =
_tablet_schema->column(missing_cids[i]).default_value();
- }
+ const auto& default_value =
+
_opts.rowset_ctx->partial_update_info->default_values[i];
vectorized::ReadBuffer
rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 85706668100..27b1f94530d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3067,8 +3067,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr
rowset,
auto partial_update_info = rowset_writer->get_partial_update_info();
DCHECK(partial_update_info);
RETURN_IF_ERROR(generate_new_block_for_partial_update(
- rowset_schema, partial_update_info->missing_cids,
partial_update_info->update_cids,
- read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+ rowset_schema, partial_update_info.get(), read_plan_ori,
read_plan_update,
+ rsid_to_rowset, &block));
RETURN_IF_ERROR(sort_block(block, ordered_block));
RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
if (new_generated_rows != rowset_writer->num_rows()) {
@@ -3155,9 +3155,8 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
}
Status Tablet::generate_new_block_for_partial_update(
- TabletSchemaSPtr rowset_schema, const std::vector<uint32>&
missing_cids,
- const std::vector<uint32>& update_cids, const PartialUpdateReadPlan&
read_plan_ori,
- const PartialUpdateReadPlan& read_plan_update,
+ TabletSchemaSPtr rowset_schema, const PartialUpdateInfo*
partial_update_info,
+ const PartialUpdateReadPlan& read_plan_ori, const
PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block) {
// do partial update related works
@@ -3167,6 +3166,8 @@ Status Tablet::generate_new_block_for_partial_update(
// 4. mark current keys deleted
CHECK(output_block);
auto full_mutable_columns = output_block->mutate_columns();
+ const auto& missing_cids = partial_update_info->missing_cids;
+ const auto& update_cids = partial_update_info->update_cids;
auto old_block = rowset_schema->create_block_by_cids(missing_cids);
auto update_block = rowset_schema->create_block_by_cids(update_cids);
@@ -3178,10 +3179,57 @@ Status Tablet::generate_new_block_for_partial_update(
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids,
read_plan_update,
rsid_to_rowset, update_block,
&read_index_update));
+ const vectorized::Int8* delete_sign_column_data = nullptr;
+ if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+ old_block.try_get_by_name(DELETE_SIGN);
+ delete_sign_column != nullptr) {
+ auto& delete_sign_col =
+ reinterpret_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+ delete_sign_column_data = delete_sign_col.get_data().data();
+ }
+
+ // build default value block
+ auto default_value_block = old_block.clone_empty();
+ auto mutable_default_value_columns = default_value_block.mutate_columns();
+ if (delete_sign_column_data != nullptr) {
+ for (auto i = 0; i < missing_cids.size(); ++i) {
+ const auto& column = rowset_schema->column(missing_cids[i]);
+ if (column.has_default_value()) {
+ const auto& default_value =
partial_update_info->default_values[i];
+ vectorized::ReadBuffer
rb(const_cast<char*>(default_value.c_str()),
+ default_value.size());
+ RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
+ rb, mutable_default_value_columns[i].get()));
+ }
+ }
+ }
+
// build full block
CHECK(read_index_old.size() == read_index_update.size());
+
for (auto i = 0; i < missing_cids.size(); ++i) {
+ const auto& rs_column = rowset_schema->column(missing_cids[i]);
for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+ // if the conflict update is a delete sign, which means that the
key is
+ // not exist now, we should not read old values from the deleted
data,
+ // and should use default value instead.
+ // NOTE: since now we are in the publishing phase, all data is
committed
+ // before, even the `strict_mode` is true (which requires partial
update
+ // load job can't insert new keys), this "new" key MUST be written
into
+ // the new generated segment file.
+ if (delete_sign_column_data != nullptr &&
+ delete_sign_column_data[read_index_old[idx]] != 0) {
+ auto& mutable_column = full_mutable_columns[missing_cids[i]];
+ if (rs_column.has_default_value()) {
+
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
+ } else if (rs_column.is_nullable()) {
+
assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
+ ->insert_null_elements(1);
+ } else {
+ mutable_column->insert_default();
+ }
+ continue;
+ }
full_mutable_columns[missing_cids[i]]->insert_from(
*old_block.get_columns_with_type_and_name()[i].column.get(),
read_index_old[idx]);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 028567bb5a4..46bd6802495 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -484,8 +484,8 @@ public:
void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);
Status generate_new_block_for_partial_update(
- TabletSchemaSPtr rowset_schema, const std::vector<uint32>&
missing_cids,
- const std::vector<uint32>& update_cids, const
PartialUpdateReadPlan& read_plan_ori,
+ TabletSchemaSPtr rowset_schema, const PartialUpdateInfo*
partial_update_info,
+ const PartialUpdateReadPlan& read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block);
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
new file mode 100644
index 00000000000..62aa3a38c16
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
@@ -0,0 +1,5 @@
+1,10,1
+2,20,0
+3,30,1
+4,40,0
+5,50,1
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index 8d3e69bbe26..784dbd69536 100644
---
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -10,7 +10,7 @@
2 2 2 2 2
4 4 4 4 4
--- !with_delete_sign --
+-- !1 --
1 \N \N \N \N 1
1 1 1 1 1 0
2 2 2 2 2 0
@@ -21,12 +21,51 @@
5 5 5 5 5 0
6 \N \N \N \N 1
+-- !2 --
+1 \N \N \N \N 1
+2 2 2 2 2 0
+3 \N \N \N \N 1
+4 4 4 4 4 0
+5 \N \N \N \N 1
+6 \N \N \N \N 1
+
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+4 4 4 4 4
+5 5 5 5 5
+
+-- !after_delete --
+2 2 2 2 2
+4 4 4 4 4
+
+-- !1 --
+1 1 1 1 1 0
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
+-- !2 --
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
-- !1 --
1 1 1
-- !2 --
-- !3 --
+1 2 \N
-- !1 --
1 1 1 1
@@ -47,7 +86,7 @@
2 2 2 2 2
4 4 4 4 4
--- !with_delete_sign --
+-- !1 --
1 \N \N \N \N 1
1 1 1 1 1 0
2 2 2 2 2 0
@@ -58,12 +97,51 @@
5 5 5 5 5 0
6 \N \N \N \N 1
+-- !2 --
+1 \N \N \N \N 1
+2 2 2 2 2 0
+3 \N \N \N \N 1
+4 4 4 4 4 0
+5 \N \N \N \N 1
+6 \N \N \N \N 1
+
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+4 4 4 4 4
+5 5 5 5 5
+
+-- !after_delete --
+2 2 2 2 2
+4 4 4 4 4
+
+-- !1 --
+1 1 1 1 1 0
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
+-- !2 --
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
-- !1 --
1 1 1
-- !2 --
-- !3 --
+1 2 \N
-- !1 --
1 1 1 1
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
new file mode 100644
index 00000000000..faa4ca1d0bb
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+4 4 4 4 4
+5 5 5 5 5
+
+-- !sql --
+2 20 \N \N foo
+4 40 \N \N foo
+
+-- !sql --
+1 100 10 \N foo
+2 20 20 \N foo
+3 100 30 \N foo
+4 40 40 \N foo
+5 100 50 \N foo
+
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
index 1539293d9fd..f2d93d9d715 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
@@ -61,68 +61,67 @@ suite('test_partial_update_delete_sign') {
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
sql "sync"
- // // skip_delete_bitmap=true, skip_delete_sign=true
- // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
- // sql "set skip_delete_sign=true;"
- // sql "set skip_delete_bitmap=false;"
- // sql "sync"
- // // skip_delete_bitmap=false, skip_delete_sign=true
- // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
- qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__
from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+ // skip_delete_bitmap=true, skip_delete_sign=true
+ qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+ sql "set skip_delete_sign=true;"
+ sql "set skip_delete_bitmap=false;"
+ sql "sync"
+ // skip_delete_bitmap=false, skip_delete_sign=true
+ qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "drop table if exists ${tableName1};"
- // sql "set skip_delete_sign=false;"
- // sql "set skip_storage_engine_merge=false;"
- // sql "set skip_delete_bitmap=false;"
- // sql "sync"
- // def tableName2 = "test_partial_update_delete_sign2"
- // sql "DROP TABLE IF EXISTS ${tableName2};"
- // sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
- // `k1` int NOT NULL,
- // `c1` int,
- // `c2` int,
- // `c3` int,
- // `c4` int
- // )UNIQUE KEY(k1)
- // DISTRIBUTED BY HASH(k1) BUCKETS 1
- // PROPERTIES (
- // "enable_unique_key_merge_on_write" = "true",
- // "disable_auto_compaction" = "true",
- // "replication_num" = "1",
- // "function_column.sequence_col" = 'c4'
- // );"""
-
- // sql "insert into ${tableName2}
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
- // qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
- // streamLoad {
- // table "${tableName2}"
-
- // set 'column_separator', ','
- // set 'format', 'csv'
- // set 'partial_columns', 'true'
- // set 'columns', 'k1,__DORIS_DELETE_SIGN__'
-
- // file 'delete_sign.csv'
- // time 10000 // limit inflight 10s
- // }
- // sql "sync"
- // qt_after_delete "select * from ${tableName2} order by
k1,c1,c2,c3,c4;"
-
- // sql "set skip_delete_sign=true;"
- // sql "set skip_storage_engine_merge=true;"
- // sql "set skip_delete_bitmap=true;"
- // sql "sync"
- // // skip_delete_bitmap=true, skip_delete_sign=true
- // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
- // sql "set skip_delete_sign=true;"
- // sql "set skip_delete_bitmap=false;"
- // sql "sync"
- // // skip_delete_bitmap=false, skip_delete_sign=true
- // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
- // sql "drop table if exists ${tableName2};"
+ sql "set skip_delete_sign=false;"
+ sql "set skip_storage_engine_merge=false;"
+ sql "set skip_delete_bitmap=false;"
+ sql "sync"
+ def tableName2 = "test_partial_update_delete_sign2"
+ sql "DROP TABLE IF EXISTS ${tableName2};"
+ sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `c4` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1",
+ "function_column.sequence_col" = 'c4'
+ );"""
+
+ sql "insert into ${tableName2}
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+ qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
+ streamLoad {
+ table "${tableName2}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true' /* NOTE: it's a partial update */
+ set 'columns', 'k1,__DORIS_DELETE_SIGN__'
+
+ file 'delete_sign.csv'
+ time 10000 // limit inflight 10s
+ }
+ sql "sync"
+ qt_after_delete "select * from ${tableName2} order by
k1,c1,c2,c3,c4;"
+
+ sql "set skip_delete_sign=true;"
+ sql "set skip_storage_engine_merge=true;"
+ sql "set skip_delete_bitmap=true;"
+ sql "sync"
+ // skip_delete_bitmap=true, skip_delete_sign=true
+ qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+ sql "set skip_delete_sign=true;"
+ sql "set skip_delete_bitmap=false;"
+ sql "sync"
+ // skip_delete_bitmap=false, skip_delete_sign=true
+ qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+ sql "drop table if exists ${tableName2};"
// partial update a row that has been deleted by delete sign(table
without sequence column)
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
new file mode 100644
index 00000000000..7e2cd9cdfe3
--- /dev/null
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
@@ -0,0 +1,151 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_delete_sign_with_conflict") {
+ def dbName = context.config.getDbNameByFile(context.file)
+ def tableName = "test_partial_update_delete_sign_with_conflict"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int NOT NULL,
+ `c1` int default 100,
+ `c2` int,
+ `c3` int,
+ `c4` varchar(100) default 'foo'
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${tableName}
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+ sql "sync;"
+ qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+ // NOTE: use streamload 2pc to construct the conflict of publish
+ def do_streamload_2pc_commit = { txnId ->
+ def command = "curl -X PUT --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+ " -H txn_id:${txnId}" +
+ " -H txn_operation:commit" +
+ "
http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc"
+ log.info("http_stream execute 2pc: ${command}")
+
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ json2pc = parseJson(out)
+ log.info("http_stream 2pc result: ${out}".toString())
+ assertEquals(code, 0)
+ assertEquals("success", json2pc.status.toLowerCase())
+ }
+
+ def wait_for_publish = {txnId, waitSecond ->
+ String st = "PREPARE"
+ while (!st.equalsIgnoreCase("VISIBLE") &&
!st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+ Thread.sleep(1000)
+ waitSecond -= 1
+ def result = sql_return_maparray "show transaction from ${dbName}
where id = ${txnId}"
+ assertNotNull(result)
+ st = result[0].TransactionStatus
+ }
+ log.info("Stream load with txn ${txnId} is ${st}")
+ assertEquals(st, "VISIBLE")
+ }
+
+ // concurrent load 1
+ String txnId1
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'columns', 'k1,c1,label_c2'
+ set 'merge_type', 'MERGE'
+ set 'delete', 'label_c2=1'
+ set 'strict_mode', 'false'
+ set 'two_phase_commit', 'true'
+ file 'partial_update_parallel_with_delete_sign.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ txnId1 = json.TxnId
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+
+ String txnId2
+ // concurrent load 2
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'k1,c2'
+ set 'strict_mode', "false"
+ set 'two_phase_commit', 'true'
+ file 'partial_update_parallel3.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ txnId2 = json.TxnId
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ sql "sync;"
+
+ // complete load 1 first
+ do_streamload_2pc_commit(txnId1)
+ wait_for_publish(txnId1, 10)
+
+ sql "sync;"
+ qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+ // publish will retry until success
+ // FE retry may take longer time, wait for 20 secs
+ do_streamload_2pc_commit(txnId2)
+ wait_for_publish(txnId2, 20)
+
+ sql "sync;"
+ qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]