This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2013dcd0e9 [refactor](load) cleanup segment flush logic in beta rowset writer (#21635)
2013dcd0e9 is described below
commit 2013dcd0e96fb79493884dc933e9f2e6f941986d
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Jul 18 18:17:57 2023 +0800
[refactor](load) cleanup segment flush logic in beta rowset writer (#21635)
---
be/src/olap/memtable_flush_executor.cpp | 7 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 222 +++++++++++++-----------------
be/src/olap/rowset/beta_rowset_writer.h | 47 ++++---
be/src/olap/rowset/rowset_writer.h | 18 +--
be/src/olap/rowset/segcompaction.cpp | 2 +-
be/src/olap/tablet.cpp | 3 +-
6 files changed, 132 insertions(+), 167 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index cf478393af..70cd74e214 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -99,8 +99,11 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
std::unique_ptr<vectorized::Block> block = memtable->to_block();
-
SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->unfold_variant_column_and_flush_block(
- block.get(), segment_id, memtable->flush_mem_tracker(),
flush_size)));
+ {
+ SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
+ SKIP_MEMORY_CHECK(RETURN_IF_ERROR(
+ _rowset_writer->flush_memtable(block.get(), segment_id,
flush_size)));
+ }
_memtable_stat += memtable->stat();
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns /
1000);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index d99c8e9ee4..4aa7815423 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -89,10 +89,10 @@ BetaRowsetWriter::~BetaRowsetWriter() {
if (!fs) {
return;
}
- auto max_segment_id = std::max(_num_segment.load(),
_next_segment_id.load());
- for (int i = 0; i < max_segment_id; ++i) {
- std::string seg_path = BetaRowset::segment_file_path(
- _context.rowset_dir, _context.rowset_id, _segment_start_id
+ i);
+ DCHECK_LE(_segment_start_id + _num_segment, _next_segment_id);
+ for (int i = _segment_start_id; i < _next_segment_id; ++i) {
+ std::string seg_path =
+ BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, i);
// Even if an error is encountered, these files that have not been
cleaned up
// will be cleaned up by the GC background. So here we only print
the error
// message when we encounter an error.
@@ -134,11 +134,9 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
return Status::OK();
}
if (UNLIKELY(_segment_writer == nullptr)) {
- FlushContext ctx;
- ctx.block = block;
- RETURN_IF_ERROR(_create_segment_writer(&_segment_writer, &ctx));
+ RETURN_IF_ERROR(_create_segment_writer(_segment_writer,
allocate_segment_id()));
}
- return _add_block(block, &_segment_writer);
+ return _add_block(block, _segment_writer);
}
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
@@ -446,46 +444,38 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
return status;
}
-Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
-
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
- size_t row_offset, size_t
input_row_num) {
- auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
+Status BetaRowsetWriter::_add_rows(const vectorized::Block* block,
+ std::unique_ptr<segment_v2::SegmentWriter>&
segment_writer,
+ size_t row_offset, size_t input_row_num) {
+ auto s = segment_writer->append_block(block, row_offset, input_row_num);
if (UNLIKELY(!s.ok())) {
return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block:
{}", s.to_string());
}
+ _raw_num_rows_written += input_row_num;
return Status::OK();
}
Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
-
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
- const FlushContext* flush_ctx) {
+
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer) {
size_t block_size_in_bytes = block->bytes();
size_t block_row_num = block->rows();
size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes /
block_row_num);
size_t row_offset = 0;
- if (flush_ctx != nullptr && flush_ctx->segment_id.has_value()) {
- // the entire block (memtable) should be flushed into single segment
- RETURN_IF_ERROR(_do_add_block(block, segment_writer, 0,
block_row_num));
- _raw_num_rows_written += block_row_num;
- return Status::OK();
- }
-
do {
- auto max_row_add =
(*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
+ auto max_row_add =
segment_writer->max_row_to_add(row_avg_size_in_bytes);
if (UNLIKELY(max_row_add < 1)) {
// no space for another single row, need flush now
RETURN_IF_ERROR(_flush_segment_writer(segment_writer));
- RETURN_IF_ERROR(_create_segment_writer(segment_writer, flush_ctx));
- max_row_add =
(*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
+ RETURN_IF_ERROR(_create_segment_writer(segment_writer,
allocate_segment_id()));
+ max_row_add =
segment_writer->max_row_to_add(row_avg_size_in_bytes);
DCHECK(max_row_add > 0);
}
size_t input_row_num = std::min(block_row_num - row_offset,
size_t(max_row_add));
- RETURN_IF_ERROR(_do_add_block(block, segment_writer, row_offset,
input_row_num));
+ RETURN_IF_ERROR(_add_rows(block, segment_writer, row_offset,
input_row_num));
row_offset += input_row_num;
} while (row_offset < block_row_num);
- _raw_num_rows_written += block_row_num;
return Status::OK();
}
@@ -515,52 +505,45 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row
Status BetaRowsetWriter::flush() {
if (_segment_writer != nullptr) {
- RETURN_IF_ERROR(_flush_segment_writer(&_segment_writer));
+ RETURN_IF_ERROR(_flush_segment_writer(_segment_writer));
}
return Status::OK();
}
-Status BetaRowsetWriter::unfold_variant_column_and_flush_block(
- vectorized::Block* block, int32_t segment_id,
- const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t*
flush_size) {
- SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker);
-
+Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t
segment_id,
+ int64_t* flush_size) {
if (block->rows() == 0) {
return Status::OK();
}
- FlushContext ctx;
- ctx.block = block;
+ TabletSchemaSPtr flush_schema;
if (_context.tablet_schema->is_dynamic_schema()) {
// Unfold variant column
- RETURN_IF_ERROR(_unfold_variant_column(*block, ctx.flush_schema));
+ RETURN_IF_ERROR(_unfold_variant_column(*block, flush_schema));
+ }
+ {
+ SCOPED_RAW_TIMER(&_segment_writer_ns);
+ RETURN_IF_ERROR(_flush_single_block(block, segment_id, flush_size,
flush_schema));
}
- ctx.segment_id = std::optional<int32_t> {segment_id};
- SCOPED_RAW_TIMER(&_segment_writer_ns);
- RETURN_IF_ERROR(flush_single_block(block, flush_size, &ctx));
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
RETURN_IF_ERROR(_segcompaction_if_necessary());
return Status::OK();
}
-Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block,
int64* flush_size,
- const FlushContext* ctx) {
+Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) {
if (block->rows() == 0) {
return Status::OK();
}
+ return _flush_single_block(block, allocate_segment_id());
+}
+Status BetaRowsetWriter::_flush_single_block(const vectorized::Block* block,
int32_t segment_id,
+ int64_t* flush_size,
TabletSchemaSPtr flush_schema) {
std::unique_ptr<segment_v2::SegmentWriter> writer;
- RETURN_IF_ERROR(_create_segment_writer(&writer, ctx));
- segment_v2::SegmentWriter* raw_writer = writer.get();
- int32_t segment_id = writer->get_segment_id();
- RETURN_IF_ERROR(_add_block(block, &writer, ctx));
- // if segment_id is present in flush context,
- // the entire memtable should be flushed into a single segment
- if (ctx != nullptr && ctx->segment_id.has_value()) {
- DCHECK_EQ(writer->get_segment_id(), segment_id);
- DCHECK_EQ(writer.get(), raw_writer);
- }
- RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size));
+ bool no_compression = block->bytes() <=
config::segment_compression_threshold_kb * 1024;
+ RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression,
flush_schema));
+ RETURN_IF_ERROR(_add_rows(block, writer, 0, block->rows()));
+ RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
return Status::OK();
}
@@ -600,7 +583,7 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r
RowsetSharedPtr BetaRowsetWriter::build() {
// make sure all segments are flushed
- DCHECK_EQ(_num_segment, _next_segment_id);
+ DCHECK_EQ(_segment_start_id + _num_segment, _next_segment_id);
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
Status status = file_writer->close();
@@ -762,12 +745,12 @@ RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
return rowset;
}
-Status BetaRowsetWriter::_create_file_writer(std::string path,
io::FileWriterPtr* file_writer) {
+Status BetaRowsetWriter::_create_file_writer(std::string path,
io::FileWriterPtr& file_writer) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
}
- Status st = fs->create_file(path, file_writer);
+ Status st = fs->create_file(path, &file_writer);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ",
err: " << st;
return st;
@@ -777,85 +760,71 @@ Status BetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr
return Status::OK();
}
-Status BetaRowsetWriter::create_file_writer(uint32_t segment_id,
io::FileWriterPtr* file_writer) {
+Status BetaRowsetWriter::create_file_writer(uint32_t segment_id,
io::FileWriterPtr& file_writer) {
std::string path;
path = BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, segment_id);
return _create_file_writer(path, file_writer);
}
-Status BetaRowsetWriter::_create_file_writer(uint32_t begin, uint32_t end,
- io::FileWriterPtr* file_writer) {
- std::string path;
- path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir,
_context.rowset_id,
- begin, end);
- return _create_file_writer(path, file_writer);
+Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
+ std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end) {
+ DCHECK(begin >= 0 && end >= 0);
+ std::string path =
BetaRowset::local_segment_path_segcompacted(_context.rowset_dir,
+
_context.rowset_id, begin, end);
+ io::FileWriterPtr file_writer;
+ RETURN_IF_ERROR(_create_file_writer(path, file_writer));
+
+ segment_v2::SegmentWriterOptions writer_options;
+ writer_options.enable_unique_key_merge_on_write =
_context.enable_unique_key_merge_on_write;
+ writer_options.rowset_ctx = &_context;
+ writer_options.write_type = _context.write_type;
+ writer_options.write_type = DataWriteType::TYPE_COMPACTION;
+
+ writer->reset(new segment_v2::SegmentWriter(file_writer.get(),
_num_segcompacted,
+ _context.tablet_schema,
_context.tablet,
+ _context.data_dir,
_context.max_rows_per_segment,
+ writer_options,
_context.mow_context));
+ if (_segcompaction_worker.get_file_writer() != nullptr) {
+ _segcompaction_worker.get_file_writer()->close();
+ }
+ _segcompaction_worker.get_file_writer().reset(file_writer.release());
+
+ return Status::OK();
}
-Status BetaRowsetWriter::_do_create_segment_writer(
- std::unique_ptr<segment_v2::SegmentWriter>* writer, bool
is_segcompaction, int64_t begin,
- int64_t end, const FlushContext* flush_ctx) {
- Status st;
- std::string path;
- int32_t segment_id = 0;
+Status
BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
writer,
+ int32_t segment_id, bool
no_compression,
+ TabletSchemaSPtr flush_schema)
{
+ RETURN_IF_ERROR(_check_segment_number_limit());
io::FileWriterPtr file_writer;
- if (is_segcompaction) {
- DCHECK(begin >= 0 && end >= 0);
- st = _create_file_writer(begin, end, &file_writer);
- } else {
- int32_t segid_offset = (flush_ctx != nullptr &&
flush_ctx->segment_id.has_value())
- ? flush_ctx->segment_id.value()
- : allocate_segment_id();
- segment_id = segid_offset + _segment_start_id;
- st = create_file_writer(segment_id, &file_writer);
- }
- if (!st.ok()) {
- return st;
- }
+ RETURN_IF_ERROR(create_file_writer(segment_id, file_writer));
segment_v2::SegmentWriterOptions writer_options;
writer_options.enable_unique_key_merge_on_write =
_context.enable_unique_key_merge_on_write;
writer_options.rowset_ctx = &_context;
writer_options.write_type = _context.write_type;
- if (is_segcompaction) {
- writer_options.write_type = DataWriteType::TYPE_COMPACTION;
+ if (no_compression) {
+ writer_options.compression_type = NO_COMPRESSION;
}
- if (is_segcompaction) {
- writer->reset(new segment_v2::SegmentWriter(
- file_writer.get(), _num_segcompacted, _context.tablet_schema,
_context.tablet,
- _context.data_dir, _context.max_rows_per_segment,
writer_options,
- _context.mow_context));
- if (_segcompaction_worker.get_file_writer() != nullptr) {
- _segcompaction_worker.get_file_writer()->close();
- }
- _segcompaction_worker.get_file_writer().reset(file_writer.release());
- } else {
- const auto& tablet_schema = flush_ctx && flush_ctx->flush_schema ?
flush_ctx->flush_schema
- :
_context.tablet_schema;
- if (flush_ctx && flush_ctx->block &&
- flush_ctx->block->bytes() <=
config::segment_compression_threshold_kb * 1024) {
- writer_options.compression_type = NO_COMPRESSION;
- }
- writer->reset(new segment_v2::SegmentWriter(
- file_writer.get(), segment_id, tablet_schema, _context.tablet,
_context.data_dir,
- _context.max_rows_per_segment, writer_options,
_context.mow_context));
- {
- std::lock_guard<SpinLock> l(_lock);
- _file_writers.push_back(std::move(file_writer));
- }
- auto s = (*writer)->init();
- if (!s.ok()) {
- LOG(WARNING) << "failed to init segment writer: " << s.to_string();
- writer->reset(nullptr);
- return s;
- }
+ const auto& tablet_schema = flush_schema ? flush_schema :
_context.tablet_schema;
+ writer.reset(new segment_v2::SegmentWriter(
+ file_writer.get(), segment_id, tablet_schema, _context.tablet,
_context.data_dir,
+ _context.max_rows_per_segment, writer_options,
_context.mow_context));
+ {
+ std::lock_guard<SpinLock> l(_lock);
+ _file_writers.push_back(std::move(file_writer));
+ }
+ auto s = writer->init();
+ if (!s.ok()) {
+ LOG(WARNING) << "failed to init segment writer: " << s.to_string();
+ writer.reset();
+ return s;
}
-
return Status::OK();
}
-Status
BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer,
- const FlushContext* flush_ctx)
{
+Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 +
_num_segcompacted;
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
@@ -864,43 +833,42 @@ Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::Segm
_context.tablet_id, _context.rowset_id.to_string(),
config::max_segment_num_per_rowset, _num_segment,
_segcompacted_point,
_num_segcompacted);
- } else {
- return _do_create_segment_writer(writer, false, -1, -1, flush_ctx);
}
+ return Status::OK();
}
-Status
BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer,
+Status
BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
writer,
int64_t* flush_size) {
- uint32_t segid = (*writer)->get_segment_id();
- uint32_t row_num = (*writer)->num_rows_written();
+ uint32_t segid = writer->get_segment_id();
+ uint32_t row_num = writer->num_rows_written();
- if ((*writer)->num_rows_written() == 0) {
+ if (writer->num_rows_written() == 0) {
return Status::OK();
}
uint64_t segment_size;
uint64_t index_size;
- Status s = (*writer)->finalize(&segment_size, &index_size);
+ Status s = writer->finalize(&segment_size, &index_size);
if (!s.ok()) {
return Status::Error(s.code(), "failed to finalize segment: {}",
s.to_string());
}
VLOG_DEBUG << "tablet_id:" << _context.tablet_id
- << " flushing filename: " << (*writer)->get_data_dir()->path()
+ << " flushing filename: " << writer->get_data_dir()->path()
<< " rowset_id:" << _context.rowset_id << " segment num:" <<
_num_segment;
KeyBoundsPB key_bounds;
- Slice min_key = (*writer)->min_encoded_key();
- Slice max_key = (*writer)->max_encoded_key();
+ Slice min_key = writer->min_encoded_key();
+ Slice max_key = writer->max_encoded_key();
DCHECK_LE(min_key.compare(max_key), 0);
key_bounds.set_min_key(min_key.to_string());
key_bounds.set_max_key(max_key.to_string());
SegmentStatistics segstat;
segstat.row_num = row_num;
- segstat.data_size = segment_size +
(*writer)->get_inverted_index_file_size();
- segstat.index_size = index_size +
(*writer)->get_inverted_index_file_size();
+ segstat.data_size = segment_size + writer->get_inverted_index_file_size();
+ segstat.index_size = index_size + writer->get_inverted_index_file_size();
segstat.key_bounds = key_bounds;
- writer->reset();
+ writer.reset();
if (flush_size) {
*flush_size = segment_size + index_size;
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index e88efdc000..bb3e55d764 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -84,20 +84,18 @@ public:
Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset)
override;
- Status create_file_writer(uint32_t segment_id, io::FileWriterPtr* writer);
+ Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer);
void add_segment(uint32_t segid, SegmentStatistics& segstat);
Status flush() override;
- Status unfold_variant_column_and_flush_block(
- vectorized::Block* block, int32_t segment_id,
- const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t*
flush_size) override;
+ Status flush_memtable(vectorized::Block* block, int32_t segment_id,
+ int64_t* flush_size) override;
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
- Status flush_single_block(const vectorized::Block* block, int64_t*
flush_size,
- const FlushContext* ctx = nullptr) override;
+ Status flush_single_block(const vectorized::Block* block) override;
RowsetSharedPtr build() override;
@@ -129,31 +127,38 @@ public:
Status wait_flying_segcompaction() override;
- void set_segment_start_id(int32_t start_id) override { _segment_start_id =
start_id; }
+ void set_segment_start_id(int32_t start_id) override {
+ _segment_start_id = start_id;
+ _next_segment_id = start_id;
+ }
int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
int64_t segment_writer_ns() override { return _segment_writer_ns; }
private:
- Status _do_add_block(const vectorized::Block* block,
- std::unique_ptr<segment_v2::SegmentWriter>*
segment_writer,
- size_t row_offset, size_t input_row_num);
+ Status _add_rows(const vectorized::Block* block,
+ std::unique_ptr<segment_v2::SegmentWriter>&
segment_writer, size_t row_offset,
+ size_t input_row_num);
Status _add_block(const vectorized::Block* block,
- std::unique_ptr<segment_v2::SegmentWriter>* writer,
- const FlushContext* flush_ctx = nullptr);
-
- Status _create_file_writer(std::string path, io::FileWriterPtr*
file_writer);
- Status _create_file_writer(uint32_t begin, uint32_t end,
io::FileWriterPtr* writer);
- Status
_do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
- bool is_segcompaction, int64_t begin,
int64_t end,
- const FlushContext* ctx = nullptr);
- Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer,
- const FlushContext* ctx = nullptr);
- Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer,
+ std::unique_ptr<segment_v2::SegmentWriter>& writer);
+
+ Status _create_file_writer(std::string path, io::FileWriterPtr&
file_writer);
+ Status _check_segment_number_limit();
+ Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
writer,
+ int32_t segment_id, bool no_compression =
false,
+ TabletSchemaSPtr flush_schema = nullptr);
+ Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
writer,
int64_t* flush_size = nullptr);
+ Status _flush_single_block(const vectorized::Block* block, int32_t
segment_id,
+ int64_t* flush_size = nullptr,
+ TabletSchemaSPtr flush_schema = nullptr);
Status _generate_delete_bitmap(int32_t segment_id);
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
+
+ // segment compaction
+ Status _create_segment_writer_for_segcompaction(
+ std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end);
Status _segcompaction_if_necessary();
Status _segcompaction_ramaining_if_necessary();
Status
_load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 45db4afe34..9a6ae89dd6 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -35,14 +35,6 @@ namespace doris {
class MemTable;
-// Context for single memtable flush
-struct FlushContext {
- ENABLE_FACTORY_CREATOR(FlushContext);
- TabletSchemaSPtr flush_schema = nullptr;
- const vectorized::Block* block = nullptr;
- std::optional<int32_t> segment_id = std::nullopt;
-};
-
class RowsetWriter {
public:
RowsetWriter() = default;
@@ -78,15 +70,13 @@ public:
"RowsetWriter not support final_flush");
}
- virtual Status unfold_variant_column_and_flush_block(
- vectorized::Block* block, int32_t segment_id,
- const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t*
flush_size) {
+ virtual Status flush_memtable(vectorized::Block* block, int32_t segment_id,
+ int64_t* flush_size) {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
- "RowsetWriter not support
unfold_variant_column_and_flush_block");
+ "RowsetWriter not support flush_memtable");
}
- virtual Status flush_single_block(const vectorized::Block* block, int64_t*
flush_size,
- const FlushContext* ctx = nullptr) {
+ virtual Status flush_single_block(const vectorized::Block* block) {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
"RowsetWriter not support flush_single_block");
}
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 234651a278..cac1e78fa6 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -187,7 +187,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin,
uint64_t end) {
- return _writer->_do_create_segment_writer(writer, true, begin, end);
+ return _writer->_create_segment_writer_for_segcompaction(writer, begin,
end);
}
Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr
segments) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9ad6096bfe..e53661dce8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2977,8 +2977,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
RETURN_IF_ERROR(generate_new_block_for_partial_update(
rowset_schema, read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
sort_block(block, ordered_block);
- int64_t size;
- RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block,
&size));
+ RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
}
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << "
rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version
+ 1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]