This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cc2d146517f [Fix](segment compaction) fix error using of inverted
index file writer in segment compaction #43114 (#43616)
cc2d146517f is described below
commit cc2d146517fffd3a58cc0981e98f711fe5a13161
Author: airborne12 <[email protected]>
AuthorDate: Mon Nov 11 20:29:40 2024 +0800
[Fix](segment compaction) fix error using of inverted index file writer in
segment compaction #43114 (#43616)
cherry pick from #43114
---
be/src/olap/rowset/beta_rowset_writer.cpp | 7 +++-
be/src/olap/rowset/beta_rowset_writer.h | 4 +-
be/src/olap/rowset/segcompaction.cpp | 2 +-
be/src/olap/rowset/segcompaction.h | 5 +++
be/test/olap/segcompaction_test.cpp | 64 +++++++++++++++++++++++++++++++
5 files changed, 78 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index a19989af1e7..634e8b64429 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -948,7 +948,7 @@ Status BaseBetaRowsetWriter::create_inverted_index_file_writer(
return Status::OK();
}
-Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
+Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end) {
DCHECK(begin >= 0 && end >= 0);
std::string path =
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
@@ -988,6 +988,11 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
}
_segcompaction_worker->get_file_writer().reset(file_writer.release());
+ if (auto& idx_file_writer =
_segcompaction_worker->get_inverted_index_file_writer();
+ idx_file_writer != nullptr) {
+ RETURN_IF_ERROR(idx_file_writer->close());
+ }
+ _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
return Status::OK();
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 6063f714177..ca2685f5956 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -280,6 +280,8 @@ public:
Status flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
index_size,
KeyBoundsPB& key_bounds);
+ Status create_segment_writer_for_segcompaction(
+ std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end);
bool is_segcompacted() const { return _num_segcompacted > 0; }
@@ -290,8 +292,6 @@ private:
Status _check_segment_number_limit(size_t segnum) override;
int64_t _num_seg() const override;
Status _wait_flying_segcompaction();
- Status _create_segment_writer_for_segcompaction(
- std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end);
Status _segcompaction_if_necessary();
Status _segcompaction_rename_last_segments();
Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id);
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index fc8baf952c1..92b903d3a90 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -219,7 +219,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin,
uint32_t end) {
- return _writer->_create_segment_writer_for_segcompaction(writer, begin,
end);
+ return _writer->create_segment_writer_for_segcompaction(writer, begin,
end);
}
Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr
segments) {
diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h
index 67dd6889aad..f0f8aa6b257 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -25,6 +25,7 @@
#include "olap/merger.h"
#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
+#include "segment_v2/inverted_index_file_writer.h"
#include "segment_v2/segment.h"
namespace doris {
@@ -61,6 +62,9 @@ public:
DeleteBitmapPtr get_converted_delete_bitmap() { return
_converted_delete_bitmap; }
io::FileWriterPtr& get_file_writer() { return _file_writer; }
+ InvertedIndexFileWriterPtr& get_inverted_index_file_writer() {
+ return _inverted_index_file_writer;
+ }
// set the cancel flag, tasks already started will not be cancelled.
bool cancel();
@@ -86,6 +90,7 @@ private:
// Currently cloud storage engine doesn't need segcompaction
BetaRowsetWriter* _writer = nullptr;
io::FileWriterPtr _file_writer;
+ InvertedIndexFileWriterPtr _inverted_index_file_writer = nullptr;
// for unique key mow table
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp
index ba0d23acb02..32d724d246b 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -34,6 +34,7 @@
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
+#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
@@ -178,6 +179,24 @@ protected:
tablet_schema->init_from_pb(tablet_schema_pb);
}
+ void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index,
int64_t index_id,
+ const std::string& index_name, int32_t col_unique_id,
+ const std::string& column_type, const std::string&
column_name,
+ bool parser = false) {
+ column_pb->set_unique_id(col_unique_id);
+ column_pb->set_name(column_name);
+ column_pb->set_type(column_type);
+ column_pb->set_is_key(false);
+ column_pb->set_is_nullable(true);
+ tablet_index->set_index_id(index_id);
+ tablet_index->set_index_name(index_name);
+ tablet_index->set_index_type(IndexType::INVERTED);
+ tablet_index->add_col_unique_id(col_unique_id);
+ if (parser) {
+ auto* properties = tablet_index->mutable_properties();
+ (*properties)[INVERTED_INDEX_PARSER_KEY] =
INVERTED_INDEX_PARSER_UNICODE;
+ }
+ }
// use different id to avoid conflict
void create_rowset_writer_context(int64_t id, TabletSchemaSPtr
tablet_schema,
RowsetWriterContext*
rowset_writer_context) {
@@ -830,6 +849,51 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
}
}
+TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
+ config::enable_segcompaction = true;
+ Status s;
+ TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+ TabletSchemaPB schema_pb;
+ schema_pb.set_keys_type(KeysType::DUP_KEYS);
+ schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
+
+ construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000,
"key_index", 0, "INT",
+ "key");
+ construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001,
"v1_index", 1, "STRING",
+ "v1");
+ construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002,
"v2_index", 2, "STRING",
+ "v2", true);
+ construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003,
"v3_index", 3, "INT",
+ "v3");
+
+ tablet_schema.reset(new TabletSchema);
+ tablet_schema->init_from_pb(schema_pb);
+ RowsetSharedPtr rowset;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 3;
+ std::vector<uint32_t> segment_num_rows;
+ {
+ RowsetWriterContext writer_context;
+ create_rowset_writer_context(10052, tablet_schema, &writer_context);
+
+ auto res = RowsetFactory::create_rowset_writer(*l_engine,
writer_context, false);
+ EXPECT_TRUE(res.has_value()) << res.error();
+ auto rowset_writer = std::move(res).value();
+ EXPECT_EQ(Status::OK(), s);
+ auto beta_rowset_writer =
dynamic_cast<BetaRowsetWriter*>(rowset_writer.get());
+ EXPECT_TRUE(beta_rowset_writer != nullptr);
+ std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+ auto status =
beta_rowset_writer->create_segment_writer_for_segcompaction(&writer, 0, 1);
+ EXPECT_TRUE(beta_rowset_writer != nullptr);
+ EXPECT_TRUE(status == Status::OK());
+ int64_t inverted_index_file_size = 0;
+ status = writer->close_inverted_index(&inverted_index_file_size);
+ EXPECT_TRUE(status == Status::OK());
+ std::cout << inverted_index_file_size << std::endl;
+ }
+}
+
TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
config::enable_segcompaction = true;
Status s;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org