This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 74ed6e85013 [Fix](partial update) Fix rowset not found error when doing partial update #34112 (#34357)
74ed6e85013 is described below
commit 74ed6e8501302c5824f0a00b3be8e1ae2b196cb9
Author: abmdocrt <[email protected]>
AuthorDate: Tue Apr 30 20:04:24 2024 +0800
[Fix](partial update) Fix rowset not found error when doing partial update #34112 (#34357)
---
be/src/olap/cumulative_compaction_policy.cpp | 2 +
be/src/olap/delta_writer.cpp | 5 +-
be/src/olap/delta_writer.h | 1 +
be/src/olap/olap_common.h | 10 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 21 +---
be/src/olap/tablet.cpp | 10 +-
be/src/olap/tablet.h | 4 +-
..._update_rowset_not_found_fault_injection.groovy | 112 +++++++++++++++++++++
8 files changed, 132 insertions(+), 33 deletions(-)
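
The gist of the change: DeltaWriter::init() now resolves the rowset pointers once, via Tablet::get_rowset_by_ids(), and carries them in MowContext as rowset_ptrs, so the flush-phase partial-update path in SegmentWriter reuses those pinned rowsets instead of looking the ids up again after a concurrent cumulative compaction may have moved them to the stale map (the "rowset not found" error). A simplified, self-contained sketch of that pattern follows; the Rowset, Tablet, and MowContext types below are illustrative stand-ins, not the actual Doris classes.

#include <cstdint>
#include <memory>
#include <unordered_map>
#include <vector>

// Illustrative stand-ins; the real classes live under be/src/olap/.
struct Rowset {
    int64_t id;
};
using RowsetSharedPtr = std::shared_ptr<Rowset>;

struct MowContext {
    // After the fix the context carries the resolved pointers, not only the ids.
    std::vector<int64_t> rowset_ids;
    std::vector<RowsetSharedPtr> rowset_ptrs;
};

struct Tablet {
    std::unordered_map<int64_t, RowsetSharedPtr> rs_version_map;

    // Mirrors the idea of Tablet::get_rowset_by_ids(): only currently visible rowsets
    // are returned, so a rowset that compaction already moved to the stale map goes missing.
    std::vector<RowsetSharedPtr> get_rowset_by_ids(const std::vector<int64_t>& ids) const {
        std::vector<RowsetSharedPtr> out;
        for (int64_t id : ids) {
            if (auto it = rs_version_map.find(id); it != rs_version_map.end()) {
                out.push_back(it->second);
            }
        }
        return out;
    }
};

// DeltaWriter::init()-style setup: resolve the pointers up front, while the ids are
// still guaranteed to be visible, and keep the shared_ptrs alive inside the context.
MowContext make_mow_context(const Tablet& tablet, std::vector<int64_t> ids) {
    MowContext ctx;
    ctx.rowset_ids = std::move(ids);
    ctx.rowset_ptrs = tablet.get_rowset_by_ids(ctx.rowset_ids);
    return ctx;
}

// Flush-phase consumer (cf. SegmentWriter::append_block_with_partial_content): it reads
// ctx.rowset_ptrs instead of calling get_rowset_by_ids() again, so a compaction running
// in between can no longer hide the rowsets it needs.
bool all_specified_rowsets_present(const MowContext& ctx) {
    return ctx.rowset_ptrs.size() == ctx.rowset_ids.size();
}
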
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index 8587b29f512..2f47826b7f0 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         transient_size += 1;
         input_rowsets->push_back(rowset);
     }
+    DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
+                    { return transient_size; })
     if (total_size >= promotion_size) {
         return transient_size;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 35d2a13905c..594bc7b630c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -175,6 +175,7 @@ Status DeltaWriter::init() {
         } else {
             RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, &_rowset_ids));
         }
+        _rowset_ptrs = _tablet->get_rowset_by_ids(&_rowset_ids);
     }
 
     // check tablet version number
@@ -217,7 +218,7 @@ Status DeltaWriter::init() {
     context.tablet = _tablet;
     context.write_type = DataWriteType::TYPE_DIRECT;
     context.mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids,
-                                                        _delete_bitmap);
+                                                        _rowset_ptrs, _delete_bitmap);
     context.partial_update_info = _partial_update_info;
     RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
@@ -363,7 +364,7 @@ void DeltaWriter::_reset_mem_table() {
         _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
     auto mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids,
-                                                    _delete_bitmap);
+                                                    _rowset_ptrs, _delete_bitmap);
     _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
                                   _req.tuple_desc, _rowset_writer.get(), mow_context,
                                   _partial_update_info.get(), mem_table_insert_tracker,
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 7d81d6344a2..ede5ca1f03b 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -197,6 +197,7 @@ private:
     std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
     // current rowset_ids, used to do diff in publish_version
     RowsetIdUnorderedSet _rowset_ids;
+    std::vector<RowsetSharedPtr> _rowset_ptrs;
     // current max version, used to calculate delete bitmap
     int64_t _cur_max_version;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 58b8ded5f81..a8696b5c3b5 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -39,6 +39,8 @@
 namespace doris {
 
+class Rowset;
+
 static constexpr int64_t MAX_ROWSET_ID = 1L << 56;
 static constexpr int64_t LOW_56_BITS = 0x00ffffffffffffff;
@@ -470,11 +472,17 @@ class DeleteBitmap;
 // merge on write context
 struct MowContext {
     MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
+               const std::vector<std::shared_ptr<Rowset>>& rowset_ptrs,
                std::shared_ptr<DeleteBitmap> db)
-            : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {}
+            : max_version(version),
+              txn_id(txnid),
+              rowset_ids(ids),
+              rowset_ptrs(rowset_ptrs),
+              delete_bitmap(db) {}
     int64_t max_version;
     int64_t txn_id;
     const RowsetIdUnorderedSet& rowset_ids;
+    std::vector<std::shared_ptr<Rowset>> rowset_ptrs;
     std::shared_ptr<DeleteBitmap> delete_bitmap;
 };
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 5d131c25537..b696a88602d 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -402,25 +402,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     std::vector<RowsetSharedPtr> specified_rowsets;
     {
         std::shared_lock rlock(_tablet->get_header_lock());
-        // Under normal circumstances, `get_rowset_by_ids` does not need to consider the stale
-        // rowset, in other word, if a rowset id is not found in the normal rowset, we can ignore
-        // it. This is because even if we handle stale rowset here, we need to recalculate the
-        // new rowset generated by the corresponding compaction in the publish phase.
-        // However, for partial update, ignoring the stale rowset may cause some keys to not be
-        // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), and thus be mistaken
-        // as new keys in the flush phase, which will cause the load to fail in the following
-        // two cases:
-        // 1. when strict_mode is enabled, new keys are not allowed to be added.
-        // 2. Some columns that need to be filled are neither nullable nor have a default value,
-        //    in which case the value of the field cannot be filled as a new key, leading to a
-        //    failure of the load.
-        bool should_include_stale =
-                _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-                !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
-        specified_rowsets =
-                _tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        specified_rowsets = _mow_context->rowset_ptrs;
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
             LOG(WARNING) << fmt::format(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index d1d6fa19066..51811c2d22d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3266,7 +3266,7 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
 }
 
 std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
-        const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) {
+        const RowsetIdUnorderedSet* specified_rowset_ids) {
     std::vector<RowsetSharedPtr> rowsets;
     for (auto& rs : _rs_version_map) {
         if (!specified_rowset_ids ||
@@ -3274,14 +3274,6 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
             rowsets.push_back(rs.second);
         }
     }
-    if (include_stale && specified_rowset_ids != nullptr &&
-        rowsets.size() != specified_rowset_ids->size()) {
-        for (auto& rs : _stale_rs_version_map) {
-            if (specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) {
-                rowsets.push_back(rs.second);
-            }
-        }
-    }
     std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) {
         return lhs->end_version() > rhs->end_version();
     });
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 29d7209b906..775bfa9262b 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -469,8 +469,8 @@ public:
                              DeleteBitmapPtr delete_bitmap, int64_t version,
                              CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);
-    std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet* specified_rowset_ids,
-                                                   bool include_stale = false);
+    std::vector<RowsetSharedPtr> get_rowset_by_ids(
+            const RowsetIdUnorderedSet* specified_rowset_ids);
 
     Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                                       const segment_v2::SegmentSharedPtr& seg,
diff --git a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
new file mode 100644
index 00000000000..befad64da0a
--- /dev/null
+++ b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partial_update_rowset_not_found_fault_injection",
"p2,nonConcurrent") {
+ def testTable = "test_partial_update_rowset_not_found_fault_injection"
+ sql """ DROP TABLE IF EXISTS ${testTable}"""
+ sql """
+ create table ${testTable}
+ (
+ `k1` INT,
+ `v1` INT NOT NULL,
+ `v2` INT NOT NULL,
+ `v3` INT NOT NULL,
+ `v4` INT NOT NULL,
+ `v5` INT NOT NULL,
+ `v6` INT NOT NULL,
+ `v7` INT NOT NULL,
+ `v8` INT NOT NULL,
+ `v9` INT NOT NULL,
+ `v10` INT NOT NULL
+ )
+ UNIQUE KEY (`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ def load_data = {
+ streamLoad {
+ table 'test_partial_update_rowset_not_found_fault_injection'
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+
+
+ file
"""${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz"""
+
+ time 300000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string:[:]]
+
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ load_data()
+ def error = false
+
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+
GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+ def thread = Thread.start{
+ try {
+ sql """update ${testTable} set v10=1"""
+ }
+ catch (Exception e){
+ logger.info(e.getMessage())
+ error = true
+ }
+ }
+
+ Thread.sleep(2000)
+ // trigger compactions for all tablets in ${tableName}
+ def tablets = sql_return_maparray """ show tablets from ${testTable};
"""
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ }
+
+ thread.join()
+ assertFalse(error)
+ } catch (Exception e){
+ logger.info(e.getMessage())
+ assertFalse(true)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+
GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]