This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 39473cdf481 [performance](load) add vertical segment writer (#24403)
39473cdf481 is described below

commit 39473cdf4819202a9641a1143dba529158d911ef
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 14 11:53:09 2023 +0800

    [performance](load) add vertical segment writer (#24403)
---
 be/src/olap/rowset/segment_creator.cpp             |   90 +-
 be/src/olap/rowset/segment_creator.h               |    8 +
 be/src/olap/rowset/segment_v2/column_writer.cpp    |   17 +-
 be/src/olap/rowset/segment_v2/column_writer.h      |   20 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  | 1008 ++++++++++++++++++++
 .../rowset/segment_v2/vertical_segment_writer.h    |  196 ++++
 be/src/vec/olap/olap_data_convertor.cpp            |    6 +-
 7 files changed, 1311 insertions(+), 34 deletions(-)

diff --git a/be/src/olap/rowset/segment_creator.cpp 
b/be/src/olap/rowset/segment_creator.cpp
index 15002350f7f..f40d899bdce 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -30,6 +30,7 @@
 #include "io/fs/file_writer.h"
 #include "olap/rowset/beta_rowset_writer.h" // SegmentStatistics
 #include "olap/rowset/segment_v2/segment_writer.h"
+#include "olap/rowset/segment_v2/vertical_segment_writer.h"
 #include "vec/core/block.h"
 
 namespace doris {
@@ -49,7 +50,7 @@ Status SegmentFlusher::flush_single_block(const 
vectorized::Block* block, int32_
     if (block->rows() == 0) {
         return Status::OK();
     }
-    std::unique_ptr<segment_v2::SegmentWriter> writer;
+    std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
     bool no_compression = block->bytes() <= 
config::segment_compression_threshold_kb * 1024;
     RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, 
flush_schema));
     RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
@@ -73,10 +74,16 @@ Status SegmentFlusher::close() {
 Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& 
segment_writer,
                                  const vectorized::Block* block, size_t 
row_offset,
                                  size_t row_num) {
-    auto s = segment_writer->append_block(block, row_offset, row_num);
-    if (UNLIKELY(!s.ok())) {
-        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block: 
{}", s.to_string());
-    }
+    RETURN_IF_ERROR(segment_writer->append_block(block, row_offset, row_num));
+    _num_rows_written += row_num;
+    return Status::OK();
+}
+
+Status 
SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& 
segment_writer,
+                                 const vectorized::Block* block, size_t 
row_offset,
+                                 size_t row_num) {
+    RETURN_IF_ERROR(segment_writer->batch_block(block, row_offset, row_num));
+    RETURN_IF_ERROR(segment_writer->write_batch());
     _num_rows_written += row_num;
     return Status::OK();
 }
@@ -112,6 +119,79 @@ Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
     return Status::OK();
 }
 
+Status SegmentFlusher::_create_segment_writer(
+        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t 
segment_id,
+        bool no_compression, TabletSchemaSPtr flush_schema) {
+    io::FileWriterPtr file_writer;
+    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
file_writer));
+
+    segment_v2::VerticalSegmentWriterOptions writer_options;
+    writer_options.enable_unique_key_merge_on_write = 
_context.enable_unique_key_merge_on_write;
+    writer_options.rowset_ctx = &_context;
+    writer_options.write_type = _context.write_type;
+    if (no_compression) {
+        writer_options.compression_type = NO_COMPRESSION;
+    }
+
+    const auto& tablet_schema = flush_schema ? flush_schema : 
_context.tablet_schema;
+    writer.reset(new segment_v2::VerticalSegmentWriter(
+            file_writer.get(), segment_id, tablet_schema, _context.tablet, 
_context.data_dir,
+            _context.max_rows_per_segment, writer_options, 
_context.mow_context));
+    {
+        std::lock_guard<SpinLock> l(_lock);
+        _file_writers.push_back(std::move(file_writer));
+    }
+    auto s = writer->init();
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init segment writer: " << s.to_string();
+        writer.reset();
+        return s;
+    }
+    return Status::OK();
+}
+
+Status SegmentFlusher::_flush_segment_writer(
+        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* 
flush_size) {
+    uint32_t row_num = writer->num_rows_written();
+    _num_rows_filtered += writer->num_rows_filtered();
+
+    if (row_num == 0) {
+        return Status::OK();
+    }
+    uint64_t segment_size;
+    uint64_t index_size;
+    Status s = writer->finalize(&segment_size, &index_size);
+    if (!s.ok()) {
+        return Status::Error(s.code(), "failed to finalize segment: {}", 
s.to_string());
+    }
+    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
+               << " flushing filename: " << writer->data_dir_path()
+               << " rowset_id:" << _context.rowset_id;
+
+    KeyBoundsPB key_bounds;
+    Slice min_key = writer->min_encoded_key();
+    Slice max_key = writer->max_encoded_key();
+    DCHECK_LE(min_key.compare(max_key), 0);
+    key_bounds.set_min_key(min_key.to_string());
+    key_bounds.set_max_key(max_key.to_string());
+
+    uint32_t segment_id = writer->segment_id();
+    SegmentStatistics segstat;
+    segstat.row_num = row_num;
+    segstat.data_size = segment_size + writer->inverted_index_file_size();
+    segstat.index_size = index_size + writer->inverted_index_file_size();
+    segstat.key_bounds = key_bounds;
+
+    writer.reset();
+
+    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
+
+    if (flush_size) {
+        *flush_size = segment_size + index_size;
+    }
+    return Status::OK();
+}
+
 Status 
SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
 writer,
                                              int64_t* flush_size) {
     uint32_t row_num = writer->num_rows_written();
diff --git a/be/src/olap/rowset/segment_creator.h 
b/be/src/olap/rowset/segment_creator.h
index 750aa148724..cf5456a28aa 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -35,6 +35,7 @@ class Block;
 
 namespace segment_v2 {
 class SegmentWriter;
+class VerticalSegmentWriter;
 } // namespace segment_v2
 
 struct SegmentStatistics;
@@ -127,11 +128,18 @@ public:
 private:
     Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& 
segment_writer,
                      const vectorized::Block* block, size_t row_offset, size_t 
row_num);
+    Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& 
segment_writer,
+                     const vectorized::Block* block, size_t row_offset, size_t 
row_num);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& 
writer,
                                   int32_t segment_id, bool no_compression = 
false,
                                   TabletSchemaSPtr flush_schema = nullptr);
+    Status 
_create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& 
writer,
+                                  int32_t segment_id, bool no_compression = 
false,
+                                  TabletSchemaSPtr flush_schema = nullptr);
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& 
writer,
                                  int64_t* flush_size = nullptr);
