This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 39473cdf481 [performance](load) add vertical segment writer (#24403)
39473cdf481 is described below
commit 39473cdf4819202a9641a1143dba529158d911ef
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 14 11:53:09 2023 +0800
[performance](load) add vertical segment writer (#24403)
---
be/src/olap/rowset/segment_creator.cpp | 90 +-
be/src/olap/rowset/segment_creator.h | 8 +
be/src/olap/rowset/segment_v2/column_writer.cpp | 17 +-
be/src/olap/rowset/segment_v2/column_writer.h | 20 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 1008 ++++++++++++++++++++
.../rowset/segment_v2/vertical_segment_writer.h | 196 ++++
be/src/vec/olap/olap_data_convertor.cpp | 6 +-
7 files changed, 1311 insertions(+), 34 deletions(-)
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index 15002350f7f..f40d899bdce 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -30,6 +30,7 @@
#include "io/fs/file_writer.h"
#include "olap/rowset/beta_rowset_writer.h" // SegmentStatistics
#include "olap/rowset/segment_v2/segment_writer.h"
+#include "olap/rowset/segment_v2/vertical_segment_writer.h"
#include "vec/core/block.h"
namespace doris {
@@ -49,7 +50,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
if (block->rows() == 0) {
return Status::OK();
}
- std::unique_ptr<segment_v2::SegmentWriter> writer;
+ std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
     bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024;
     RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
@@ -73,10 +74,16 @@ Status SegmentFlusher::close() {
 Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
                                  const vectorized::Block* block, size_t row_offset,
                                  size_t row_num) {
- auto s = segment_writer->append_block(block, row_offset, row_num);
- if (UNLIKELY(!s.ok())) {
-        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block: {}", s.to_string());
- }
+ RETURN_IF_ERROR(segment_writer->append_block(block, row_offset, row_num));
+ _num_rows_written += row_num;
+ return Status::OK();
+}
+
+Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
+                                 const vectorized::Block* block, size_t row_offset,
+                                 size_t row_num) {
+ RETURN_IF_ERROR(segment_writer->batch_block(block, row_offset, row_num));
+ RETURN_IF_ERROR(segment_writer->write_batch());
_num_rows_written += row_num;
return Status::OK();
}
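The new overload above replaces the single `append_block` call with a two-phase protocol: `batch_block` only records a {block, row_pos, num_rows} reference, and `write_batch` performs the actual column-by-column write. A minimal usage sketch (hypothetical caller, mirroring `flush_single_block` above; the caller keeps ownership of `block` until `write_batch()` returns):

    Status flush_one_block(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
                           const vectorized::Block* block) {
        // Phase 1: register the rows to be written; nothing is flushed yet.
        RETURN_IF_ERROR(writer->batch_block(block, /*row_pos=*/0, /*num_rows=*/block->rows()));
        // Phase 2: write all batched blocks, one column at a time.
        return writer->write_batch();
    }
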
@@ -112,6 +119,79 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
return Status::OK();
}
+Status SegmentFlusher::_create_segment_writer(
+        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
+        bool no_compression, TabletSchemaSPtr flush_schema) {
+    io::FileWriterPtr file_writer;
+    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer));
+
+    segment_v2::VerticalSegmentWriterOptions writer_options;
+    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
+    writer_options.rowset_ctx = &_context;
+    writer_options.write_type = _context.write_type;
+    if (no_compression) {
+        writer_options.compression_type = NO_COMPRESSION;
+    }
+
+    const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema;
+    writer.reset(new segment_v2::VerticalSegmentWriter(
+            file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir,
+            _context.max_rows_per_segment, writer_options, _context.mow_context));
+    {
+        std::lock_guard<SpinLock> l(_lock);
+        _file_writers.push_back(std::move(file_writer));
+    }
+    auto s = writer->init();
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init segment writer: " << s.to_string();
+        writer.reset();
+        return s;
+    }
+    return Status::OK();
+}
+
+Status SegmentFlusher::_flush_segment_writer(
+        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
+    uint32_t row_num = writer->num_rows_written();
+    _num_rows_filtered += writer->num_rows_filtered();
+
+    if (row_num == 0) {
+        return Status::OK();
+    }
+    uint64_t segment_size;
+    uint64_t index_size;
+    Status s = writer->finalize(&segment_size, &index_size);
+    if (!s.ok()) {
+        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
+    }
+    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
+               << " flushing filename: " << writer->data_dir_path()
+               << " rowset_id:" << _context.rowset_id;
+
+    KeyBoundsPB key_bounds;
+    Slice min_key = writer->min_encoded_key();
+    Slice max_key = writer->max_encoded_key();
+    DCHECK_LE(min_key.compare(max_key), 0);
+    key_bounds.set_min_key(min_key.to_string());
+    key_bounds.set_max_key(max_key.to_string());
+
+    uint32_t segment_id = writer->segment_id();
+    SegmentStatistics segstat;
+    segstat.row_num = row_num;
+    segstat.data_size = segment_size + writer->inverted_index_file_size();
+    segstat.index_size = index_size + writer->inverted_index_file_size();
+    segstat.key_bounds = key_bounds;
+
+    writer.reset();
+
+    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
+
+    if (flush_size) {
+        *flush_size = segment_size + index_size;
+    }
+    return Status::OK();
+}
+
+
 Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                              int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h
index 750aa148724..cf5456a28aa 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -35,6 +35,7 @@ class Block;
namespace segment_v2 {
class SegmentWriter;
+class VerticalSegmentWriter;
} // namespace segment_v2
struct SegmentStatistics;
@@ -127,11 +128,18 @@ public:
private:
     Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
                      const vectorized::Block* block, size_t row_offset, size_t row_num);
+    Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
+                     const vectorized::Block* block, size_t row_offset, size_t row_num);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                   int32_t segment_id, bool no_compression = false,
                                   TabletSchemaSPtr flush_schema = nullptr);
+    Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
+                                  int32_t segment_id, bool no_compression = false,
+                                  TabletSchemaSPtr flush_schema = nullptr);
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                  int64_t* flush_size = nullptr);
+    Status _flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
+                                 int64_t* flush_size = nullptr);
private:
RowsetWriterContext _context;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 3891c1235e5..bacf6fa1ce0 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -446,12 +446,7 @@ ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts,
ScalarColumnWriter::~ScalarColumnWriter() {
// delete all pages
- Page* page = _pages.head;
- while (page != nullptr) {
- Page* next_page = page->next;
- delete page;
- page = next_page;
- }
+ _pages.clear();
}
Status ScalarColumnWriter::init() {
@@ -601,11 +596,10 @@ Status ScalarColumnWriter::finish() {
}
Status ScalarColumnWriter::write_data() {
- Page* page = _pages.head;
- while (page != nullptr) {
- RETURN_IF_ERROR(_write_data_page(page));
- page = page->next;
+ for (auto& page : _pages) {
+ RETURN_IF_ERROR(_write_data_page(page.get()));
}
+ _pages.clear();
// write column dict
if (_encoding_info->encoding() == DICT_ENCODING) {
OwnedSlice dict_body;
@@ -622,6 +616,7 @@ Status ScalarColumnWriter::write_data() {
{dict_body.slice()}, footer, &dict_pp));
dict_pp.to_proto(_opts.meta->mutable_dict_page());
}
+ _page_builder.reset();
return Status::OK();
}
@@ -731,7 +726,7 @@ Status ScalarColumnWriter::finish_current_page() {
page->data.emplace_back(std::move(compressed_body));
}
- _push_back_page(page.release());
+ _push_back_page(std::move(page));
_first_rowid = _next_rowid;
return Status::OK();
}
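The hunks above drop the hand-rolled singly linked list of pages in favor of `std::vector<std::unique_ptr<Page>>`, so the destructor and `write_data()` lose their manual `delete`/`next` walks. A self-contained sketch of the ownership pattern (simplified names, not the actual Doris types):

    #include <memory>
    #include <vector>

    struct Page {
        std::vector<char> data; // simplified stand-in for the OwnedSlice payloads
    };

    class PageStore {
    public:
        // Ownership moves into the vector; no manual delete is needed anywhere.
        void push_back(std::unique_ptr<Page> page) { _pages.emplace_back(std::move(page)); }
        // Range-for replaces the old `while (page != nullptr) page = page->next;` walk.
        size_t total_bytes() const {
            size_t n = 0;
            for (const auto& page : _pages) {
                n += page->data.size();
            }
            return n;
        }
        ~PageStore() = default; // each unique_ptr frees its page automatically
    private:
        std::vector<std::unique_ptr<Page>> _pages;
    };
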
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index 1bc0afb972b..67cefc3c9ce 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -230,28 +230,16 @@ private:
// use vector for easier management for lifetime of OwnedSlice
std::vector<OwnedSlice> data;
PageFooterPB footer;
- Page* next = nullptr;
};
- struct PageHead {
- Page* head = nullptr;
- Page* tail = nullptr;
- };
-
- void _push_back_page(Page* page) {
- // add page to pages' tail
- if (_pages.tail != nullptr) {
- _pages.tail->next = page;
- }
- _pages.tail = page;
- if (_pages.head == nullptr) {
- _pages.head = page;
- }
+ void _push_back_page(std::unique_ptr<Page> page) {
for (auto& data_slice : page->data) {
_data_size += data_slice.slice().size;
}
// estimate (page footer + footer size + checksum) took 20 bytes
_data_size += 20;
+ // add page to pages' tail
+ _pages.emplace_back(std::move(page));
}
Status _write_data_page(Page* page);
@@ -262,7 +250,7 @@ private:
uint64_t _data_size;
// cached generated pages,
- PageHead _pages;
+ std::vector<std::unique_ptr<Page>> _pages;
ordinal_t _first_rowid = 0;
BlockCompressionCodec* _compress_codec;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
new file mode 100644
index 00000000000..760ef3d6c9c
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -0,0 +1,1008 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/vertical_segment_writer.h"
+
+#include <gen_cpp/segment_v2.pb.h>
+#include <parallel_hashmap/phmap.h>
+
+#include <algorithm>
+#include <cassert>
+#include <ostream>
+#include <unordered_map>
+#include <utility>
+
+#include "cloud/config.h"
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
+#include "common/logging.h" // LOG
+#include "gutil/port.h"
+#include "io/fs/file_writer.h"
+#include "olap/data_dir.h"
+#include "olap/key_coder.h"
+#include "olap/olap_common.h"
+#include "olap/primary_key_index.h"
+#include "olap/row_cursor.h" // RowCursor // IWYU pragma:
keep
+#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
+#include "olap/rowset/segment_v2/page_io.h"
+#include "olap/rowset/segment_v2/page_pointer.h"
+#include "olap/segment_loader.h"
+#include "olap/short_key_index.h"
+#include "olap/tablet_schema.h"
+#include "olap/utils.h"
+#include "runtime/memory/mem_tracker.h"
+#include "service/point_query_executor.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/faststring.h"
+#include "util/key_util.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/schema_util.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/types.h"
+#include "vec/io/reader_buffer.h"
+#include "vec/jsonb/serialize.h"
+#include "vec/olap/olap_data_convertor.h"
+
+namespace doris {
+namespace segment_v2 {
+
+using namespace ErrorCode;
+
+static const char* k_segment_magic = "D0R1";
+static const uint32_t k_segment_magic_length = 4;
+
+VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
+                                             TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet,
+                                             DataDir* data_dir, uint32_t max_row_per_segment,
+                                             const VerticalSegmentWriterOptions& opts,
+                                             std::shared_ptr<MowContext> mow_context)
+        : _segment_id(segment_id),
+          _tablet_schema(std::move(tablet_schema)),
+          _tablet(std::move(tablet)),
+          _data_dir(data_dir),
+          _opts(opts),
+          _file_writer(file_writer),
+          _mem_tracker(std::make_unique<MemTracker>("VerticalSegmentWriter:Segment-" +
+                                                    std::to_string(segment_id))),
+          _mow_context(std::move(mow_context)) {
+    CHECK_NOTNULL(file_writer);
+    _num_key_columns = _tablet_schema->num_key_columns();
+    _num_short_key_columns = _tablet_schema->num_short_key_columns();
+    DCHECK(_num_key_columns >= _num_short_key_columns);
+    for (size_t cid = 0; cid < _num_key_columns; ++cid) {
+        const auto& column = _tablet_schema->column(cid);
+        _key_coders.push_back(get_key_coder(column.type()));
+        _key_index_size.push_back(column.index_length());
+    }
+    // encode the sequence id into the primary key index
+    if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS &&
+        _opts.enable_unique_key_merge_on_write) {
+        const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
+        _seq_coder = get_key_coder(column.type());
+    }
+}
+
+VerticalSegmentWriter::~VerticalSegmentWriter() {
+ _mem_tracker->release(_mem_tracker->consumption());
+}
+
+void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
+                                              const TabletColumn& column) {
+ meta->set_column_id(column_id);
+ meta->set_unique_id(column.unique_id());
+ meta->set_type(int(column.type()));
+ meta->set_length(column.length());
+ meta->set_encoding(DEFAULT_ENCODING);
+ meta->set_compression(_opts.compression_type);
+ meta->set_is_nullable(column.is_nullable());
+ for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
+        _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
+ }
+}
+
+Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column) {
+ ColumnWriterOptions opts;
+ opts.meta = _footer.add_columns();
+
+ _init_column_meta(opts.meta, cid, column);
+
+    // now we create zone maps for key columns in AGG_KEYS, or for all columns in
+    // UNIQUE_KEYS or DUP_KEYS, and do not support zone maps for array and jsonb types.
+    opts.need_zone_map = (column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS) &&
+                         column.type() != FieldType::OLAP_FIELD_TYPE_OBJECT;
+    opts.need_bloom_filter = column.is_bf_column();
+    auto* tablet_index = _tablet_schema->get_ngram_bf_index(column.unique_id());
+ if (tablet_index) {
+ opts.need_bloom_filter = true;
+ opts.is_ngram_bf_index = true;
+ opts.gram_size = tablet_index->get_gram_size();
+ opts.gram_bf_size = tablet_index->get_gram_bf_size();
+ }
+
+ opts.need_bitmap_index = column.has_bitmap_index();
+ bool skip_inverted_index = false;
+ if (_opts.rowset_ctx != nullptr) {
+ // skip write inverted index for index compaction
+        skip_inverted_index =
+                _opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
+    }
+    // skip write inverted index on load if skip_write_index_on_load is true
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
+        _tablet_schema->skip_write_index_on_load()) {
+ skip_inverted_index = true;
+ }
+ // indexes for this column
+ opts.indexes = _tablet_schema->get_indexes_for_column(column.unique_id());
+ for (auto index : opts.indexes) {
+        if (!skip_inverted_index && index && index->index_type() == IndexType::INVERTED) {
+ opts.inverted_index = index;
+ // TODO support multiple inverted index
+ break;
+ }
+ }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for struct type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for struct type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for array type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for array type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_JSONB) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for jsonb type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for jsonb type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_AGG_STATE) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for agg_state type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for agg_state type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_MAP) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for map type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for map type");
+        }
+    }
+
+ if (column.is_row_store_column()) {
+ // smaller page size for row store column
+ opts.data_page_size = config::row_column_page_size;
+ }
+
+ std::unique_ptr<ColumnWriter> writer;
+    RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
+ RETURN_IF_ERROR(writer->init());
+ _column_writers.push_back(std::move(writer));
+
+ _olap_data_convertor->add_column_data_convertor(column);
+ return Status::OK();
+};
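`_create_column_writer` enables a zone map only where it is meaningful and rejects bloom filters and bitmap indexes for complex types. A hypothetical condensation of the checks above (not part of the patch; `FieldType` values as referenced in it):

    // Zone maps are disabled for complex types and for OLAP_FIELD_TYPE_OBJECT;
    // the same switch could drive the NotSupported checks seen above.
    bool type_supports_zone_map(FieldType t) {
        switch (t) {
        case FieldType::OLAP_FIELD_TYPE_STRUCT:
        case FieldType::OLAP_FIELD_TYPE_ARRAY:
        case FieldType::OLAP_FIELD_TYPE_JSONB:
        case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
        case FieldType::OLAP_FIELD_TYPE_MAP:
        case FieldType::OLAP_FIELD_TYPE_OBJECT:
            return false;
        default:
            return true;
        }
    }
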
+
+Status VerticalSegmentWriter::init() {
+ DCHECK(_column_writers.empty());
+ if (_opts.compression_type == UNKNOWN_COMPRESSION) {
+ _opts.compression_type = _tablet_schema->compression_type();
+ }
+    _olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
+    _olap_data_convertor->reserve(_tablet_schema->num_columns());
+    _column_writers.reserve(_tablet_schema->columns().size());
+    // we don't need the short key index for unique key merge on write table.
+    if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
+        size_t seq_col_length = 0;
+        if (_tablet_schema->has_sequence_col()) {
+            seq_col_length =
+                    _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
+        }
+        _primary_key_index_builder.reset(new PrimaryKeyIndexBuilder(_file_writer, seq_col_length));
+        RETURN_IF_ERROR(_primary_key_index_builder->init());
+    } else {
+        _short_key_index_builder.reset(
+                new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
+    }
+ return Status::OK();
+}
+
+void VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) const {
+    // Just invalidate the row cache for simplicity, since the rowset is not visible at present.
+    // If we updated/inserted the cache instead and the load failed, the rowset would not be
+    // visible but the cached data would be, leading to inconsistency.
+    if (!config::disable_storage_row_cache && _tablet_schema->store_row_column() &&
+        _opts.write_type == DataWriteType::TYPE_DIRECT) {
+        // invalidate cache
+        RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
+    }
+}
+
+void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
+    if (block.rows() == 0) {
+        return;
+    }
+    MonotonicStopWatch watch;
+    watch.start();
+    // find row column id
+    int row_column_id = 0;
+    for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
+        if (_tablet_schema->column(i).is_row_store_column()) {
+            row_column_id = i;
+            break;
+        }
+    }
+    if (row_column_id == 0) {
+        return;
+    }
+    auto* row_store_column =
+            static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
+                                                           .column->assume_mutable_ref()
+                                                           .assume_mutable()
+                                                           .get());
+    row_store_column->clear();
+    vectorized::DataTypeSerDeSPtrs serdes =
+            vectorized::create_data_type_serdes(block.get_data_types());
+    vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
+                                                   _tablet_schema->num_columns(), serdes);
+    VLOG_DEBUG << "serialize, num_rows:" << block.rows() << ", row_column_id:" << row_column_id
+               << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us):"
+               << watch.elapsed_time() / 1000;
+}
+
+// for partial update, we should take the following steps to fill the content of the block:
+// 1. set block data to the data convertor, and get all key columns' converted slices
+// 2. get the pk of the input block, and read the missing columns
+//    2.1 first find the key location {rowset_id, segment_id, row_id}
+//    2.2 build a read plan to read by batch
+//    2.3 fill the block
+// 3. set columns to the data convertor and then write all columns
+Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data) {
+ if (config::cloud_mode) {
+ // TODO(plat1ko)
+ return Status::NotSupported("append_block_with_partial_content");
+ }
+    DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
+ DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
+
+ auto tablet = static_cast<Tablet*>(_tablet.get());
+ // create full block and fill with input columns
+ auto full_block = _tablet_schema->create_block();
+ const auto& including_cids =
_opts.rowset_ctx->partial_update_info->update_cids;
+ size_t input_id = 0;
+ for (auto i : including_cids) {
+ full_block.replace_by_position(i,
data.block->get_by_position(input_id++).column);
+ }
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
data.row_pos,
+
data.num_rows, including_cids);
+
+ bool have_input_seq_column = false;
+ // write including columns
+ std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+ vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+ size_t segment_start_pos;
+    for (auto cid : including_cids) {
+        // capture the segment column's row count before appending data
+        segment_start_pos = _column_writers[cid]->get_next_rowid();
+        // the olap data convertor always starts from id = 0
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
+        }
+        if (cid < _num_key_columns) {
+            key_columns.push_back(column);
+        } else if (_tablet_schema->has_sequence_col() &&
+                   cid == _tablet_schema->sequence_col_idx()) {
+            seq_column = column;
+            have_input_seq_column = true;
+        }
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
+                                                     data.num_rows));
+    }
+
+ bool has_default_or_nullable = false;
+ std::vector<bool> use_default_or_null_flag;
+ use_default_or_null_flag.reserve(data.num_rows);
+ const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                full_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr) {
+        auto& delete_sign_col =
+                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        if (delete_sign_col.size() >= data.row_pos + data.num_rows) {
+            delete_sign_column_data = delete_sign_col.get_data().data();
+        }
+    }
+
+ std::vector<RowsetSharedPtr> specified_rowsets;
+ {
+ std::shared_lock rlock(tablet->get_header_lock());
+        specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+    }
+    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
+ // locate rows in base data
+
+ int64_t num_rows_filtered = 0;
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
+ // block segment
+ // 2 -> 0
+ // 3 -> 1
+ // 4 -> 2
+ // 5 -> 3
+ // here row_pos = 2, num_rows = 4.
+ size_t delta_pos = block_pos - data.row_pos;
+ size_t segment_pos = segment_start_pos + delta_pos;
+ std::string key = _full_encode_keys(key_columns, delta_pos);
+ if (have_input_seq_column) {
+ _encode_seq_column(seq_column, delta_pos, &key);
+ }
+        // If the table has a sequence column and the including cids don't contain it,
+        // we need to update the primary key index builder at the end of this method,
+        // when we have a valid sequence column to encode into the key.
+        if (!_tablet_schema->has_sequence_col() || have_input_seq_column) {
+            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+        }
+ _maybe_invalid_row_cache(key);
+
+ // mark key with delete sign as deleted.
+        bool have_delete_sign =
+                (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);
+
+ RowLocation loc;
+        // save the rowset shared ptr so the rowset won't be deleted
+        RowsetSharedPtr rowset;
+        auto st = tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
+                                         _mow_context->max_version, segment_caches, &rowset);
+        if (st.is<KEY_NOT_FOUND>()) {
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                ++num_rows_filtered;
+                // delete the invalid newly inserted row
+                _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
+                                                  DeleteBitmap::TEMP_VERSION_COMMON},
+                                                 segment_pos);
+            }
+
+            if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) {
+                return Status::InternalError(
+                        "the unmentioned columns should have default value or be nullable for "
+                        "newly inserted rows in non-strict mode partial update");
+            }
+ has_default_or_nullable = true;
+ use_default_or_null_flag.emplace_back(true);
+ continue;
+ }
+ if (!st.ok() && !st.is<KEY_ALREADY_EXISTS>()) {
+ LOG(WARNING) << "failed to lookup row key, error: " << st;
+ return st;
+ }
+
+        // If the delete sign is marked, the value columns of the row will not be read,
+        // so we don't need to read the missing values from the previous row.
+        // But we still need to mark the previous row in the delete bitmap.
+ if (have_delete_sign) {
+ has_default_or_nullable = true;
+ use_default_or_null_flag.emplace_back(true);
+ } else {
+ // partial update should not contain invisible columns
+ use_default_or_null_flag.emplace_back(false);
+ _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+ tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
+ }
+
+        if (st.is<KEY_ALREADY_EXISTS>()) {
+            // although we need to mark delete for the current row, we still need to read the
+            // missing columns for it, to ensure that each column stays aligned
+            _mow_context->delete_bitmap->add(
+                    {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
+                    segment_pos);
+        } else {
+            _mow_context->delete_bitmap->add(
+                    {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
+                    loc.row_id);
+ }
+ }
+ CHECK(use_default_or_null_flag.size() == data.num_rows);
+
+ if (config::enable_merge_on_write_correctness_check) {
+        tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
+                                                   _mow_context->rowset_ids);
+ }
+
+ // read and fill block
+ auto mutable_full_columns = full_block.mutate_columns();
+    RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
+                                          has_default_or_nullable, segment_start_pos));
+ // row column should be filled here
+ if (_tablet_schema->store_row_column()) {
+ // convert block to row store format
+ _serialize_block_to_row_column(full_block);
+ }
+
+ // convert missing columns and send to column writer
+ const auto& missing_cids =
_opts.rowset_ctx->partial_update_info->missing_cids;
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
data.row_pos,
+
data.num_rows, missing_cids);
+ for (auto cid : missing_cids) {
+ auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+ if (!status.ok()) {
+ return status;
+ }
+        if (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
+            cid == _tablet_schema->sequence_col_idx()) {
+            DCHECK_EQ(seq_column, nullptr);
+            seq_column = column;
+        }
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
+                                                     data.num_rows));
+ }
+
+ _num_rows_filtered += num_rows_filtered;
+    if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
+        DCHECK_NE(seq_column, nullptr);
+        DCHECK_EQ(_num_rows_written, data.row_pos)
+                << "_num_rows_written: " << _num_rows_written << ", row_pos" << data.row_pos;
+        DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+                << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
+                << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
+        if (_num_rows_written != data.row_pos ||
+            _primary_key_index_builder->num_rows() != _num_rows_written) {
+            return Status::InternalError(
+                    "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
+                    "index builder num rows: {}",
+                    _num_rows_written, data.row_pos, _primary_key_index_builder->num_rows());
+        }
+        for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows;
+             block_pos++) {
+            std::string key = _full_encode_keys(key_columns, block_pos - data.row_pos);
+            _encode_seq_column(seq_column, block_pos - data.row_pos, &key);
+            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+        }
+    }
+
+    _num_rows_written += data.num_rows;
+    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
+            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
+ _olap_data_convertor->clear_source_content();
+ return Status::OK();
+}
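For each input row, `_append_block_with_partial_content` decides between reading old values and filling defaults based on the key lookup result and the delete sign. A condensed sketch of that per-row decision (illustrative names only; `st` is the `lookup_row_key` status, assuming `using namespace ErrorCode;` as in the file):

    enum class RowPlan { UseDefaultOrNull, ReadOldRow };

    RowPlan plan_row(const Status& st, bool have_delete_sign) {
        if (st.is<KEY_NOT_FOUND>()) {
            // newly inserted row: missing columns come from defaults or NULL
            // (and in strict mode the row is additionally filtered out)
            return RowPlan::UseDefaultOrNull;
        }
        if (have_delete_sign) {
            // the row is being deleted: its value columns will never be read,
            // but the old row location still gets marked in the delete bitmap
            return RowPlan::UseDefaultOrNull;
        }
        // normal update: remember {rowset, segment, row} and batch-read old values
        return RowPlan::ReadOldRow;
    }
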
+
+Status VerticalSegmentWriter::_fill_missing_columns(
+        vectorized::MutableColumns& mutable_full_columns,
+        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
+        const size_t& segment_start_pos) {
+ if (config::cloud_mode) [[unlikely]] {
+ return Status::NotSupported("fill_missing_columns");
+ }
+ auto tablet = static_cast<Tablet*>(_tablet.get());
+ // create old value columns
+ const auto& missing_cids =
_opts.rowset_ctx->partial_update_info->missing_cids;
+ auto old_value_block = _tablet_schema->create_block_by_cids(missing_cids);
+ CHECK(missing_cids.size() == old_value_block.columns());
+ auto mutable_old_columns = old_value_block.mutate_columns();
+ bool has_row_column = _tablet_schema->store_row_column();
+    // record the real position: key is the input row number, value is the old_value_block row number
+ std::map<uint32_t, uint32_t> read_index;
+ size_t read_idx = 0;
+ for (auto rs_it : _rssid_to_rid) {
+ for (auto seg_it : rs_it.second) {
+ auto rowset = _rsid_to_rowset[rs_it.first];
+ CHECK(rowset);
+ std::vector<uint32_t> rids;
+ for (auto id_and_pos : seg_it.second) {
+ rids.emplace_back(id_and_pos.rid);
+ read_index[id_and_pos.pos] = read_idx++;
+ }
+ if (has_row_column) {
+                auto st = tablet->fetch_value_through_row_column(
+                        rowset, *_tablet_schema, seg_it.first, rids, missing_cids,
+                        old_value_block);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to fetch value through row column";
+ return st;
+ }
+ continue;
+ }
+            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
+                TabletColumn tablet_column = _tablet_schema->column(missing_cids[cid]);
+                auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
+                                                        mutable_old_columns[cid]);
+ // set read value to output block
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to fetch value by rowids";
+ return st;
+ }
+ }
+ }
+ }
+ // build default value columns
+ auto default_value_block = old_value_block.clone_empty();
+ auto mutable_default_value_columns = default_value_block.mutate_columns();
+
+ const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                old_value_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        auto& delete_sign_col =
+                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        delete_sign_column_data = delete_sign_col.get_data().data();
+ }
+
+ if (has_default_or_nullable || delete_sign_column_data != nullptr) {
+ for (auto i = 0; i < missing_cids.size(); ++i) {
+ const auto& column = _tablet_schema->column(missing_cids[i]);
+            if (column.has_default_value()) {
+                auto default_value = _tablet_schema->column(missing_cids[i]).default_value();
+                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+                                          default_value.size());
+                RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
+                        rb, mutable_default_value_columns[i].get()));
+            }
+ }
+ }
+
+    // fill all missing values from mutable_old_columns; need to consider default values and nulls
+    for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
+        // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from
+        // the old row for the missing columns. For example, if a table has a sequence column,
+        // rows with the DELETE_SIGN column marked will not be marked in the delete bitmap
+        // (see https://github.com/apache/doris/pull/24011), so such a row will be found by
+        // Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we
+        // should not read values from old rows for the missing values in this case. So we
+        // read the DELETE_SIGN column to check whether a row REALLY exists in the table.
+        if (use_default_or_null_flag[idx] ||
+            (delete_sign_column_data != nullptr &&
+             delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) {
+            for (auto i = 0; i < missing_cids.size(); ++i) {
+                // if the column has a default value, fill it with the default value;
+                // otherwise, if the column is nullable, fill it with null
+                const auto& tablet_column = _tablet_schema->column(missing_cids[i]);
+                if (tablet_column.has_default_value()) {
+                    mutable_full_columns[missing_cids[i]]->insert_from(
+                            *mutable_default_value_columns[i].get(), 0);
+                } else if (tablet_column.is_nullable()) {
+                    auto nullable_column = assert_cast<vectorized::ColumnNullable*>(
+                            mutable_full_columns[missing_cids[i]].get());
+                    nullable_column->insert_null_elements(1);
+                } else {
+                    // If the control flow reaches this branch, the column neither has a default
+                    // value nor is nullable. That means the row's delete sign is marked and the
+                    // value columns are useless and won't be read, so we can put arbitrary
+                    // values in the cells
+                    mutable_full_columns[missing_cids[i]]->insert_default();
+                }
+ }
+ }
+ continue;
+ }
+ auto pos_in_old_block = read_index[idx + segment_start_pos];
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            mutable_full_columns[missing_cids[i]]->insert_from(
+                    *old_value_block.get_columns_with_type_and_name()[i].column.get(),
+                    pos_in_old_block);
+ }
+ return Status::OK();
+}
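The tail of `_fill_missing_columns` applies a three-way rule per missing cell. A standalone sketch of just that rule (hypothetical helper; `default_cells` holds the pre-parsed default value at row 0, as built above):

    // default value if the column has one; NULL if it is nullable; otherwise a
    // don't-care placeholder (the row is delete-signed, so the cell is never read).
    void fill_missing_cell(const TabletColumn& col, vectorized::IColumn& dst,
                           const vectorized::IColumn& default_cells) {
        if (col.has_default_value()) {
            dst.insert_from(default_cells, 0);
        } else if (col.is_nullable()) {
            assert_cast<vectorized::ColumnNullable&>(dst).insert_null_elements(1);
        } else {
            dst.insert_default();
        }
    }
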
+
+Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t row_pos,
+                                          size_t num_rows) {
+    if (_opts.rowset_ctx->partial_update_info &&
+        _opts.rowset_ctx->partial_update_info->is_partial_update &&
+        _opts.write_type == DataWriteType::TYPE_DIRECT &&
+        !_opts.rowset_ctx->is_transient_rowset_writer) {
+        if (block->columns() <= _tablet_schema->num_key_columns() ||
+            block->columns() >= _tablet_schema->num_columns()) {
+            return Status::InternalError(fmt::format(
+                    "illegal partial update block columns: {}, num key columns: {}, total "
+                    "schema columns: {}",
+                    block->columns(), _tablet_schema->num_key_columns(),
+                    _tablet_schema->num_columns()));
+        }
+    } else if (block->columns() != _tablet_schema->num_columns()) {
+        return Status::InternalError(
+                "illegal block columns, block columns = {}, tablet_schema columns = {}",
+                block->columns(), _tablet_schema->num_columns());
+    }
+ _batched_blocks.emplace_back(block, row_pos, num_rows);
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::write_batch() {
+ if (_opts.rowset_ctx->partial_update_info &&
+ _opts.rowset_ctx->partial_update_info->is_partial_update &&
+ _opts.write_type == DataWriteType::TYPE_DIRECT &&
+ !_opts.rowset_ctx->is_transient_rowset_writer) {
+ for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+            RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
+ }
+ for (auto& data : _batched_blocks) {
+ RETURN_IF_ERROR(_append_block_with_partial_content(data));
+ }
+ for (auto& column_writer : _column_writers) {
+ RETURN_IF_ERROR(column_writer->finish());
+ RETURN_IF_ERROR(column_writer->write_data());
+ }
+ return Status::OK();
+ }
+    // The row column should be filled here when it's a direct write from the memtable
+    // or a schema change write (the column data type may have changed, so we must rebuild).
+    if (_tablet_schema->store_row_column() &&
+        (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+         _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
+        for (auto& data : _batched_blocks) {
+            // TODO: maybe we should pass a range to this method
+            _serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
+ }
+ }
+
+ std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+ vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+    for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
+        for (auto& data : _batched_blocks) {
+            _olap_data_convertor->set_source_content_with_specifid_columns(
+                    data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid});
+
+            // convert column data from engine format to storage layer format
+            auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+            if (!status.ok()) {
+                return status;
+            }
+            if (cid < _num_key_columns) {
+                key_columns.push_back(column);
+            } else if (_tablet_schema->has_sequence_col() &&
+                       cid == _tablet_schema->sequence_col_idx()) {
+                seq_column = column;
+            }
+            RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
+                                                         data.num_rows));
+            _olap_data_convertor->clear_source_content();
+        }
+        if (_data_dir != nullptr &&
+            _data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
+            return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
+                                                            _data_dir->path_hash());
+        }
+        RETURN_IF_ERROR(_column_writers[cid]->finish());
+        RETURN_IF_ERROR(_column_writers[cid]->write_data());
+    }
+
+ for (auto& data : _batched_blocks) {
+ _olap_data_convertor->set_source_content(data.block, data.row_pos,
data.num_rows);
+ // find all row pos for short key indexes
+ std::vector<size_t> short_key_pos;
+ // We build a short key index every `_opts.num_rows_per_block` rows.
Specifically, we
+ // build a short key index using 1st rows for first block and
`_short_key_row_pos - _row_count`
+ // for next blocks.
+ if (_short_key_row_pos == 0 && _num_rows_written == 0) {
+ short_key_pos.push_back(0);
+ }
+ while (_short_key_row_pos + _opts.num_rows_per_block <
_num_rows_written + data.num_rows) {
+ _short_key_row_pos += _opts.num_rows_per_block;
+ short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
+ }
+ if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write) {
+ // create primary indexes
+ std::string last_key;
+ for (size_t pos = 0; pos < data.num_rows; pos++) {
+ std::string key = _full_encode_keys(key_columns, pos);
+ if (_tablet_schema->has_sequence_col()) {
+ _encode_seq_column(seq_column, pos, &key);
+ }
+ DCHECK(key.compare(last_key) > 0)
+ << "found duplicate key or key is not sorted! current
key: " << key
+ << ", last key" << last_key;
+ RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+ _maybe_invalid_row_cache(key);
+ last_key = std::move(key);
+ }
+ } else {
+ // create short key indexes'
+ // for min_max key
+ _set_min_key(_full_encode_keys(key_columns, 0));
+ _set_max_key(_full_encode_keys(key_columns, data.num_rows - 1));
+
+ key_columns.resize(_num_short_key_columns);
+ for (const auto pos : short_key_pos) {
+
RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns,
pos)));
+ }
+ }
+ _olap_data_convertor->clear_source_content();
+ _num_rows_written += data.num_rows;
+ }
+
+ _batched_blocks.clear();
+ return Status::OK();
+}
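This loop structure is what makes the writer "vertical": the outer iteration is over columns and the inner one over batched blocks, so a single column writer is created, fed, finished, and flushed before the next column allocates anything. A stripped-down sketch of the ordering (stand-in names, not the real types):

    #include <cstdint>
    #include <vector>

    struct RowsInBlock {}; // stand-in for the struct defined in the header

    void create_column_writer(uint32_t cid);
    void append_to_column(uint32_t cid, const RowsInBlock& data);
    void finish_and_write_column(uint32_t cid);

    // Vertical order: at most one column's pages are buffered at any time.
    // A horizontal writer would loop blocks outermost and keep every column's
    // pages in memory until the segment is finalized.
    void write_batch_vertically(uint32_t num_columns, const std::vector<RowsInBlock>& blocks) {
        for (uint32_t cid = 0; cid < num_columns; ++cid) {
            create_column_writer(cid);
            for (const RowsInBlock& data : blocks) {
                append_to_column(cid, data);
            }
            finish_and_write_column(cid); // column data leaves memory here
        }
    }
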
+
+std::string VerticalSegmentWriter::_full_encode_keys(
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos) {
+    assert(_key_index_size.size() == _num_key_columns);
+    assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns);
+
+ std::string encoded_keys;
+ size_t cid = 0;
+ for (const auto& column : key_columns) {
+ auto field = column->get_data_at(pos);
+ if (UNLIKELY(!field)) {
+ encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
+ ++cid;
+ continue;
+ }
+ encoded_keys.push_back(KEY_NORMAL_MARKER);
+ _key_coders[cid]->full_encode_ascending(field, &encoded_keys);
+ ++cid;
+ }
+ return encoded_keys;
+}
+
+void VerticalSegmentWriter::_encode_seq_column(
+        const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos, string* encoded_keys) {
+ const auto* field = seq_column->get_data_at(pos);
+ // To facilitate the use of the primary key index, encode the seq column
+ // to the minimum value of the corresponding length when the seq column
+ // is null
+ if (UNLIKELY(!field)) {
+ encoded_keys->push_back(KEY_NULL_FIRST_MARKER);
+        size_t seq_col_length =
+                _tablet_schema->column(_tablet_schema->sequence_col_idx()).length();
+ encoded_keys->append(seq_col_length, KEY_MINIMAL_MARKER);
+ return;
+ }
+ encoded_keys->push_back(KEY_NORMAL_MARKER);
+ _seq_coder->full_encode_ascending(field, encoded_keys);
+}
+
+std::string VerticalSegmentWriter::_encode_keys(
+ const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos) {
+ assert(key_columns.size() == _num_short_key_columns);
+
+ std::string encoded_keys;
+ size_t cid = 0;
+ for (const auto& column : key_columns) {
+ auto field = column->get_data_at(pos);
+ if (UNLIKELY(!field)) {
+ encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
+ ++cid;
+ continue;
+ }
+ encoded_keys.push_back(KEY_NORMAL_MARKER);
+        _key_coders[cid]->encode_ascending(field, _key_index_size[cid], &encoded_keys);
+ ++cid;
+ }
+ return encoded_keys;
+}
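Both encoders prefix every column value with a marker byte, which is what lets encoded keys be compared with plain memcmp while keeping NULLs ordered first. A tiny demonstration (the marker values here are stand-ins for this sketch; the real constants come from util/key_util.h, included above):

    #include <cassert>
    #include <string>

    const char kNullFirstMarker = 0x00; // stand-in for KEY_NULL_FIRST_MARKER
    const char kNormalMarker = 0x01;    // stand-in for KEY_NORMAL_MARKER

    int main() {
        std::string null_key(1, kNullFirstMarker); // an encoded NULL cell
        std::string value_key(1, kNormalMarker);
        value_key += "abc";                        // encoded value bytes follow the marker
        assert(null_key < value_key);              // NULL sorts before any value
        return 0;
    }
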
+
+// TODO(lingbin): Currently this function does not include the size of the various indexes;
+// we should make this more precise.
+uint64_t VerticalSegmentWriter::_estimated_remaining_size() {
+ // footer_size(4) + checksum(4) + segment_magic(4)
+ uint64_t size = 12;
+    if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
+ size += _primary_key_index_builder->size();
+ } else {
+ size += _short_key_index_builder->size();
+ }
+
+ // update the mem_tracker of segment size
+ _mem_tracker->consume(size - _mem_tracker->consumption());
+ return size;
+}
+
+size_t VerticalSegmentWriter::_calculate_inverted_index_file_size() {
+ size_t total_size = 0;
+ for (auto& column_writer : _column_writers) {
+ total_size += column_writer->get_inverted_index_size();
+ }
+ return total_size;
+}
+
+Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
+ uint64_t index_start = _file_writer->bytes_appended();
+ RETURN_IF_ERROR(_write_ordinal_index());
+ RETURN_IF_ERROR(_write_zone_map());
+ RETURN_IF_ERROR(_write_bitmap_index());
+ RETURN_IF_ERROR(_write_inverted_index());
+ RETURN_IF_ERROR(_write_bloom_filter_index());
+
+ *index_size = _file_writer->bytes_appended() - index_start;
+    if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
+        RETURN_IF_ERROR(_write_primary_key_index());
+        // IndexedColumnWriter writes data pages mixed with segment data, so we should use
+        // the stat from the primary key index builder.
+        *index_size += _primary_key_index_builder->disk_size();
+    } else {
+        RETURN_IF_ERROR(_write_short_key_index());
+        *index_size = _file_writer->bytes_appended() - index_start;
+    }
+ _inverted_index_file_size = _calculate_inverted_index_file_size();
+    // reset all column writers and the data convertor
+ clear();
+
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::finalize_footer(uint64_t* segment_file_size) {
+ RETURN_IF_ERROR(_write_footer());
+ // finish
+ RETURN_IF_ERROR(_file_writer->finalize());
+ *segment_file_size = _file_writer->bytes_appended();
+ if (*segment_file_size == 0) {
+ return Status::Corruption("Bad segment, file size = 0");
+ }
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
+    MonotonicStopWatch timer;
+    timer.start();
+    // check disk capacity
+    if (_data_dir != nullptr &&
+        _data_dir->reach_capacity_limit((int64_t)_estimated_remaining_size())) {
+        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
+                                                        _data_dir->path_hash());
+    }
+    _row_count = _num_rows_written;
+    _num_rows_written = 0;
+    // write index
+    RETURN_IF_ERROR(finalize_columns_index(index_size));
+    // write footer
+    RETURN_IF_ERROR(finalize_footer(segment_file_size));
+
+    if (timer.elapsed_time() > 5000000000L) {
+        LOG(INFO) << "segment flush consumes a lot time_ns " << timer.elapsed_time()
+                  << ", segment_size " << *segment_file_size;
+    }
+    return Status::OK();
+}
+
+void VerticalSegmentWriter::clear() {
+ for (auto& column_writer : _column_writers) {
+ column_writer.reset();
+ }
+ _column_writers.clear();
+ _olap_data_convertor.reset();
+}
+
+// write ordinal index after data has been written
+Status VerticalSegmentWriter::_write_ordinal_index() {
+ for (auto& column_writer : _column_writers) {
+ RETURN_IF_ERROR(column_writer->write_ordinal_index());
+ }
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_zone_map() {
+ for (auto& column_writer : _column_writers) {
+ RETURN_IF_ERROR(column_writer->write_zone_map());
+ }
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_bitmap_index() {
+ for (auto& column_writer : _column_writers) {
+ RETURN_IF_ERROR(column_writer->write_bitmap_index());
+ }
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_inverted_index() {
+ for (auto& column_writer : _column_writers) {
+ RETURN_IF_ERROR(column_writer->write_inverted_index());
+ }
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_bloom_filter_index() {
+ for (auto& column_writer : _column_writers) {
+ RETURN_IF_ERROR(column_writer->write_bloom_filter_index());
+ }
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_short_key_index() {
+ std::vector<Slice> body;
+ PageFooterPB footer;
+ RETURN_IF_ERROR(_short_key_index_builder->finalize(_row_count, &body,
&footer));
+ PagePointer pp;
+ // short key index page is not compressed right now
+ RETURN_IF_ERROR(PageIO::write_page(_file_writer, body, footer, &pp));
+ pp.to_proto(_footer.mutable_short_key_index_page());
+ return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_primary_key_index() {
+ CHECK(_primary_key_index_builder->num_rows() == _row_count);
+    return _primary_key_index_builder->finalize(_footer.mutable_primary_key_index_meta());
+}
+
+Status VerticalSegmentWriter::_write_footer() {
+ _footer.set_num_rows(_row_count);
+
+    // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
+ std::string footer_buf;
+ if (!_footer.SerializeToString(&footer_buf)) {
+ return Status::InternalError("failed to serialize segment footer");
+ }
+
+ faststring fixed_buf;
+ // footer's size
+ put_fixed32_le(&fixed_buf, footer_buf.size());
+ // footer's checksum
+ uint32_t checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
+ put_fixed32_le(&fixed_buf, checksum);
+    // Append the magic number. We don't write the magic number in the header because
+    // that would require an extra seek when reading.
+ fixed_buf.append(k_segment_magic, k_segment_magic_length);
+
+ std::vector<Slice> slices {footer_buf, fixed_buf};
+ return _write_raw_data(slices);
+}
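Because the magic number, footer size, and checksum sit at the tail, a reader can bootstrap from the end of the file without an extra seek. A sketch of reading the trailer back (plain C++; assumes a little-endian host; not a Doris API):

    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Trailer layout per the comment above:
    // ... SegmentFooterPB | FooterPBSize(4, LE) | FooterPBChecksum(4, LE) | Magic(4)
    bool parse_trailer(const std::vector<char>& file, uint32_t* footer_size, uint32_t* checksum) {
        static const char kMagic[4] = {'D', '0', 'R', '1'};
        if (file.size() < 12) {
            return false;
        }
        const char* tail = file.data() + file.size() - 12;
        std::memcpy(footer_size, tail, 4);  // little-endian footer size
        std::memcpy(checksum, tail + 4, 4); // little-endian CRC32C of the footer bytes
        return std::memcmp(tail + 8, kMagic, 4) == 0;
    }
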
+
+Status VerticalSegmentWriter::_write_raw_data(const std::vector<Slice>&
slices) {
+ RETURN_IF_ERROR(_file_writer->appendv(&slices[0], slices.size()));
+ return Status::OK();
+}
+
+Slice VerticalSegmentWriter::min_encoded_key() {
+    return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), _min_key.size())
+                                                   : _primary_key_index_builder->min_key();
+}
+Slice VerticalSegmentWriter::max_encoded_key() {
+    return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), _max_key.size())
+                                                   : _primary_key_index_builder->max_key();
+}
+
+void VerticalSegmentWriter::_set_min_max_key(const Slice& key) {
+ if (UNLIKELY(_is_first_row)) {
+ _min_key.append(key.get_data(), key.get_size());
+ _is_first_row = false;
+ }
+ if (key.compare(_max_key) > 0) {
+ _max_key.clear();
+ _max_key.append(key.get_data(), key.get_size());
+ }
+}
+
+void VerticalSegmentWriter::_set_min_key(const Slice& key) {
+ if (UNLIKELY(_is_first_row)) {
+ _min_key.append(key.get_data(), key.get_size());
+ _is_first_row = false;
+ }
+}
+
+void VerticalSegmentWriter::_set_max_key(const Slice& key) {
+ _max_key.clear();
+ _max_key.append(key.get_data(), key.get_size());
+}
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
new file mode 100644
index 00000000000..773751934b8
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/olap_file.pb.h>
+#include <gen_cpp/segment_v2.pb.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory> // unique_ptr
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h" // Status
+#include "gutil/macros.h"
+#include "gutil/strings/substitute.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/segment_v2/column_writer.h"
+#include "olap/tablet.h"
+#include "olap/tablet_schema.h"
+#include "util/faststring.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace vectorized {
+class Block;
+class IOlapColumnDataAccessor;
+class OlapBlockDataConvertor;
+} // namespace vectorized
+
+class DataDir;
+class MemTracker;
+class ShortKeyIndexBuilder;
+class PrimaryKeyIndexBuilder;
+class KeyCoder;
+struct RowsetWriterContext;
+
+namespace io {
+class FileWriter;
+} // namespace io
+
+namespace segment_v2 {
+
+struct VerticalSegmentWriterOptions {
+ uint32_t num_rows_per_block = 1024;
+ bool enable_unique_key_merge_on_write = false;
+ CompressionTypePB compression_type = UNKNOWN_COMPRESSION;
+
+ RowsetWriterContext* rowset_ctx = nullptr;
+ DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
+};
+
+struct RowsInBlock {
+ const vectorized::Block* block;
+ size_t row_pos;
+ size_t num_rows;
+};
+
+class VerticalSegmentWriter {
+public:
+    explicit VerticalSegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
+                                   TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet,
+                                   DataDir* data_dir, uint32_t max_row_per_segment,
+                                   const VerticalSegmentWriterOptions& opts,
+                                   std::shared_ptr<MowContext> mow_context);
+    ~VerticalSegmentWriter();
+
+ VerticalSegmentWriter(const VerticalSegmentWriter&) = delete;
+ const VerticalSegmentWriter& operator=(const VerticalSegmentWriter&) =
delete;
+
+ Status init();
+
+    // Add one block to the batch; memory is owned by the caller.
+    // The batched blocks will be flushed in write_batch.
+    // Once write_batch is called, no more blocks should be added.
+    Status batch_block(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+    Status write_batch();
+
+    [[nodiscard]] std::string data_dir_path() const {
+        return _data_dir == nullptr ? "" : _data_dir->path();
+    }
+    [[nodiscard]] size_t inverted_index_file_size() const { return _inverted_index_file_size; }
+    [[nodiscard]] uint32_t num_rows_written() const { return _num_rows_written; }
+    [[nodiscard]] int64_t num_rows_filtered() const { return _num_rows_filtered; }
+    [[nodiscard]] uint32_t row_count() const { return _row_count; }
+    [[nodiscard]] uint32_t segment_id() const { return _segment_id; }
+
+ Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
+
+ Status finalize_columns_index(uint64_t* index_size);
+ Status finalize_footer(uint64_t* segment_file_size);
+
+ Slice min_encoded_key();
+ Slice max_encoded_key();
+
+ void clear();
+
+private:
+    void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column);
+ Status _create_column_writer(uint32_t cid, const TabletColumn& column);
+ size_t _calculate_inverted_index_file_size();
+ uint64_t _estimated_remaining_size();
+ Status _write_ordinal_index();
+ Status _write_zone_map();
+ Status _write_bitmap_index();
+ Status _write_inverted_index();
+ Status _write_bloom_filter_index();
+ Status _write_short_key_index();
+ Status _write_primary_key_index();
+ Status _write_footer();
+ Status _write_raw_data(const std::vector<Slice>& slices);
+ void _maybe_invalid_row_cache(const std::string& key) const;
+    std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+                             size_t pos);
+    // used for unique-key with merge on write and segment min_max key
+    std::string _full_encode_keys(
+            const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos);
+    // used for unique-key with merge on write
+    void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos,
+                            string* encoded_keys);
+ void _set_min_max_key(const Slice& key);
+ void _set_min_key(const Slice& key);
+ void _set_max_key(const Slice& key);
+ void _serialize_block_to_row_column(vectorized::Block& block);
+ Status _append_block_with_partial_content(RowsInBlock& data);
+ Status _fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
+ const std::vector<bool>&
use_default_or_null_flag,
+ bool has_default_or_nullable, const size_t&
segment_start_pos);
+
+private:
+ uint32_t _segment_id;
+ TabletSchemaSPtr _tablet_schema;
+ BaseTabletSPtr _tablet;
+ DataDir* _data_dir;
+ VerticalSegmentWriterOptions _opts;
+
+ // Not owned. owned by RowsetWriter
+ io::FileWriter* _file_writer;
+
+ SegmentFooterPB _footer;
+ size_t _num_key_columns;
+ size_t _num_short_key_columns;
+ size_t _inverted_index_file_size;
+ std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
+ std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder;
+ std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
+ std::unique_ptr<MemTracker> _mem_tracker;
+
+ std::unique_ptr<vectorized::OlapBlockDataConvertor> _olap_data_convertor;
+    // used for building the short key index or primary key index during vectorized write.
+ std::vector<const KeyCoder*> _key_coders;
+ const KeyCoder* _seq_coder = nullptr;
+ std::vector<uint16_t> _key_index_size;
+ size_t _short_key_row_pos = 0;
+
+    // _num_rows_written means the row count already written in the current column group
+ uint32_t _num_rows_written = 0;
+ // number of rows filtered in strict mode partial update
+ int64_t _num_rows_filtered = 0;
+    // _row_count means the total row count of this segment.
+    // In vertical compaction the row count is recorded when the key column group finishes,
+    // and _num_rows_written will be updated in the value column groups.
+ uint32_t _row_count = 0;
+
+ bool _is_first_row = true;
+ faststring _min_key;
+ faststring _max_key;
+
+ std::shared_ptr<MowContext> _mow_context;
+ // group every rowset-segment row id to speed up reader
+ PartialUpdateReadPlan _rssid_to_rid;
+ std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
+
+ std::vector<RowsInBlock> _batched_blocks;
+};
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index e9fe19a048d..cc879d24eb3 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -217,9 +217,11 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block,
void OlapBlockDataConvertor::set_source_content_with_specifid_columns(
const vectorized::Block* block, size_t row_pos, size_t num_rows,
std::vector<uint32_t> cids) {
- DCHECK(block && num_rows > 0 && row_pos + num_rows <= block->rows() &&
- block->columns() <= _convertors.size());
+ DCHECK(block != nullptr);
+ DCHECK(num_rows > 0);
+ DCHECK(row_pos + num_rows <= block->rows());
for (auto i : cids) {
+ DCHECK(i < _convertors.size());
         _convertors[i]->set_source_column(block->get_by_position(i), row_pos, num_rows);
}
}
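Splitting the compound DCHECK, as done in this last hunk, makes a failed check report the exact violated predicate instead of one opaque conjunction, and moving the bounds check into the loop scopes it to the columns actually being converted (`cids`) rather than `block->columns()` as a whole. For comparison:

    // before: one message for four conditions
    DCHECK(block && num_rows > 0 && row_pos + num_rows <= block->rows() &&
           block->columns() <= _convertors.size());
    // after: each failure names its own condition
    DCHECK(block != nullptr);
    DCHECK(num_rows > 0);
    DCHECK(row_pos + num_rows <= block->rows());
    for (auto i : cids) {
        DCHECK(i < _convertors.size());
        // ...
    }
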
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]