This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 6ec9a731e81 [branch-2.1](cherry-pick) partial update should not read old fields from rows with delete sign (#36210) (#36755)
6ec9a731e81 is described below

commit 6ec9a731e81fc7487092ca54066b658b8c04cd58
Author: zhannngchen <[email protected]>
AuthorDate: Mon Jun 24 21:13:24 2024 +0800

    [branch-2.1](cherry-pick) partial update should not read old fields from rows with delete sign (#36210) (#36755)
    
    cherry-pick #36210
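
    For context, the fix can be summarized as follows: when the old row that a
    conflicting partial update refers to carries a non-zero delete sign, the
    missing columns are no longer back-filled from that (logically deleted) row;
    they fall back to the column default, NULL, or the type default, as the
    tablet.cpp hunk below does. A minimal standalone sketch of that decision
    rule, using hypothetical simplified types rather than the actual Doris
    classes:

    #include <optional>
    #include <string>

    // Hypothetical, simplified stand-in for the real Doris column metadata.
    struct ColumnMeta {
        bool has_default = false;
        bool nullable = false;
        std::string default_value; // already materialized (CURRENT_TIMESTAMP/CURRENT_DATE resolved)
    };

    // Value chosen for one missing column of one conflicting row.
    // std::nullopt means "write SQL NULL"; an empty string means "use the type default".
    std::optional<std::string> fill_missing_cell(const ColumnMeta& col, bool old_row_deleted,
                                                 const std::string& old_value) {
        if (!old_row_deleted) {
            return old_value;         // unchanged path for live rows: reuse the old value
        }
        if (col.has_default) {
            return col.default_value; // the fix: treat the key as newly inserted, use its default
        }
        if (col.nullable) {
            return std::nullopt;      // nullable column without a default -> NULL
        }
        return std::string();         // otherwise the column type's default value
    }
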
---
 be/src/olap/partial_update_info.h                  |  41 ++++++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  27 +---
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  27 +---
 be/src/olap/tablet.cpp                             |  58 +++++++-
 be/src/olap/tablet.h                               |   4 +-
 .../partial_update_parallel_with_delete_sign.csv   |   5 +
 .../test_partial_update_delete_sign.out            |  82 ++++++++++-
 ...st_partial_update_delete_sign_with_conflict.out |  19 +++
 .../test_partial_update_delete_sign.groovy         | 117 ++++++++--------
 ...partial_update_delete_sign_with_conflict.groovy | 151 +++++++++++++++++++++
 10 files changed, 415 insertions(+), 116 deletions(-)

diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h
index e08021b4f38..f20f9680b0b 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -28,6 +28,7 @@ struct PartialUpdateInfo {
               const std::string& auto_increment_column) {
         is_partial_update = partial_update;
         partial_update_input_columns = partial_update_cols;
+
         this->timestamp_ms = timestamp_ms;
         this->timezone = timezone;
         missing_cids.clear();
@@ -50,8 +51,45 @@ struct PartialUpdateInfo {
         this->is_strict_mode = is_strict_mode;
         is_input_columns_contains_auto_inc_column =
                 is_partial_update && partial_update_input_columns.contains(auto_increment_column);
+        _generate_default_values_for_missing_cids(tablet_schema);
+    }
+
+private:
+    void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            auto cur_cid = missing_cids[i];
+            const auto& column = tablet_schema.column(cur_cid);
+            if (column.has_default_value()) {
+                std::string default_value;
+                if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+                             to_lower(tablet_schema.column(cur_cid).default_value())
+                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
+                                     std::string::npos)) {
+                    DateV2Value<DateTimeV2ValueType> dtv;
+                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dtv.debug_string();
+                } else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                            FieldType::OLAP_FIELD_TYPE_DATEV2 &&
+                                    to_lower(tablet_schema.column(cur_cid).default_value())
+                                                    .find(to_lower("CURRENT_DATE")) !=
+                                            std::string::npos)) {
+                    DateV2Value<DateV2ValueType> dv;
+                    dv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dv.debug_string();
+                } else {
+                    default_value = tablet_schema.column(cur_cid).default_value();
+                }
+                default_values.emplace_back(default_value);
+            } else {
+                // place an empty string here
+                default_values.emplace_back();
+            }
+        }
+        CHECK_EQ(missing_cids.size(), default_values.size());
     }
 
+public:
     bool is_partial_update {false};
     std::set<std::string> partial_update_input_columns;
     std::vector<uint32_t> missing_cids;
@@ -64,5 +102,8 @@ struct PartialUpdateInfo {
     std::string timezone;
     bool is_input_columns_contains_auto_inc_column = false;
     bool is_schema_contains_auto_inc_column = false;
+
+    // default values for missing cids
+    std::vector<std::string> default_values;
 };
 } // namespace doris
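
    The default_values vector added above is filled in the same order as missing_cids
    and always ends up the same length (columns without a declared default contribute
    an empty string), so the segment writers below can index it by position instead of
    re-deriving CURRENT_TIMESTAMP/CURRENT_DATE defaults for every row. A rough
    consumer-side sketch of that invariant, with hypothetical names:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    // Hypothetical consumer mirroring how the writers use PartialUpdateInfo:
    // default_values[i] belongs to the column id missing_cids[i].
    void apply_missing_column_defaults(const std::vector<uint32_t>& missing_cids,
                                       const std::vector<std::string>& default_values) {
        assert(missing_cids.size() == default_values.size()); // guaranteed by the CHECK_EQ above
        for (std::size_t i = 0; i < missing_cids.size(); ++i) {
            const std::string& value = default_values[i]; // indexed by position, not by column id
            // An empty string means the column had no declared default; the real code then
            // falls back to NULL or the type default instead of parsing this value.
            (void)value; // placeholder: the writers parse it via ReadBuffer + type->from_string(...)
        }
    }
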
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 7665aec1372..38b79f47f10 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -673,7 +673,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                 reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -683,29 +683,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
         for (auto i = 0; i < cids_missing.size(); ++i) {
             const auto& column = _tablet_schema->column(cids_missing[i]);
             if (column.has_default_value()) {
-                std::string default_value;
-                if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
-                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                             to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                     std::string::npos)) {
-                    DateV2Value<DateTimeV2ValueType> dtv;
-                    dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                      _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dtv.debug_string();
-                } else if (UNLIKELY(
-                                   _tablet_schema->column(cids_missing[i]).type() ==
-                                           FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-                                   to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                                   .find(to_lower("CURRENT_DATE")) !=
-                                           std::string::npos)) {
-                    DateV2Value<DateV2ValueType> dv;
-                    dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                     _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dv.debug_string();
-                } else {
-                    default_value = _tablet_schema->column(cids_missing[i]).default_value();
-                }
+                const auto& default_value =
+                        _opts.rowset_ctx->partial_update_info->default_values[i];
                 vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                           default_value.size());
                 RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 15b3688585c..5eadac2abde 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -607,7 +607,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                 reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -617,29 +617,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
         for (auto i = 0; i < missing_cids.size(); ++i) {
             const auto& column = _tablet_schema->column(missing_cids[i]);
             if (column.has_default_value()) {
-                std::string default_value;
-                if (UNLIKELY(_tablet_schema->column(missing_cids[i]).type() ==
-                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                             to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                     std::string::npos)) {
-                    DateV2Value<DateTimeV2ValueType> dtv;
-                    dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                      _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dtv.debug_string();
-                } else if (UNLIKELY(
-                                   _tablet_schema->column(missing_cids[i]).type() ==
-                                           FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-                                   to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-                                                   .find(to_lower("CURRENT_DATE")) !=
-                                           std::string::npos)) {
-                    DateV2Value<DateV2ValueType> dv;
-                    dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                     _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dv.debug_string();
-                } else {
-                    default_value = _tablet_schema->column(missing_cids[i]).default_value();
-                }
+                const auto& default_value =
+                        _opts.rowset_ctx->partial_update_info->default_values[i];
                 vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                           default_value.size());
                 RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 85706668100..27b1f94530d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3067,8 +3067,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
         auto partial_update_info = rowset_writer->get_partial_update_info();
         DCHECK(partial_update_info);
         RETURN_IF_ERROR(generate_new_block_for_partial_update(
-                rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids,
-                read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+                rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
+                rsid_to_rowset, &block));
         RETURN_IF_ERROR(sort_block(block, ordered_block));
         RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
         if (new_generated_rows != rowset_writer->num_rows()) {
@@ -3155,9 +3155,8 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
 }
 
 Status Tablet::generate_new_block_for_partial_update(
-        TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
-        const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
-        const PartialUpdateReadPlan& read_plan_update,
+        TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+        const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
         vectorized::Block* output_block) {
     // do partial update related works
@@ -3167,6 +3166,8 @@ Status Tablet::generate_new_block_for_partial_update(
     // 4. mark current keys deleted
     CHECK(output_block);
     auto full_mutable_columns = output_block->mutate_columns();
+    const auto& missing_cids = partial_update_info->missing_cids;
+    const auto& update_cids = partial_update_info->update_cids;
     auto old_block = rowset_schema->create_block_by_cids(missing_cids);
     auto update_block = rowset_schema->create_block_by_cids(update_cids);
 
@@ -3178,10 +3179,57 @@ Status Tablet::generate_new_block_for_partial_update(
     RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
                                          rsid_to_rowset, update_block, &read_index_update));
 
+    const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                old_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr) {
+        auto& delete_sign_col =
+                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        delete_sign_column_data = delete_sign_col.get_data().data();
+    }
+
+    // build default value block
+    auto default_value_block = old_block.clone_empty();
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+    if (delete_sign_column_data != nullptr) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            const auto& column = rowset_schema->column(missing_cids[i]);
+            if (column.has_default_value()) {
+                const auto& default_value = partial_update_info->default_values[i];
+                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+                                          default_value.size());
+                RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
+                        rb, mutable_default_value_columns[i].get()));
+            }
+        }
+    }
+
     // build full block
     CHECK(read_index_old.size() == read_index_update.size());
+
     for (auto i = 0; i < missing_cids.size(); ++i) {
+        const auto& rs_column = rowset_schema->column(missing_cids[i]);
         for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+            // If the conflicting update is a delete sign, the key does not exist
+            // anymore, so we should not read old values from the deleted data
+            // and should use the default value instead.
+            // NOTE: since we are now in the publish phase, all data was committed
+            // before; even if `strict_mode` is true (which forbids a partial update
+            // load job from inserting new keys), this "new" key MUST be written
+            // into the newly generated segment file.
+            if (delete_sign_column_data != nullptr &&
+                delete_sign_column_data[read_index_old[idx]] != 0) {
+                auto& mutable_column = full_mutable_columns[missing_cids[i]];
+                if (rs_column.has_default_value()) {
+                    mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
+                } else if (rs_column.is_nullable()) {
+                    assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
+                            ->insert_null_elements(1);
+                } else {
+                    mutable_column->insert_default();
+                }
+                continue;
+            }
             full_mutable_columns[missing_cids[i]]->insert_from(
                     *old_block.get_columns_with_type_and_name()[i].column.get(),
                     read_index_old[idx]);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 028567bb5a4..46bd6802495 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -484,8 +484,8 @@ public:
     void prepare_to_read(const RowLocation& row_location, size_t pos,
                          PartialUpdateReadPlan* read_plan);
     Status generate_new_block_for_partial_update(
-            TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
-            const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
+            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+            const PartialUpdateReadPlan& read_plan_ori,
             const PartialUpdateReadPlan& read_plan_update,
             const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
             vectorized::Block* output_block);
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
new file mode 100644
index 00000000000..62aa3a38c16
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
@@ -0,0 +1,5 @@
+1,10,1
+2,20,0
+3,30,1
+4,40,0
+5,50,1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index 8d3e69bbe26..784dbd69536 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -10,7 +10,7 @@
 2      2       2       2       2
 4      4       4       4       4
 
--- !with_delete_sign --
+-- !1 --
 1      \N      \N      \N      \N      1
 1      1       1       1       1       0
 2      2       2       2       2       0
@@ -21,12 +21,51 @@
 5      5       5       5       5       0
 6      \N      \N      \N      \N      1
 
+-- !2 --
+1      \N      \N      \N      \N      1
+2      2       2       2       2       0
+3      \N      \N      \N      \N      1
+4      4       4       4       4       0
+5      \N      \N      \N      \N      1
+6      \N      \N      \N      \N      1
+
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+4      4       4       4       4
+5      5       5       5       5
+
+-- !after_delete --
+2      2       2       2       2
+4      4       4       4       4
+
+-- !1 --
+1      1       1       1       1       0
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
+-- !2 --
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
 -- !1 --
 1      1       1
 
 -- !2 --
 
 -- !3 --
+1      2       \N
 
 -- !1 --
 1      1       1       1
@@ -47,7 +86,7 @@
 2      2       2       2       2
 4      4       4       4       4
 
--- !with_delete_sign --
+-- !1 --
 1      \N      \N      \N      \N      1
 1      1       1       1       1       0
 2      2       2       2       2       0
@@ -58,12 +97,51 @@
 5      5       5       5       5       0
 6      \N      \N      \N      \N      1
 
+-- !2 --
+1      \N      \N      \N      \N      1
+2      2       2       2       2       0
+3      \N      \N      \N      \N      1
+4      4       4       4       4       0
+5      \N      \N      \N      \N      1
+6      \N      \N      \N      \N      1
+
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+4      4       4       4       4
+5      5       5       5       5
+
+-- !after_delete --
+2      2       2       2       2
+4      4       4       4       4
+
+-- !1 --
+1      1       1       1       1       0
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
+-- !2 --
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
 -- !1 --
 1      1       1
 
 -- !2 --
 
 -- !3 --
+1      2       \N
 
 -- !1 --
 1      1       1       1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
new file mode 100644
index 00000000000..faa4ca1d0bb
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+4      4       4       4       4
+5      5       5       5       5
+
+-- !sql --
+2      20      \N      \N      foo
+4      40      \N      \N      foo
+
+-- !sql --
+1      100     10      \N      foo
+2      20      20      \N      foo
+3      100     30      \N      foo
+4      40      40      \N      foo
+5      100     50      \N      foo
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
index 1539293d9fd..f2d93d9d715 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
@@ -61,68 +61,67 @@ suite('test_partial_update_delete_sign') {
             sql "set skip_storage_engine_merge=true;"
             sql "set skip_delete_bitmap=true;"
             sql "sync"
-            // // skip_delete_bitmap=true, skip_delete_sign=true
-            // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
-            // sql "set skip_delete_sign=true;"
-            // sql "set skip_delete_bitmap=false;"
-            // sql "sync"
-            // // skip_delete_bitmap=false, skip_delete_sign=true
-            // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-            qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+            // skip_delete_bitmap=true, skip_delete_sign=true
+            qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+            sql "set skip_delete_sign=true;"
+            sql "set skip_delete_bitmap=false;"
+            sql "sync"
+            // skip_delete_bitmap=false, skip_delete_sign=true
+            qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
             sql "drop table if exists ${tableName1};"
 
 
-            // sql "set skip_delete_sign=false;"
-            // sql "set skip_storage_engine_merge=false;"
-            // sql "set skip_delete_bitmap=false;"
-            // sql "sync"
-            // def tableName2 = "test_partial_update_delete_sign2"
-            // sql "DROP TABLE IF EXISTS ${tableName2};"
-            // sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
-            //         `k1` int NOT NULL,
-            //         `c1` int,
-            //         `c2` int,
-            //         `c3` int,
-            //         `c4` int
-            //         )UNIQUE KEY(k1)
-            //     DISTRIBUTED BY HASH(k1) BUCKETS 1
-            //     PROPERTIES (
-            //         "enable_unique_key_merge_on_write" = "true",
-            //         "disable_auto_compaction" = "true",
-            //         "replication_num" = "1",
-            //         "function_column.sequence_col" = 'c4'
-            //     );"""
-
-            // sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
-            // qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
-            // streamLoad {
-            //     table "${tableName2}"
-
-            //     set 'column_separator', ','
-            //     set 'format', 'csv'
-            //     set 'partial_columns', 'true'
-            //     set 'columns', 'k1,__DORIS_DELETE_SIGN__'
-
-            //     file 'delete_sign.csv'
-            //     time 10000 // limit inflight 10s
-            // }
-            // sql "sync"
-            // qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
-
-            // sql "set skip_delete_sign=true;"
-            // sql "set skip_storage_engine_merge=true;"
-            // sql "set skip_delete_bitmap=true;"
-            // sql "sync"
-            // // skip_delete_bitmap=true, skip_delete_sign=true
-            // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
-            // sql "set skip_delete_sign=true;"
-            // sql "set skip_delete_bitmap=false;"
-            // sql "sync"
-            // // skip_delete_bitmap=false, skip_delete_sign=true
-            // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-            // sql "drop table if exists ${tableName2};"
+            sql "set skip_delete_sign=false;"
+            sql "set skip_storage_engine_merge=false;"
+            sql "set skip_delete_bitmap=false;"
+            sql "sync"
+            def tableName2 = "test_partial_update_delete_sign2"
+            sql "DROP TABLE IF EXISTS ${tableName2};"
+            sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
+                    `k1` int NOT NULL,
+                    `c1` int,
+                    `c2` int,
+                    `c3` int,
+                    `c4` int
+                    )UNIQUE KEY(k1)
+                DISTRIBUTED BY HASH(k1) BUCKETS 1
+                PROPERTIES (
+                    "enable_unique_key_merge_on_write" = "true",
+                    "disable_auto_compaction" = "true",
+                    "replication_num" = "1",
+                    "function_column.sequence_col" = 'c4'
+                );"""
+
+            sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+            qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
+            streamLoad {
+                table "${tableName2}"
+
+                set 'column_separator', ','
+                set 'format', 'csv'
+                set 'partial_columns', 'true' /* NOTE: it's a partial update */
+                set 'columns', 'k1,__DORIS_DELETE_SIGN__'
+
+                file 'delete_sign.csv'
+                time 10000 // limit inflight 10s
+            }
+            sql "sync"
+            qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
+
+            sql "set skip_delete_sign=true;"
+            sql "set skip_storage_engine_merge=true;"
+            sql "set skip_delete_bitmap=true;"
+            sql "sync"
+            // skip_delete_bitmap=true, skip_delete_sign=true
+            qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+            sql "set skip_delete_sign=true;"
+            sql "set skip_delete_bitmap=false;"
+            sql "sync"
+            // skip_delete_bitmap=false, skip_delete_sign=true
+            qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+            sql "drop table if exists ${tableName2};"
 
 
             // partial update a row that has been deleted by delete sign(table without sequence column)
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
new file mode 100644
index 00000000000..7e2cd9cdfe3
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
@@ -0,0 +1,151 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_delete_sign_with_conflict") {
+    def dbName = context.config.getDbNameByFile(context.file)
+    def tableName = "test_partial_update_delete_sign_with_conflict"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int NOT NULL,
+            `c1` int default 100,
+            `c2` int,
+            `c3` int,
+            `c4` varchar(100) default 'foo'
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${tableName} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // NOTE: use streamload 2pc to construct the conflict of publish
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(code, 0)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = {txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals(st, "VISIBLE")
+    }
+
+    // concurrent load 1
+    String txnId1
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'k1,c1,label_c2'
+        set 'merge_type', 'MERGE'
+        set 'delete', 'label_c2=1'
+        set 'strict_mode', 'false'
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel_with_delete_sign.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId1 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    String txnId2
+    // concurrent load 2
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,c2'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel3.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId2 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+    sql "sync;"
+
+    // complete load 1 first
+    do_streamload_2pc_commit(txnId1)
+    wait_for_publish(txnId1, 10)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // publish will retry until success
+    // FE retry may take longer, wait for 20 secs
+    do_streamload_2pc_commit(txnId2)
+    wait_for_publish(txnId2, 20)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