+    Status 
_flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& 
writer,
+                                 int64_t* flush_size = nullptr);
 
 private:
     RowsetWriterContext _context;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 3891c1235e5..bacf6fa1ce0 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -446,12 +446,7 @@ ScalarColumnWriter::ScalarColumnWriter(const 
ColumnWriterOptions& opts,
 
 ScalarColumnWriter::~ScalarColumnWriter() {
     // delete all pages
-    Page* page = _pages.head;
-    while (page != nullptr) {
-        Page* next_page = page->next;
-        delete page;
-        page = next_page;
-    }
+    _pages.clear();
 }
 
 Status ScalarColumnWriter::init() {
@@ -601,11 +596,10 @@ Status ScalarColumnWriter::finish() {
 }
 
 Status ScalarColumnWriter::write_data() {
-    Page* page = _pages.head;
-    while (page != nullptr) {
-        RETURN_IF_ERROR(_write_data_page(page));
-        page = page->next;
+    for (auto& page : _pages) {
+        RETURN_IF_ERROR(_write_data_page(page.get()));
     }
+    _pages.clear();
     // write column dict
     if (_encoding_info->encoding() == DICT_ENCODING) {
         OwnedSlice dict_body;
@@ -622,6 +616,7 @@ Status ScalarColumnWriter::write_data() {
                 {dict_body.slice()}, footer, &dict_pp));
         dict_pp.to_proto(_opts.meta->mutable_dict_page());
     }
+    _page_builder.reset();
     return Status::OK();
 }
 
@@ -731,7 +726,7 @@ Status ScalarColumnWriter::finish_current_page() {
         page->data.emplace_back(std::move(compressed_body));
     }
 
-    _push_back_page(page.release());
+    _push_back_page(std::move(page));
     _first_rowid = _next_rowid;
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h 
b/be/src/olap/rowset/segment_v2/column_writer.h
index 1bc0afb972b..67cefc3c9ce 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -230,28 +230,16 @@ private:
         // use vector for easier management for lifetime of OwnedSlice
         std::vector<OwnedSlice> data;
         PageFooterPB footer;
-        Page* next = nullptr;
     };
 
-    struct PageHead {
-        Page* head = nullptr;
-        Page* tail = nullptr;
-    };
-
-    void _push_back_page(Page* page) {
-        // add page to pages' tail
-        if (_pages.tail != nullptr) {
-            _pages.tail->next = page;
-        }
-        _pages.tail = page;
-        if (_pages.head == nullptr) {
-            _pages.head = page;
-        }
+    void _push_back_page(std::unique_ptr<Page> page) {
         for (auto& data_slice : page->data) {
             _data_size += data_slice.slice().size;
         }
         // estimate (page footer + footer size + checksum) took 20 bytes
         _data_size += 20;
+        // add page to pages' tail
+        _pages.emplace_back(std::move(page));
     }
 
     Status _write_data_page(Page* page);
@@ -262,7 +250,7 @@ private:
     uint64_t _data_size;
 
     // cached generated pages,
-    PageHead _pages;
+    std::vector<std::unique_ptr<Page>> _pages;
     ordinal_t _first_rowid = 0;
 
     BlockCompressionCodec* _compress_codec;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
new file mode 100644
index 00000000000..760ef3d6c9c
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -0,0 +1,1008 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/vertical_segment_writer.h"
+
+#include <gen_cpp/segment_v2.pb.h>
+#include <parallel_hashmap/phmap.h>
+
+#include <algorithm>
+#include <cassert>
+#include <ostream>
+#include <unordered_map>
+#include <utility>
+
+#include "cloud/config.h"
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
+#include "common/logging.h" // LOG
+#include "gutil/port.h"
+#include "io/fs/file_writer.h"
+#include "olap/data_dir.h"
+#include "olap/key_coder.h"
+#include "olap/olap_common.h"
+#include "olap/primary_key_index.h"
+#include "olap/row_cursor.h"                      // RowCursor // IWYU pragma: 
keep
+#include "olap/rowset/rowset_writer_context.h"    // RowsetWriterContext
+#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
+#include "olap/rowset/segment_v2/page_io.h"
+#include "olap/rowset/segment_v2/page_pointer.h"
+#include "olap/segment_loader.h"
+#include "olap/short_key_index.h"
+#include "olap/tablet_schema.h"
+#include "olap/utils.h"
+#include "runtime/memory/mem_tracker.h"
+#include "service/point_query_executor.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/faststring.h"
+#include "util/key_util.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/schema_util.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/types.h"
+#include "vec/io/reader_buffer.h"
+#include "vec/jsonb/serialize.h"
+#include "vec/olap/olap_data_convertor.h"
+
+namespace doris {
+namespace segment_v2 {
+
+using namespace ErrorCode;
+
+static const char* k_segment_magic = "D0R1";
+static const uint32_t k_segment_magic_length = 4;
+
+VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
+                                             TabletSchemaSPtr tablet_schema, 
BaseTabletSPtr tablet,
+                                             DataDir* data_dir, uint32_t 
max_row_per_segment,
+                                             const 
VerticalSegmentWriterOptions& opts,
+                                             std::shared_ptr<MowContext> 
mow_context)
+        : _segment_id(segment_id),
+          _tablet_schema(std::move(tablet_schema)),
+          _tablet(std::move(tablet)),
+          _data_dir(data_dir),
+          _opts(opts),
+          _file_writer(file_writer),
+          
_mem_tracker(std::make_unique<MemTracker>("VerticalSegmentWriter:Segment-" +
+                                                    
std::to_string(segment_id))),
+          _mow_context(std::move(mow_context)) {
+    CHECK_NOTNULL(file_writer);
+    _num_key_columns = _tablet_schema->num_key_columns();
+    _num_short_key_columns = _tablet_schema->num_short_key_columns();
+    DCHECK(_num_key_columns >= _num_short_key_columns);
+    for (size_t cid = 0; cid < _num_key_columns; ++cid) {
+        const auto& column = _tablet_schema->column(cid);
+        _key_coders.push_back(get_key_coder(column.type()));
+        _key_index_size.push_back(column.index_length());
+    }
+    // encode the sequence id into the primary key index
+    if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == 
UNIQUE_KEYS &&
+        _opts.enable_unique_key_merge_on_write) {
+        const auto& column = 
_tablet_schema->column(_tablet_schema->sequence_col_idx());
+        _seq_coder = get_key_coder(column.type());
+    }
+}
+
+VerticalSegmentWriter::~VerticalSegmentWriter() {
+    _mem_tracker->release(_mem_tracker->consumption());
+}
+
+void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t 
column_id,
+                                              const TabletColumn& column) {
+    meta->set_column_id(column_id);
+    meta->set_unique_id(column.unique_id());
+    meta->set_type(int(column.type()));
+    meta->set_length(column.length());
+    meta->set_encoding(DEFAULT_ENCODING);
+    meta->set_compression(_opts.compression_type);
+    meta->set_is_nullable(column.is_nullable());
+    for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
+        _init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i));
+    }
+}
+
+Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const 
TabletColumn& column) {
+    ColumnWriterOptions opts;
+    opts.meta = _footer.add_columns();
+
+    _init_column_meta(opts.meta, cid, column);
+
+    // now we create zone map for key columns in AGG_KEYS or all column in 
UNIQUE_KEYS or DUP_KEYS
+    // and not support zone map for array type and jsonb type.
+    opts.need_zone_map = (column.is_key() || _tablet_schema->keys_type() != 
KeysType::AGG_KEYS) &&
+                         column.type() != FieldType::OLAP_FIELD_TYPE_OBJECT;
+    opts.need_bloom_filter = column.is_bf_column();
+    auto* tablet_index = 
_tablet_schema->get_ngram_bf_index(column.unique_id());
+    if (tablet_index) {
+        opts.need_bloom_filter = true;
+        opts.is_ngram_bf_index = true;
+        opts.gram_size = tablet_index->get_gram_size();
+        opts.gram_bf_size = tablet_index->get_gram_bf_size();
+    }
+
+    opts.need_bitmap_index = column.has_bitmap_index();
+    bool skip_inverted_index = false;
+    if (_opts.rowset_ctx != nullptr) {
+        // skip write inverted index for index compaction
+        skip_inverted_index = 
_opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
+    }
+    // skip write inverted index on load if skip_write_index_on_load is true
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
+        _tablet_schema->skip_write_index_on_load()) {
+        skip_inverted_index = true;
+    }
+    // indexes for this column
+    opts.indexes = _tablet_schema->get_indexes_for_column(column.unique_id());
+    for (auto index : opts.indexes) {
+        if (!skip_inverted_index && index && index->index_type() == 
IndexType::INVERTED) {
+            opts.inverted_index = index;
+            // TODO support multiple inverted index
+            break;
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for 
struct type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for 
struct type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for array 
type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for array 
type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_JSONB) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for jsonb 
type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for jsonb 
type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_AGG_STATE) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for 
agg_state type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for 
agg_state type");
+        }
+    }
+    if (column.type() == FieldType::OLAP_FIELD_TYPE_MAP) {
+        opts.need_zone_map = false;
+        if (opts.need_bloom_filter) {
+            return Status::NotSupported("Do not support bloom filter for map 
type");
+        }
+        if (opts.need_bitmap_index) {
+            return Status::NotSupported("Do not support bitmap index for map 
type");
+        }
+    }
+
+    if (column.is_row_store_column()) {
+        // smaller page size for row store column
+        opts.data_page_size = config::row_column_page_size;
+    }
+
+    std::unique_ptr<ColumnWriter> writer;
+    RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, 
&writer));
+    RETURN_IF_ERROR(writer->init());
+    _column_writers.push_back(std::move(writer));
+
+    _olap_data_convertor->add_column_data_convertor(column);
+    return Status::OK();
+};
+
+Status VerticalSegmentWriter::init() {
+    DCHECK(_column_writers.empty());
+    if (_opts.compression_type == UNKNOWN_COMPRESSION) {
+        _opts.compression_type = _tablet_schema->compression_type();
+    }
+    _olap_data_convertor = 
std::make_unique<vectorized::OlapBlockDataConvertor>();
+    _olap_data_convertor->reserve(_tablet_schema->num_columns());
+    _column_writers.reserve(_tablet_schema->columns().size());
+    // we don't need the short key index for unique key merge on write table.
+    if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
+        size_t seq_col_length = 0;
+        if (_tablet_schema->has_sequence_col()) {
+            seq_col_length =
+                    
_tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
+        }
+        _primary_key_index_builder.reset(new 
PrimaryKeyIndexBuilder(_file_writer, seq_col_length));
+        RETURN_IF_ERROR(_primary_key_index_builder->init());
+    } else {
+        _short_key_index_builder.reset(
+                new ShortKeyIndexBuilder(_segment_id, 
_opts.num_rows_per_block));
+    }
+    return Status::OK();
+}
+
+void VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) 
const {
+    // Just invalid row cache for simplicity, since the rowset is not visible 
at present.
+    // If we update/insert cache, if load failed rowset will not be visible 
but cached data
+    // will be visible, and lead to inconsistency.
+    if (!config::disable_storage_row_cache && 
_tablet_schema->store_row_column() &&
+        _opts.write_type == DataWriteType::TYPE_DIRECT) {
+        // invalidate cache
+        RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
+    }
+}
+
+void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& 
block) {
+    if (block.rows() == 0) {
+        return;
+    }
+    MonotonicStopWatch watch;
+    watch.start();
+    // find row column id
+    int row_column_id = 0;
+    for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
+        if (_tablet_schema->column(i).is_row_store_column()) {
+            row_column_id = i;
+            break;
+        }
+    }
+    if (row_column_id == 0) {
+        return;
+    }
+    auto* row_store_column =
+            
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
+                                                           
.column->assume_mutable_ref()
+                                                           .assume_mutable()
+                                                           .get());
+    row_store_column->clear();
+    vectorized::DataTypeSerDeSPtrs serdes =
+            vectorized::create_data_type_serdes(block.get_data_types());
+    vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, 
*row_store_column,
+                                                   
_tablet_schema->num_columns(), serdes);
+    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", 
row_column_id:" << row_column_id
+               << ", total_byte_size:" << block.allocated_bytes() << ", 
serialize_cost(us)"
+               << watch.elapsed_time() / 1000;
+}
+
+// for partial update, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all key_column's converted 
slice
+// 2. get pk of input block, and read missing columns
+//       2.1 first find key location{rowset_id, segment_id, row_id}
+//       2.2 build read plan to read by batch
+//       2.3 fill block
+// 3. set columns to data convertor and then write all columns
+Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& 
data) {
+    if (config::cloud_mode) {
+        // TODO(plat1ko)
+        return Status::NotSupported("append_block_with_partial_content");
+    }
+    DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write);
+    DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
+
+    auto tablet = static_cast<Tablet*>(_tablet.get());
+    // create full block and fill with input columns
+    auto full_block = _tablet_schema->create_block();
+    const auto& including_cids = 
_opts.rowset_ctx->partial_update_info->update_cids;
+    size_t input_id = 0;
+    for (auto i : including_cids) {
+        full_block.replace_by_position(i, 
data.block->get_by_position(input_id++).column);
+    }
+    
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
data.row_pos,
+                                                                   
data.num_rows, including_cids);
+
+    bool have_input_seq_column = false;
+    // write including columns
+    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+    vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+    size_t segment_start_pos;
+    for (auto cid : including_cids) {
+        // here we get segment column row num before append data.
+        segment_start_pos = _column_writers[cid]->get_next_rowid();
+        // olap data convertor alway start from id = 0
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
+        }
+        if (cid < _num_key_columns) {
+            key_columns.push_back(column);
+        } else if (_tablet_schema->has_sequence_col() &&
+                   cid == _tablet_schema->sequence_col_idx()) {
+            seq_column = column;
+            have_input_seq_column = true;
+        }
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
+                                                     data.num_rows));
+    }
+
+    bool has_default_or_nullable = false;
+    std::vector<bool> use_default_or_null_flag;
+    use_default_or_null_flag.reserve(data.num_rows);
+    const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                full_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr) {
+        auto& delete_sign_col =
+                reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        if (delete_sign_col.size() >= data.row_pos + data.num_rows) {
+            delete_sign_column_data = delete_sign_col.get_data().data();
+        }
+    }
+
+    std::vector<RowsetSharedPtr> specified_rowsets;
+    {
+        std::shared_lock rlock(tablet->get_header_lock());
+        specified_rowsets = 
tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+    }
+    std::vector<std::unique_ptr<SegmentCacheHandle>> 
segment_caches(specified_rowsets.size());
+    // locate rows in base data
+
+    int64_t num_rows_filtered = 0;
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows; block_pos++) {
+        // block   segment
+        //   2   ->   0
+        //   3   ->   1
+        //   4   ->   2
+        //   5   ->   3
+        // here row_pos = 2, num_rows = 4.
+        size_t delta_pos = block_pos - data.row_pos;
+        size_t segment_pos = segment_start_pos + delta_pos;
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        if (have_input_seq_column) {
+            _encode_seq_column(seq_column, delta_pos, &key);
+        }
+        // If the table have sequence column, and the include-cids don't 
contain the sequence
+        // column, we need to update the primary key index builder at the end 
of this method.
+        // At that time, we have a valid sequence column to encode the key 
with seq col.
+        if (!_tablet_schema->has_sequence_col() || have_input_seq_column) {
+            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+        }
+        _maybe_invalid_row_cache(key);
+
+        // mark key with delete sign as deleted.
+        bool have_delete_sign =
+                (delete_sign_column_data != nullptr && 
delete_sign_column_data[block_pos] != 0);
+
+        RowLocation loc;
+        // save rowset shared ptr so this rowset wouldn't delete
+        RowsetSharedPtr rowset;
+        auto st = tablet->lookup_row_key(key, have_input_seq_column, 
specified_rowsets, &loc,
+                                         _mow_context->max_version, 
segment_caches, &rowset);
+        if (st.is<KEY_NOT_FOUND>()) {
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                ++num_rows_filtered;
+                // delete the invalid newly inserted row
+                _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, 
_segment_id,
+                                                  
DeleteBitmap::TEMP_VERSION_COMMON},
+                                                 segment_pos);
+            }
+
+            if 
(!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) 
{
+                return Status::InternalError(
+                        "the unmentioned columns should have default value or 
be nullable for "
+                        "newly inserted rows in non-strict mode partial 
update");
+            }
+            has_default_or_nullable = true;
+            use_default_or_null_flag.emplace_back(true);
+            continue;
+        }
+        if (!st.ok() && !st.is<KEY_ALREADY_EXISTS>()) {
+            LOG(WARNING) << "failed to lookup row key, error: " << st;
+            return st;
+        }
+
+        // if the delete sign is marked, it means that the value columns of 
the row
+        // will not be read. So we don't need to read the missing values from 
the previous rows.
+        // But we still need to mark the previous row on delete bitmap
+        if (have_delete_sign) {
+            has_default_or_nullable = true;
+            use_default_or_null_flag.emplace_back(true);
+        } else {
+            // partial update should not contain invisible columns
+            use_default_or_null_flag.emplace_back(false);
+            _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+            tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
+        }
+
+        if (st.is<KEY_ALREADY_EXISTS>()) {
+            // although we need to mark delete current row, we still need to 
read missing columns
+            // for this row, we need to ensure that each column is aligned
+            _mow_context->delete_bitmap->add(
+                    {_opts.rowset_ctx->rowset_id, _segment_id, 
DeleteBitmap::TEMP_VERSION_COMMON},
+                    segment_pos);
+        } else {
+            _mow_context->delete_bitmap->add(
+                    {loc.rowset_id, loc.segment_id, 
DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
+        }
+    }
+    CHECK(use_default_or_null_flag.size() == data.num_rows);
+
+    if (config::enable_merge_on_write_correctness_check) {
+        
tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
+                                                   _mow_context->rowset_ids);
+    }
+
+    // read and fill block
+    auto mutable_full_columns = full_block.mutate_columns();
+    RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, 
use_default_or_null_flag,
+                                          has_default_or_nullable, 
segment_start_pos));
+    // row column should be filled here
+    if (_tablet_schema->store_row_column()) {
+        // convert block to row store format
+        _serialize_block_to_row_column(full_block);
+    }
+
+    // convert missing columns and send to column writer
+    const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
+    
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
data.row_pos,
+                                                                   
data.num_rows, missing_cids);
+    for (auto cid : missing_cids) {
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
+        }
+        if (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
+            cid == _tablet_schema->sequence_col_idx()) {
+            DCHECK_EQ(seq_column, nullptr);
+            seq_column = column;
+        }
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
+                                                     data.num_rows));
+    }
+
+    _num_rows_filtered += num_rows_filtered;
+    if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
+        DCHECK_NE(seq_column, nullptr);
+        DCHECK_EQ(_num_rows_written, data.row_pos)
+                << "_num_rows_written: " << _num_rows_written << ", row_pos" 
<< data.row_pos;
+        DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+                << "primary key index builder num rows(" << 
_primary_key_index_builder->num_rows()
+                << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
+        if (_num_rows_written != data.row_pos ||
+            _primary_key_index_builder->num_rows() != _num_rows_written) {
+            return Status::InternalError(
+                    "Correctness check failed, _num_rows_written: {}, row_pos: 
{}, primary key "
+                    "index builder num rows: {}",
+                    _num_rows_written, data.row_pos, 
_primary_key_index_builder->num_rows());
+        }
+        for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows;
+             block_pos++) {
+            std::string key = _full_encode_keys(key_columns, block_pos - 
data.row_pos);
+            _encode_seq_column(seq_column, block_pos - data.row_pos, &key);
+            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+        }
+    }
+
+    _num_rows_written += data.num_rows;
+    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+            << "primary key index builder num rows(" << 
_primary_key_index_builder->num_rows()
+            << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
+    _olap_data_convertor->clear_source_content();
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_fill_missing_columns(
+        vectorized::MutableColumns& mutable_full_columns,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos) {
+    if (config::cloud_mode) [[unlikely]] {
+        return Status::NotSupported("fill_missing_columns");
+    }
+    auto tablet = static_cast<Tablet*>(_tablet.get());
+    // create old value columns
+    const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
+    auto old_value_block = _tablet_schema->create_block_by_cids(missing_cids);
+    CHECK(missing_cids.size() == old_value_block.columns());
+    auto mutable_old_columns = old_value_block.mutate_columns();
+    bool has_row_column = _tablet_schema->store_row_column();
+    // record real pos, key is input line num, value is old_block line num
+    std::map<uint32_t, uint32_t> read_index;
+    size_t read_idx = 0;
+    for (auto rs_it : _rssid_to_rid) {
+        for (auto seg_it : rs_it.second) {
+            auto rowset = _rsid_to_rowset[rs_it.first];
+            CHECK(rowset);
+            std::vector<uint32_t> rids;
+            for (auto id_and_pos : seg_it.second) {
+                rids.emplace_back(id_and_pos.rid);
+                read_index[id_and_pos.pos] = read_idx++;
+            }
+            if (has_row_column) {
+                auto st = tablet->fetch_value_through_row_column(
+                        rowset, *_tablet_schema, seg_it.first, rids, 
missing_cids, old_value_block);
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to fetch value through row column";
+                    return st;
+                }
+                continue;
+            }
+            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
+                TabletColumn tablet_column = 
_tablet_schema->column(missing_cids[cid]);
+                auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
+                                                        
mutable_old_columns[cid]);
+                // set read value to output block
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to fetch value by rowids";
+                    return st;
+                }
+            }
+        }
+    }
+    // build default value columns
+    auto default_value_block = old_value_block.clone_empty();
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+
+    const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                old_value_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        auto& delete_sign_col =
+                reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        delete_sign_column_data = delete_sign_col.get_data().data();
+    }
+
+    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            const auto& column = _tablet_schema->column(missing_cids[i]);
+            if (column.has_default_value()) {
+                auto default_value = 
_tablet_schema->column(missing_cids[i]).default_value();
+                vectorized::ReadBuffer 
rb(const_cast<char*>(default_value.c_str()),
+                                          default_value.size());
+                
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
+                        rb, mutable_default_value_columns[i].get()));
+            }
+        }
+    }
+
+    // fill all missing value from mutable_old_columns, need to consider 
default value and null value
+    for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
+        // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
+        // for the missing columns. For example, if a table has sequence 
column, the rows with DELETE_SIGN column
+        // marked will not be marked in delete bitmap(see 
https://github.com/apache/doris/pull/24011), so it will
+        // be found in Tablet::lookup_row_key() and 
`use_default_or_null_flag[idx]` will be false. But we should not
+        // read values from old rows for missing values in this occasion. So 
we should read the DELETE_SIGN column
+        // to check if a row REALLY exists in the table.
+        if (use_default_or_null_flag[idx] ||
+            (delete_sign_column_data != nullptr &&
+             delete_sign_column_data[read_index[idx + segment_start_pos]] != 
0)) {
+            for (auto i = 0; i < missing_cids.size(); ++i) {
+                // if the column has default value, fiil it with default value
+                // otherwise, if the column is nullable, fill it with null 
value
+                const auto& tablet_column = 
_tablet_schema->column(missing_cids[i]);
+                if (tablet_column.has_default_value()) {
+                    mutable_full_columns[missing_cids[i]]->insert_from(
+                            *mutable_default_value_columns[i].get(), 0);
+                } else if (tablet_column.is_nullable()) {
+                    auto nullable_column = 
assert_cast<vectorized::ColumnNullable*>(
+                            mutable_full_columns[missing_cids[i]].get());
+                    nullable_column->insert_null_elements(1);
+                } else {
+                    // If the control flow reaches this branch, the column 
neither has default value
+                    // nor is nullable. It means that the row's delete sign is 
marked, and the value
+                    // columns are useless and won't be read. So we can just 
put arbitary values in the cells
+                    mutable_full_columns[missing_cids[i]]->insert_default();
+                }
+            }
+            continue;
+        }
+        auto pos_in_old_block = read_index[idx + segment_start_pos];
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            mutable_full_columns[missing_cids[i]]->insert_from(
+                    
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
+                    pos_in_old_block);
+        }
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, 
size_t row_pos,
+                                          size_t num_rows) {
+    if (_opts.rowset_ctx->partial_update_info &&
+        _opts.rowset_ctx->partial_update_info->is_partial_update &&
+        _opts.write_type == DataWriteType::TYPE_DIRECT &&
+        !_opts.rowset_ctx->is_transient_rowset_writer) {
+        if (block->columns() <= _tablet_schema->num_key_columns() ||
+            block->columns() >= _tablet_schema->num_columns()) {
+            return Status::InternalError(fmt::format(
+                    "illegal partial update block columns: {}, num key 
columns: {}, total "
+                    "schema columns: {}",
+                    block->columns(), _tablet_schema->num_key_columns(),
+                    _tablet_schema->num_columns()));
+        }
+    } else if (block->columns() != _tablet_schema->num_columns()) {
+        return Status::InternalError(
+                "illegal block columns, block columns = {}, tablet_schema 
columns = {}",
+                block->columns(), _tablet_schema->num_columns());
+    }
+    _batched_blocks.emplace_back(block, row_pos, num_rows);
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::write_batch() {
+    if (_opts.rowset_ctx->partial_update_info &&
+        _opts.rowset_ctx->partial_update_info->is_partial_update &&
+        _opts.write_type == DataWriteType::TYPE_DIRECT &&
+        !_opts.rowset_ctx->is_transient_rowset_writer) {
+        for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+            RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
+        }
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_partial_content(data));
+        }
+        for (auto& column_writer : _column_writers) {
+            RETURN_IF_ERROR(column_writer->finish());
+            RETURN_IF_ERROR(column_writer->write_data());
+        }
+        return Status::OK();
+    }
+    // Row column should be filled here when it's a directly write from 
memtable
+    // or it's schema change write(since column data type maybe changed, so we 
should reubild)
+    if (_tablet_schema->store_row_column() &&
+        (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+         _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
+        for (auto& data : _batched_blocks) {
+            // TODO: maybe we should pass range to this method
+            
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
+        }
+    }
+
+    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+    vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+    for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+        RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
+        for (auto& data : _batched_blocks) {
+            _olap_data_convertor->set_source_content_with_specifid_columns(
+                    data.block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid});
+
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(cid);
+            if (!status.ok()) {
+                return status;
+            }
+            if (cid < _num_key_columns) {
+                key_columns.push_back(column);
+            } else if (_tablet_schema->has_sequence_col() &&
+                       cid == _tablet_schema->sequence_col_idx()) {
+                seq_column = column;
+            }
+            
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
+                                                         data.num_rows));
+            _olap_data_convertor->clear_source_content();
+        }
+        if (_data_dir != nullptr &&
+            
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
+            return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
+                                                            
_data_dir->path_hash());
+        }
+        RETURN_IF_ERROR(_column_writers[cid]->finish());
+        RETURN_IF_ERROR(_column_writers[cid]->write_data());
+    }
+
+    for (auto& data : _batched_blocks) {
+        _olap_data_convertor->set_source_content(data.block, data.row_pos, 
data.num_rows);
+        // find all row pos for short key indexes
+        std::vector<size_t> short_key_pos;
+        // We build a short key index every `_opts.num_rows_per_block` rows. 
Specifically, we
+        // build a short key index using 1st rows for first block and 
`_short_key_row_pos - _row_count`
+        // for next blocks.
+        if (_short_key_row_pos == 0 && _num_rows_written == 0) {
+            short_key_pos.push_back(0);
+        }
+        while (_short_key_row_pos + _opts.num_rows_per_block < 
_num_rows_written + data.num_rows) {
+            _short_key_row_pos += _opts.num_rows_per_block;
+            short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
+        }
+        if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
+            // create primary indexes
+            std::string last_key;
+            for (size_t pos = 0; pos < data.num_rows; pos++) {
+                std::string key = _full_encode_keys(key_columns, pos);
+                if (_tablet_schema->has_sequence_col()) {
+                    _encode_seq_column(seq_column, pos, &key);
+                }
+                DCHECK(key.compare(last_key) > 0)
+                        << "found duplicate key or key is not sorted! current 
key: " << key
+                        << ", last key" << last_key;
+                RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+                _maybe_invalid_row_cache(key);
+                last_key = std::move(key);
+            }
+        } else {
+            // create short key indexes'
+            // for min_max key
+            _set_min_key(_full_encode_keys(key_columns, 0));
+            _set_max_key(_full_encode_keys(key_columns, data.num_rows - 1));
+
+            key_columns.resize(_num_short_key_columns);
+            for (const auto pos : short_key_pos) {
+                
RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, 
pos)));
+            }
+        }
+        _olap_data_convertor->clear_source_content();
+        _num_rows_written += data.num_rows;
+    }
+
+    _batched_blocks.clear();
+    return Status::OK();
+}
+
+std::string VerticalSegmentWriter::_full_encode_keys(
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, 
size_t pos) {
+    assert(_key_index_size.size() == _num_key_columns);
+    assert(key_columns.size() == _num_key_columns && _key_coders.size() == 
_num_key_columns);
+
+    std::string encoded_keys;
+    size_t cid = 0;
+    for (const auto& column : key_columns) {
+        auto field = column->get_data_at(pos);
+        if (UNLIKELY(!field)) {
+            encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
+            ++cid;
+            continue;
+        }
+        encoded_keys.push_back(KEY_NORMAL_MARKER);
+        _key_coders[cid]->full_encode_ascending(field, &encoded_keys);
+        ++cid;
+    }
+    return encoded_keys;
+}
+
+void VerticalSegmentWriter::_encode_seq_column(
+        const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos, 
string* encoded_keys) {
+    const auto* field = seq_column->get_data_at(pos);
+    // To facilitate the use of the primary key index, encode the seq column
+    // to the minimum value of the corresponding length when the seq column
+    // is null
+    if (UNLIKELY(!field)) {
+        encoded_keys->push_back(KEY_NULL_FIRST_MARKER);
+        size_t seq_col_length = 
_tablet_schema->column(_tablet_schema->sequence_col_idx()).length();
+        encoded_keys->append(seq_col_length, KEY_MINIMAL_MARKER);
+        return;
+    }
+    encoded_keys->push_back(KEY_NORMAL_MARKER);
+    _seq_coder->full_encode_ascending(field, encoded_keys);
+}
+
+std::string VerticalSegmentWriter::_encode_keys(
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, 
size_t pos) {
+    assert(key_columns.size() == _num_short_key_columns);
+
+    std::string encoded_keys;
+    size_t cid = 0;
+    for (const auto& column : key_columns) {
+        auto field = column->get_data_at(pos);
+        if (UNLIKELY(!field)) {
+            encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
+            ++cid;
+            continue;
+        }
+        encoded_keys.push_back(KEY_NORMAL_MARKER);
+        _key_coders[cid]->encode_ascending(field, _key_index_size[cid], 
&encoded_keys);
+        ++cid;
+    }
+    return encoded_keys;
+}
+
+// TODO(lingbin): Currently this function does not include the size of various 
indexes,
+// We should make this more precise.
+uint64_t VerticalSegmentWriter::_estimated_remaining_size() {
+    // footer_size(4) + checksum(4) + segment_magic(4)
+    uint64_t size = 12;
+    if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
+        size += _primary_key_index_builder->size();
+    } else {
+        size += _short_key_index_builder->size();
+    }
+
+    // update the mem_tracker of segment size
+    _mem_tracker->consume(size - _mem_tracker->consumption());
+    return size;
+}
+
+size_t VerticalSegmentWriter::_calculate_inverted_index_file_size() {
+    size_t total_size = 0;
+    for (auto& column_writer : _column_writers) {
+        total_size += column_writer->get_inverted_index_size();
+    }
+    return total_size;
+}
+
+Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
+    uint64_t index_start = _file_writer->bytes_appended();
+    RETURN_IF_ERROR(_write_ordinal_index());
+    RETURN_IF_ERROR(_write_zone_map());
+    RETURN_IF_ERROR(_write_bitmap_index());
+    RETURN_IF_ERROR(_write_inverted_index());
+    RETURN_IF_ERROR(_write_bloom_filter_index());
+
+    *index_size = _file_writer->bytes_appended() - index_start;
+    if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
+        RETURN_IF_ERROR(_write_primary_key_index());
+        // IndexedColumnWriter write data pages mixed with segment data, we 
should use
+        // the stat from primary key index builder.
+        *index_size += _primary_key_index_builder->disk_size();
+    } else {
+        RETURN_IF_ERROR(_write_short_key_index());
+        *index_size = _file_writer->bytes_appended() - index_start;
+    }
+    _inverted_index_file_size = _calculate_inverted_index_file_size();
+    // reset all column writers and data_conveter
+    clear();
+
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::finalize_footer(uint64_t* segment_file_size) {
+    RETURN_IF_ERROR(_write_footer());
+    // finish
+    RETURN_IF_ERROR(_file_writer->finalize());
+    *segment_file_size = _file_writer->bytes_appended();
+    if (*segment_file_size == 0) {
+        return Status::Corruption("Bad segment, file size = 0");
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* 
index_size) {
+    MonotonicStopWatch timer;
+    timer.start();
+    // check disk capacity
+    if (_data_dir != nullptr &&
+        _data_dir->reach_capacity_limit((int64_t)_estimated_remaining_size())) 
{
+        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
+                                                        
_data_dir->path_hash());
+    }
+    _row_count = _num_rows_written;
+    _num_rows_written = 0;
+    // write index
+    RETURN_IF_ERROR(finalize_columns_index(index_size));
+    // write footer
+    RETURN_IF_ERROR(finalize_footer(segment_file_size));
+
+    if (timer.elapsed_time() > 5000000000L) {
+        LOG(INFO) << "segment flush consumes a lot time_ns " << 
timer.elapsed_time()
+                  << ", segmemt_size " << *segment_file_size;
+    }
+    return Status::OK();
+}
+
+void VerticalSegmentWriter::clear() {
+    for (auto& column_writer : _column_writers) {
+        column_writer.reset();
+    }
+    _column_writers.clear();
+    _olap_data_convertor.reset();
+}
+
+// write ordinal index after data has been written
+Status VerticalSegmentWriter::_write_ordinal_index() {
+    for (auto& column_writer : _column_writers) {
+        RETURN_IF_ERROR(column_writer->write_ordinal_index());
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_zone_map() {
+    for (auto& column_writer : _column_writers) {
+        RETURN_IF_ERROR(column_writer->write_zone_map());
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_bitmap_index() {
+    for (auto& column_writer : _column_writers) {
+        RETURN_IF_ERROR(column_writer->write_bitmap_index());
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_inverted_index() {
+    for (auto& column_writer : _column_writers) {
+        RETURN_IF_ERROR(column_writer->write_inverted_index());
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_bloom_filter_index() {
+    for (auto& column_writer : _column_writers) {
+        RETURN_IF_ERROR(column_writer->write_bloom_filter_index());
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_short_key_index() {
+    std::vector<Slice> body;
+    PageFooterPB footer;
+    RETURN_IF_ERROR(_short_key_index_builder->finalize(_row_count, &body, 
&footer));
+    PagePointer pp;
+    // short key index page is not compressed right now
+    RETURN_IF_ERROR(PageIO::write_page(_file_writer, body, footer, &pp));
+    pp.to_proto(_footer.mutable_short_key_index_page());
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_write_primary_key_index() {
+    CHECK(_primary_key_index_builder->num_rows() == _row_count);
+    return 
_primary_key_index_builder->finalize(_footer.mutable_primary_key_index_meta());
+}
+
+Status VerticalSegmentWriter::_write_footer() {
+    _footer.set_num_rows(_row_count);
+
+    // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), 
MagicNumber(4)
+    std::string footer_buf;
+    if (!_footer.SerializeToString(&footer_buf)) {
+        return Status::InternalError("failed to serialize segment footer");
+    }
+
+    faststring fixed_buf;
+    // footer's size
+    put_fixed32_le(&fixed_buf, footer_buf.size());
+    // footer's checksum
+    uint32_t checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
+    put_fixed32_le(&fixed_buf, checksum);
+    // Append magic number. we don't write magic number in the header because
+    // that will need an extra seek when reading
+    fixed_buf.append(k_segment_magic, k_segment_magic_length);
+
+    std::vector<Slice> slices {footer_buf, fixed_buf};
+    return _write_raw_data(slices);
+}
+
+Status VerticalSegmentWriter::_write_raw_data(const std::vector<Slice>& 
slices) {
+    RETURN_IF_ERROR(_file_writer->appendv(&slices[0], slices.size()));
+    return Status::OK();
+}
+
+Slice VerticalSegmentWriter::min_encoded_key() {
+    return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), 
_min_key.size())
+                                                   : 
_primary_key_index_builder->min_key();
+}
+Slice VerticalSegmentWriter::max_encoded_key() {
+    return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), 
_max_key.size())
+                                                   : 
_primary_key_index_builder->max_key();
+}
+
+void VerticalSegmentWriter::_set_min_max_key(const Slice& key) {
+    if (UNLIKELY(_is_first_row)) {
+        _min_key.append(key.get_data(), key.get_size());
+        _is_first_row = false;
+    }
+    if (key.compare(_max_key) > 0) {
+        _max_key.clear();
+        _max_key.append(key.get_data(), key.get_size());
+    }
+}
+
+void VerticalSegmentWriter::_set_min_key(const Slice& key) {
+    if (UNLIKELY(_is_first_row)) {
+        _min_key.append(key.get_data(), key.get_size());
+        _is_first_row = false;
+    }
+}
+
+void VerticalSegmentWriter::_set_max_key(const Slice& key) {
+    _max_key.clear();
+    _max_key.append(key.get_data(), key.get_size());
+}
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
new file mode 100644
index 00000000000..773751934b8
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/olap_file.pb.h>
+#include <gen_cpp/segment_v2.pb.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory> // unique_ptr
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h" // Status
+#include "gutil/macros.h"
+#include "gutil/strings/substitute.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/segment_v2/column_writer.h"
+#include "olap/tablet.h"
+#include "olap/tablet_schema.h"
+#include "util/faststring.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace vectorized {
+class Block;
+class IOlapColumnDataAccessor;
+class OlapBlockDataConvertor;
+} // namespace vectorized
+
+class DataDir;
+class MemTracker;
+class ShortKeyIndexBuilder;
+class PrimaryKeyIndexBuilder;
+class KeyCoder;
+struct RowsetWriterContext;
+
+namespace io {
+class FileWriter;
+} // namespace io
+
+namespace segment_v2 {
+
+struct VerticalSegmentWriterOptions {
+    uint32_t num_rows_per_block = 1024;
+    bool enable_unique_key_merge_on_write = false;
+    CompressionTypePB compression_type = UNKNOWN_COMPRESSION;
+
+    RowsetWriterContext* rowset_ctx = nullptr;
+    DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
+};
+
+struct RowsInBlock {
+    const vectorized::Block* block;
+    size_t row_pos;
+    size_t num_rows;
+};
+
+class VerticalSegmentWriter {
+public:
+    explicit VerticalSegmentWriter(io::FileWriter* file_writer, uint32_t 
segment_id,
+                                   TabletSchemaSPtr tablet_schema, 
BaseTabletSPtr tablet,
+                                   DataDir* data_dir, uint32_t 
max_row_per_segment,
+                                   const VerticalSegmentWriterOptions& opts,
+                                   std::shared_ptr<MowContext> mow_context);
+    ~VerticalSegmentWriter();
+
+    VerticalSegmentWriter(const VerticalSegmentWriter&) = delete;
+    const VerticalSegmentWriter& operator=(const VerticalSegmentWriter&) = 
delete;
+
+    Status init();
+
+    // Add one block to batch, memory is owned by the caller.
+    // The batched blocks will be flushed in write_batch.
+    // Once write_batch is called, no more blocks shoud be added.
+    Status batch_block(const vectorized::Block* block, size_t row_pos, size_t 
num_rows);
+    Status write_batch();
+
+    [[nodiscard]] std::string data_dir_path() const {
+        return _data_dir == nullptr ? "" : _data_dir->path();
+    }
+    [[nodiscard]] size_t inverted_index_file_size() const { return 
_inverted_index_file_size; }
+    [[nodiscard]] uint32_t num_rows_written() const { return 
_num_rows_written; }
+    [[nodiscard]] int64_t num_rows_filtered() const { return 
_num_rows_filtered; }
+    [[nodiscard]] uint32_t row_count() const { return _row_count; }
+    [[nodiscard]] uint32_t segment_id() const { return _segment_id; }
+
+    Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
+
+    Status finalize_columns_index(uint64_t* index_size);
+    Status finalize_footer(uint64_t* segment_file_size);
+
+    Slice min_encoded_key();
+    Slice max_encoded_key();
+
+    void clear();
+
+private:
+    void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const 
TabletColumn& column);
+    Status _create_column_writer(uint32_t cid, const TabletColumn& column);
+    size_t _calculate_inverted_index_file_size();
+    uint64_t _estimated_remaining_size();
+    Status _write_ordinal_index();
+    Status _write_zone_map();
+    Status _write_bitmap_index();
+    Status _write_inverted_index();
+    Status _write_bloom_filter_index();
+    Status _write_short_key_index();
+    Status _write_primary_key_index();
+    Status _write_footer();
+    Status _write_raw_data(const std::vector<Slice>& slices);
+    void _maybe_invalid_row_cache(const std::string& key) const;
+    std::string _encode_keys(const 
std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+                             size_t pos);
+    // used for unique-key with merge on write and segment min_max key
+    std::string _full_encode_keys(
+            const std::vector<vectorized::IOlapColumnDataAccessor*>& 
key_columns, size_t pos);
+    // used for unique-key with merge on write
+    void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* 
seq_column, size_t pos,
+                            string* encoded_keys);
+    void _set_min_max_key(const Slice& key);
+    void _set_min_key(const Slice& key);
+    void _set_max_key(const Slice& key);
+    void _serialize_block_to_row_column(vectorized::Block& block);
+    Status _append_block_with_partial_content(RowsInBlock& data);
+    Status _fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
+                                 const std::vector<bool>& 
use_default_or_null_flag,
+                                 bool has_default_or_nullable, const size_t& 
segment_start_pos);
+
+private:
+    uint32_t _segment_id;
+    TabletSchemaSPtr _tablet_schema;
+    BaseTabletSPtr _tablet;
+    DataDir* _data_dir;
+    VerticalSegmentWriterOptions _opts;
+
+    // Not owned. owned by RowsetWriter
+    io::FileWriter* _file_writer;
+
+    SegmentFooterPB _footer;
+    size_t _num_key_columns;
+    size_t _num_short_key_columns;
+    size_t _inverted_index_file_size;
+    std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
+    std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder;
+    std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
+    std::unique_ptr<MemTracker> _mem_tracker;
+
+    std::unique_ptr<vectorized::OlapBlockDataConvertor> _olap_data_convertor;
+    // used for building short key index or primary key index during 
vectorized write.
+    std::vector<const KeyCoder*> _key_coders;
+    const KeyCoder* _seq_coder = nullptr;
+    std::vector<uint16_t> _key_index_size;
+    size_t _short_key_row_pos = 0;
+
+    // _num_rows_written means row count already written in this current 
column group
+    uint32_t _num_rows_written = 0;
+    // number of rows filtered in strict mode partial update
+    int64_t _num_rows_filtered = 0;
+    // _row_count means total row count of this segment
+    // In vertical compaction row count is recorded when key columns group 
finish
+    //  and _num_rows_written will be updated in value column group
+    uint32_t _row_count = 0;
+
+    bool _is_first_row = true;
+    faststring _min_key;
+    faststring _max_key;
+
+    std::shared_ptr<MowContext> _mow_context;
+    // group every rowset-segment row id to speed up reader
+    PartialUpdateReadPlan _rssid_to_rid;
+    std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
+
+    std::vector<RowsInBlock> _batched_blocks;
+};
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/vec/olap/olap_data_convertor.cpp 
b/be/src/vec/olap/olap_data_convertor.cpp
index e9fe19a048d..cc879d24eb3 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -217,9 +217,11 @@ void OlapBlockDataConvertor::set_source_content(const 
vectorized::Block* block,
 void OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows,
         std::vector<uint32_t> cids) {
-    DCHECK(block && num_rows > 0 && row_pos + num_rows <= block->rows() &&
-           block->columns() <= _convertors.size());
+    DCHECK(block != nullptr);
+    DCHECK(num_rows > 0);
+    DCHECK(row_pos + num_rows <= block->rows());
     for (auto i : cids) {
+        DCHECK(i < _convertors.size());
         _convertors[i]->set_source_column(block->get_by_position(i), row_pos, 
num_rows);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to