This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 69ec030cc5f [Fix](merge-on-write) Fix duplicate key problem after
adding sequence column for merge-on-write table (#39958)
69ec030cc5f is described below
commit 69ec030cc5f527f433747f1aa5fb8b6b9f70c044
Author: bobhan1 <[email protected]>
AuthorDate: Wed Aug 28 11:15:42 2024 +0800
[Fix](merge-on-write) Fix duplicate key problem after adding sequence
column for merge-on-write table (#39958)
## Proposed changes
Currently, `BaseTablet::lookup_row_key()` uses the tablet meta's schema to
decide whether a tablet has a sequence column. However, users can run `ALTER
TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ...` to add a hidden
sequence column to a MOW table. This is a light schema change that does
not update the BE's tablet meta, so `BaseTablet::lookup_row_key()` still
treats the tablet as having no sequence column, which can lead to duplicate keys.
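This PR uses the schema of the current load, which is the latest schema,
to decide whether a tablet has a sequence column, and corrects the lookup
procedure in `BaseTablet::lookup_row_key()` and
`Segment::lookup_row_key()`. Segments written before the sequence column was
added carry no sequence value in their primary keys, so `Segment::lookup_row_key()`
still uses the segment's own schema when slicing the stored key.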
branch-2.1-pick: https://github.com/apache/doris/pull/40010
branch-2.0-pick: https://github.com/apache/doris/pull/40015
---
be/src/olap/base_tablet.cpp | 26 ++++----
be/src/olap/base_tablet.h | 2 +-
be/src/olap/rowset/segment_v2/segment.cpp | 30 ++++-----
be/src/olap/rowset/segment_v2/segment.h | 4 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 5 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 5 +-
be/src/service/point_query_executor.cpp | 6 +-
.../test_mow_enable_sequence_col.out | 16 +++++
.../test_mow_enable_sequence_col.groovy | 72 ++++++++++++++++++++++
9 files changed, 130 insertions(+), 36 deletions(-)
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index e39bc965fbe..143c1ad706b 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -427,21 +427,22 @@ Status BaseTablet::lookup_row_data(const Slice&
encoded_key, const RowLocation&
return Status::OK();
}
-Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
+Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema*
latest_schema,
+ bool with_seq_col,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset, bool with_rowid) {
SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
size_t seq_col_length = 0;
- if (_tablet_meta->tablet_schema()->has_sequence_col() && with_seq_col) {
- seq_col_length = _tablet_meta->tablet_schema()
-
->column(_tablet_meta->tablet_schema()->sequence_col_idx())
- .length() +
- 1;
+ // use the latest tablet schema to decide if the tablet has sequence
column currently
+ const TabletSchema* schema =
+ (latest_schema == nullptr ? _tablet_meta->tablet_schema().get() :
latest_schema);
+ if (schema->has_sequence_col() && with_seq_col) {
+ seq_col_length = schema->column(schema->sequence_col_idx()).length() +
1;
}
size_t rowid_length = 0;
- if (with_rowid &&
!_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
+ if (with_rowid && !schema->cluster_key_idxes().empty()) {
rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
}
Slice key_without_seq =
@@ -457,7 +458,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
bool with_seq_col,
for (int i = num_segments - 1; i >= 0; i--) {
// If mow table has cluster keys, the key bounds is short keys,
not primary keys
// use PrimaryKeyIndexMetaPB in primary key index?
- if (_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
+ if (schema->cluster_key_idxes().empty()) {
if (key_without_seq.compare(segments_key_bounds[i].max_key())
> 0 ||
key_without_seq.compare(segments_key_bounds[i].min_key())
< 0) {
continue;
@@ -478,7 +479,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
bool with_seq_col,
DCHECK_EQ(segments.size(), num_segments);
for (auto id : picked_segments) {
- Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col,
with_rowid, &loc);
+ Status s = segments[id]->lookup_row_key(encoded_key, schema,
with_seq_col, with_rowid,
+ &loc);
if (s.is<KEY_NOT_FOUND>()) {
continue;
}
@@ -489,7 +491,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
bool with_seq_col,
{loc.rowset_id, loc.segment_id, version},
loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id
of
// all rowsets, util we find an existing key.
- if (_tablet_meta->tablet_schema()->has_sequence_col()) {
+ if (schema->has_sequence_col()) {
continue;
}
// The key is deleted, we don't need to search for it any more.
@@ -649,8 +651,8 @@ Status
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}
RowsetSharedPtr rowset_find;
- auto st = lookup_row_key(key, true, specified_rowsets, &loc,
dummy_version.first - 1,
- segment_caches, &rowset_find);
+ auto st = lookup_row_key(key, rowset_schema.get(), true,
specified_rowsets, &loc,
+ dummy_version.first - 1, segment_caches,
&rowset_find);
bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() ||
st.is<KEY_ALREADY_EXISTS>();
// It's a defensive DCHECK, we need to exclude some common errors
to avoid core-dump
// while stress test
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index ab289822df8..cfaf536902e 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -145,7 +145,7 @@ public:
// Lookup the row location of `encoded_key`, the function sets
`row_location` on success.
// NOTE: the method only works in unique key model with primary key index,
you will got a
// not supported error in other data model.
- Status lookup_row_key(const Slice& encoded_key, bool with_seq_col,
+ Status lookup_row_key(const Slice& encoded_key, TabletSchema*
latest_schema, bool with_seq_col,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>&
segment_caches,
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index db784afe977..9043e68e72b 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -796,14 +796,14 @@ Status Segment::new_inverted_index_iterator(const
TabletColumn& tablet_column,
return Status::OK();
}
-Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool
with_rowid,
- RowLocation* row_location) {
+Status Segment::lookup_row_key(const Slice& key, const TabletSchema*
latest_schema,
+ bool with_seq_col, bool with_rowid,
RowLocation* row_location) {
RETURN_IF_ERROR(load_pk_index_and_bf());
- bool has_seq_col = _tablet_schema->has_sequence_col();
- bool has_rowid = !_tablet_schema->cluster_key_idxes().empty();
+ bool has_seq_col = latest_schema->has_sequence_col();
+ bool has_rowid = !latest_schema->cluster_key_idxes().empty();
size_t seq_col_length = 0;
if (has_seq_col) {
- seq_col_length =
_tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
+ seq_col_length =
latest_schema->column(latest_schema->sequence_col_idx()).length() + 1;
}
size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0;
@@ -839,16 +839,20 @@ Status Segment::lookup_row_key(const Slice& key, bool
with_seq_col, bool with_ro
Slice sought_key = Slice(index_column->get_data_at(0).data,
index_column->get_data_at(0).size);
+ // user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..."
to add a hidden sequence column
+ // for a merge-on-write table which doesn't have sequence column, so
`has_seq_col == true` doesn't mean
+ // data in segment has sequence column value
+ bool segment_has_seq_col = _tablet_schema->has_sequence_col();
+ Slice sought_key_without_seq = Slice(
+ sought_key.get_data(),
+ sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0)
- rowid_length);
if (has_seq_col) {
- Slice sought_key_without_seq =
- Slice(sought_key.get_data(), sought_key.get_size() -
seq_col_length - rowid_length);
-
// compare key
if (key_without_seq.compare(sought_key_without_seq) != 0) {
return Status::Error<ErrorCode::KEY_NOT_FOUND>("Can't find key in
the segment");
}
- if (with_seq_col) {
+ if (with_seq_col && segment_has_seq_col) {
// compare sequence id
Slice sequence_id =
Slice(key.get_data() + key_without_seq.get_size() + 1,
seq_col_length - 1);
@@ -870,11 +874,9 @@ Status Segment::lookup_row_key(const Slice& key, bool
with_seq_col, bool with_ro
}
// found the key, use rowid in pk index if necessary.
if (has_rowid) {
- Slice sought_key_without_seq =
- Slice(sought_key.get_data(), sought_key.get_size() -
seq_col_length - rowid_length);
- Slice rowid_slice = Slice(
- sought_key.get_data() + sought_key_without_seq.get_size() +
seq_col_length + 1,
- rowid_length - 1);
+ Slice rowid_slice = Slice(sought_key.get_data() +
sought_key_without_seq.get_size() +
+ (segment_has_seq_col ?
seq_col_length : 0) + 1,
+ rowid_length - 1);
const auto* type_info =
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
const auto* rowid_coder = get_key_coder(type_info->type());
RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice,
rowid_length,
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index 9ae3797d054..d6600018822 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -128,8 +128,8 @@ public:
return _pk_index_reader.get();
}
- Status lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid,
- RowLocation* row_location);
+ Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
bool with_seq_col,
+ bool with_rowid, RowLocation* row_location);
Status read_key_by_rowid(uint32_t row_id, std::string* key);
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 2c94942bac0..105433d2689 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -584,8 +584,9 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
- auto st = _tablet->lookup_row_key(key, have_input_seq_column,
specified_rowsets, &loc,
- _mow_context->max_version,
segment_caches, &rowset);
+ auto st = _tablet->lookup_row_key(key, _tablet_schema.get(),
have_input_seq_column,
+ specified_rowsets, &loc,
_mow_context->max_version,
+ segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 891fd8c6a10..64f72bc0c46 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -444,8 +444,9 @@ Status
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
- auto st = _tablet->lookup_row_key(key, have_input_seq_column,
specified_rowsets, &loc,
- _mow_context->max_version,
segment_caches, &rowset);
+ auto st = _tablet->lookup_row_key(key, _tablet_schema.get(),
have_input_seq_column,
+ specified_rowsets, &loc,
_mow_context->max_version,
+ segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index da79726f389..88293ba9b03 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -383,9 +383,9 @@ Status PointQueryExecutor::_lookup_row_key() {
}
// Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this
ptr
auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
- st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, false,
specified_rowsets,
- &location, INT32_MAX /*rethink?*/,
segment_caches,
- rowset_ptr.get(), false));
+ st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr,
false,
+ specified_rowsets, &location, INT32_MAX
/*rethink?*/,
+ segment_caches, rowset_ptr.get(),
false));
if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
continue;
}
diff --git
a/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out
b/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out
new file mode 100644
index 00000000000..d99510cfbcb
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+111 aaa bbb 11
+222 bbb bbb 11
+333 ccc ddd 11
+
+-- !sql --
+111 aaa bbb 11 \N 0 2
+222 bbb bbb 11 \N 0 3
+333 ccc ddd 11 \N 0 4
+
+-- !sql --
+111 zzz yyy 100 99 0 5
+222 xxx www 400 99 0 8
+333 ccc ddd 11 \N 0 4
+
diff --git
a/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy
b/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy
new file mode 100644
index 00000000000..2cfb8133fd6
--- /dev/null
+++
b/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy
@@ -0,0 +1,72 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_enable_sequence_col") {
+
+ def tableName = "test_mow_enable_sequence_col"
+ sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+ sql """CREATE TABLE IF NOT EXISTS ${tableName}
+ (`user_id` BIGINT NOT NULL,
+ `username` VARCHAR(50) NOT NULL,
+ `city` VARCHAR(20),
+ `age` SMALLINT)
+ UNIQUE KEY(`user_id`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = true,
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true");"""
+
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`)
VALUES(111,'aaa','bbb',11);"""
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`)
VALUES(222,'bbb','bbb',11);"""
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`)
VALUES(333,'ccc','ddd',11);"""
+ order_qt_sql "select * from ${tableName};"
+
+ sql "set show_hidden_columns = true;"
+ sql "sync;"
+ def res = sql "desc ${tableName} all;"
+ assertTrue(!res.toString().contains("__DORIS_SEQUENCE_COL__"))
+ sql "set show_hidden_columns = false;"
+ sql "sync;"
+
+ def doSchemaChange = { cmd ->
+ sql cmd
+ waitForSchemaChangeDone {
+ sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}'
ORDER BY createtime DESC LIMIT 1"""
+ time 2000
+ }
+ }
+ doSchemaChange """ALTER TABLE ${tableName} ENABLE FEATURE "SEQUENCE_LOAD"
WITH PROPERTIES ("function_column.sequence_type" = "bigint");"""
+
+ sql "set show_hidden_columns = true;"
+ sql "sync;"
+ res = sql "desc ${tableName} all;"
+ assertTrue(res.toString().contains("__DORIS_SEQUENCE_COL__"))
+ order_qt_sql "select * from ${tableName};"
+ sql "set show_hidden_columns = false;"
+ sql "sync;"
+
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`,
`__DORIS_SEQUENCE_COL__`) VALUES(111,'zzz','yyy',100,99);"""
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`,
`__DORIS_SEQUENCE_COL__`) VALUES(111,'hhh','mmm',200,88);"""
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`,
`__DORIS_SEQUENCE_COL__`) VALUES(222,'qqq','ppp',300,77);"""
+ sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`,
`__DORIS_SEQUENCE_COL__`) VALUES(222,'xxx','www',400,99);"""
+
+ sql "set show_hidden_columns = true;"
+ sql "sync;"
+ order_qt_sql "select * from ${tableName};"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]