This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 397b0771e20 [opt] (inverted index) add inverted index file size for
open file (#37482)
397b0771e20 is described below
commit 397b0771e2022904d725dc79f245130cd68950a7
Author: Sun Chenyang <[email protected]>
AuthorDate: Fri Aug 16 19:40:44 2024 +0800
[opt] (inverted index) add inverted index file size for open file (#37482)
---
be/src/cloud/cloud_rowset_writer.cpp | 8 +
be/src/cloud/pb_convert.cpp | 8 +
be/src/olap/compaction.cpp | 5 +-
be/src/olap/rowset/beta_rowset.cpp | 6 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 14 +-
be/src/olap/rowset/beta_rowset_writer.h | 60 +++++++
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 2 +-
be/src/olap/rowset/beta_rowset_writer_v2.h | 2 +
be/src/olap/rowset/rowset_meta.cpp | 30 ++++
be/src/olap/rowset/rowset_meta.h | 10 ++
be/src/olap/rowset/segment_creator.cpp | 20 ++-
be/src/olap/rowset/segment_creator.h | 8 +-
.../segment_v2/inverted_index_compaction.cpp | 1 -
.../segment_v2/inverted_index_compound_reader.cpp | 117 ++++++------
.../segment_v2/inverted_index_compound_reader.h | 29 ++-
.../segment_v2/inverted_index_file_reader.cpp | 93 ++++++----
.../rowset/segment_v2/inverted_index_file_reader.h | 11 +-
.../segment_v2/inverted_index_file_writer.cpp | 21 ++-
.../rowset/segment_v2/inverted_index_file_writer.h | 16 +-
.../segment_v2/inverted_index_fs_directory.cpp | 17 +-
.../segment_v2/inverted_index_fs_directory.h | 2 +-
be/src/olap/rowset/segment_v2/segment.cpp | 15 +-
be/src/olap/rowset/segment_v2/segment.h | 7 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 16 +-
be/src/olap/rowset/segment_v2/segment_writer.h | 7 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 19 +-
.../rowset/segment_v2/vertical_segment_writer.h | 9 +-
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 6 +-
be/src/olap/task/index_builder.cpp | 4 +-
.../segment_v2/inverted_index_array_test.cpp | 9 +-
gensrc/proto/olap_common.proto | 12 +-
gensrc/proto/olap_file.proto | 6 +
.../test_compound_reader_fault_injection.out | 4 +
.../test_inverted_index_file_size.out | 49 +++++
.../test_inverted_index_v2_file_size.out | 85 +++++++++
.../test_compound_reader_fault_injection.groovy | 62 +++++++
.../test_inverted_index_file_size.groovy | 145 +++++++++++++++
.../test_inverted_index_v2_file_size.groovy | 200 +++++++++++++++++++++
38 files changed, 946 insertions(+), 189 deletions(-)
diff --git a/be/src/cloud/cloud_rowset_writer.cpp
b/be/src/cloud/cloud_rowset_writer.cpp
index ad5c57fd21e..7753bf7b65b 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -115,6 +115,14 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}
+ if (auto idx_files_info =
_idx_files_info.get_inverted_files_info(_segment_start_id);
+ !idx_files_info.has_value()) [[unlikely]] {
+ LOG(ERROR) << "expected inverted index files info, but none presents: "
+ << idx_files_info.error();
+ } else {
+ _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
+ }
+
RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema,
_context.tablet_path,
_rowset_meta,
&rowset),
"rowset init failed when build new rowset");
diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 24bdadead33..d5342186541 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -82,6 +82,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const
RowsetMetaPB& in)
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_has_variant_type_in_schema(in.has_has_variant_type_in_schema());
+
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
+
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}
void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
@@ -132,6 +134,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out,
RowsetMetaPB&& in) {
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_has_variant_type_in_schema(in.has_variant_type_in_schema());
+
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
+
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}
RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
@@ -190,6 +194,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const
RowsetMetaCloudPB& in)
out->set_schema_version(in.schema_version());
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
+
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
+
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
@@ -237,6 +243,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out,
RowsetMetaCloudPB&& in) {
out->set_schema_version(in.schema_version());
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
+
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
+
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}
TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9ed27bad382..9109c59e8c2 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -763,15 +763,17 @@ Status Compaction::do_inverted_index_compaction() {
}
}
+ std::vector<InvertedIndexFileInfo>
all_inverted_index_file_info(dest_segment_num);
uint64_t inverted_index_file_size = 0;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer =
inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
- inverted_index_file_size +=
inverted_index_file_writer->get_index_file_size();
+ inverted_index_file_size +=
inverted_index_file_writer->get_index_file_total_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
+ all_inverted_index_file_info[seg_id] =
inverted_index_file_writer->get_index_file_info();
}
// check index compaction status. If status is not ok, we should return
error and end this compaction round.
if (!status.ok()) {
@@ -786,6 +788,7 @@ Status Compaction::do_inverted_index_compaction() {
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size()
+
inverted_index_file_size);
+
_output_rowset->rowset_meta()->update_inverted_index_files_info(all_inverted_index_file_info);
COUNTER_UPDATE(_output_rowset_data_size_counter,
_output_rowset->data_disk_size());
LOG(INFO) << "succeed to do index compaction"
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 832ca314088..b269051e43f 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -180,7 +180,7 @@ Status BetaRowset::load_segment(int64_t seg_id,
segment_v2::SegmentSharedPtr* se
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema, reader_options,
- segment);
+ segment,
_rowset_meta->inverted_index_file_info(seg_id));
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under
rowset " << rowset_id()
<< " : " << s.to_string();
@@ -538,8 +538,10 @@ Status BetaRowset::check_current_rowset_segment() {
.cache_base_path {},
.file_size = _rowset_meta->segment_file_size(seg_id),
};
+
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema,
- reader_options, &segment);
+ reader_options, &segment,
+
_rowset_meta->inverted_index_file_info(seg_id));
if (!s.ok()) {
LOG(WARNING) << "segment can not be opened. file=" << seg_path;
return s;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index ec1bba7621b..45f260bdfa1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -195,7 +195,7 @@ BaseBetaRowsetWriter::BaseBetaRowsetWriter()
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0),
- _segment_creator(_context, _seg_files) {}
+ _segment_creator(_context, _seg_files, _idx_files_info) {}
BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
: _engine(engine),
_segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
@@ -737,6 +737,14 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
:
_context.tablet_schema;
_rowset_meta->set_tablet_schema(rowset_schema);
+ if (auto idx_files_info =
_idx_files_info.get_inverted_files_info(_segment_start_id);
+ !idx_files_info.has_value()) [[unlikely]] {
+ LOG(ERROR) << "expected inverted index files info, but none presents: "
+ << idx_files_info.error();
+ } else {
+ _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
+ }
+
RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema,
_context.tablet_path,
_rowset_meta,
&rowset),
"rowset init failed when build new rowset");
@@ -989,8 +997,8 @@ Status
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
SegmentStatistics segstat;
segstat.row_num = row_num;
- segstat.data_size = segment_size +
(*writer)->get_inverted_index_file_size();
- segstat.index_size = index_size +
(*writer)->get_inverted_index_file_size();
+ segstat.data_size = segment_size +
(*writer)->get_inverted_index_total_size();
+ segstat.index_size = index_size +
(*writer)->get_inverted_index_total_size();
segstat.key_bounds = key_bounds;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index e51fccdc291..a7ec8fe87e9 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -18,6 +18,7 @@
#pragma once
#include <fmt/format.h>
+#include <gen_cpp/olap_common.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <algorithm>
@@ -83,6 +84,60 @@ private:
bool _closed {false};
};
+// Collect the size of the inverted index files
+class InvertedIndexFilesInfo {
+public:
+ // Get inverted index file info in segment id order.
+ // Return the info of inverted index files from seg_id_offset to the last
one.
+ Result<std::vector<InvertedIndexFileInfo>> get_inverted_files_info(int
seg_id_offset) {
+ std::lock_guard lock(_lock);
+
+ Status st;
+ std::vector<InvertedIndexFileInfo>
inverted_files_info(_inverted_index_files_info.size());
+ bool succ = std::all_of(
+ _inverted_index_files_info.begin(),
_inverted_index_files_info.end(),
+ [&](auto&& it) {
+ auto&& [seg_id, info] = it;
+
+ int idx = seg_id - seg_id_offset;
+ if (idx >= inverted_files_info.size()) [[unlikely]] {
+ auto err_msg = fmt::format(
+ "invalid seg_id={} num_inverted_files_info={}
seg_id_offset={}",
+ seg_id, inverted_files_info.size(),
seg_id_offset);
+ DCHECK(false) << err_msg;
+ st = Status::InternalError(err_msg);
+ return false;
+ }
+
+ auto& finfo = inverted_files_info[idx];
+ if (finfo.has_index_size() || finfo.index_info_size() > 0)
[[unlikely]] {
+ // File size should not been set
+ auto err_msg = fmt::format("duplicate seg_id={}",
seg_id);
+ DCHECK(false) << err_msg;
+ st = Status::InternalError(err_msg);
+ return false;
+ }
+ finfo = info;
+ return true;
+ });
+
+ if (succ) {
+ return inverted_files_info;
+ }
+
+ return ResultError(st);
+ }
+
+ void add_file_info(int seg_id, InvertedIndexFileInfo file_info) {
+ std::lock_guard lock(_lock);
+ _inverted_index_files_info.emplace(seg_id, file_info);
+ }
+
+private:
+ std::unordered_map<int /* seg_id */, InvertedIndexFileInfo>
_inverted_index_files_info;
+ mutable SpinLock _lock;
+};
+
class BaseBetaRowsetWriter : public RowsetWriter {
public:
BaseBetaRowsetWriter();
@@ -160,6 +215,8 @@ public:
return _seg_files.get_file_writers();
}
+ InvertedIndexFilesInfo& get_inverted_index_files_info() { return
_idx_files_info; }
+
private:
void update_rowset_schema(TabletSchemaSPtr flush_schema);
// build a tmp rowset for load segment to calc delete_bitmap
@@ -212,6 +269,9 @@ protected:
int64_t _delete_bitmap_ns = 0;
int64_t _segment_writer_ns = 0;
+
+ // map<segment_id, inverted_index_file_info>
+ InvertedIndexFilesInfo _idx_files_info;
};
class SegcompactionWorker;
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index 95adf3d6e50..0d0ad435b9e 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -58,7 +58,7 @@ namespace doris {
using namespace ErrorCode;
BetaRowsetWriterV2::BetaRowsetWriterV2(const
std::vector<std::shared_ptr<LoadStreamStub>>& streams)
- : _segment_creator(_context, _seg_files), _streams(streams) {}
+ : _segment_creator(_context, _seg_files, _idx_files_info),
_streams(streams) {}
BetaRowsetWriterV2::~BetaRowsetWriterV2() = default;
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index d2267a3dbd1..174b70a072b 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -157,6 +157,8 @@ private:
SegmentCreator _segment_creator;
+ InvertedIndexFilesInfo _idx_files_info;
+
fmt::memory_buffer vlog_buffer;
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
diff --git a/be/src/olap/rowset/rowset_meta.cpp
b/be/src/olap/rowset/rowset_meta.cpp
index b969db7a2a2..2bc5a6cef85 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -233,6 +233,13 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta&
other) {
_rowset_meta_pb.add_segments_file_size(fsize);
}
}
+ if (_rowset_meta_pb.enable_inverted_index_file_info() &&
+ other._rowset_meta_pb.enable_inverted_index_file_info()) {
+ for (auto finfo : other.inverted_index_file_info()) {
+ InvertedIndexFileInfo* new_file_info =
_rowset_meta_pb.add_inverted_index_file_info();
+ *new_file_info = finfo;
+ }
+ }
// In partial update the rowset schema maybe updated when table contains
variant type, so we need the newest schema to be updated
// Otherwise the schema is stale and lead to wrong data read
if (tablet_schema()->num_variant_columns() > 0) {
@@ -249,6 +256,29 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta&
other) {
}
}
+InvertedIndexFileInfo RowsetMeta::inverted_index_file_info(int seg_id) {
+ return _rowset_meta_pb.enable_inverted_index_file_info()
+ ? (_rowset_meta_pb.inverted_index_file_info_size() > seg_id
+ ?
_rowset_meta_pb.inverted_index_file_info(seg_id)
+ : InvertedIndexFileInfo())
+ : InvertedIndexFileInfo();
+}
+
+void RowsetMeta::add_inverted_index_files_info(
+ const std::vector<InvertedIndexFileInfo>& idx_file_info) {
+ _rowset_meta_pb.set_enable_inverted_index_file_info(true);
+ for (auto finfo : idx_file_info) {
+ auto* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
+ *new_file_info = finfo;
+ }
+}
+
+void RowsetMeta::update_inverted_index_files_info(
+ const std::vector<InvertedIndexFileInfo>& idx_file_info) {
+ _rowset_meta_pb.clear_inverted_index_file_info();
+ add_inverted_index_files_info(idx_file_info);
+}
+
bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
if (a._rowset_id != b._rowset_id) return false;
if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta)
return false;
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index c5a573d760c..4f25c676f6b 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -357,6 +357,16 @@ public:
// Used for partial update, when publish, partial update may add a new
rowset and we should update rowset meta
void merge_rowset_meta(const RowsetMeta& other);
+ InvertedIndexFileInfo inverted_index_file_info(int seg_id);
+
+ const auto& inverted_index_file_info() const {
+ return _rowset_meta_pb.inverted_index_file_info();
+ }
+
+ void add_inverted_index_files_info(const
std::vector<InvertedIndexFileInfo>& idx_file_info);
+
+ void update_inverted_index_files_info(const
std::vector<InvertedIndexFileInfo>& idx_file_info);
+
// Because the member field '_handle' is a raw pointer, use member func
'init' to replace copy ctor
RowsetMeta(const RowsetMeta&) = delete;
RowsetMeta operator=(const RowsetMeta&) = delete;
diff --git a/be/src/olap/rowset/segment_creator.cpp
b/be/src/olap/rowset/segment_creator.cpp
index 5e5dfc3733f..d969f5b3904 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -51,8 +51,9 @@
namespace doris {
using namespace ErrorCode;
-SegmentFlusher::SegmentFlusher(RowsetWriterContext& context,
SegmentFileCollection& seg_files)
- : _context(context), _seg_files(seg_files) {}
+SegmentFlusher::SegmentFlusher(RowsetWriterContext& context,
SegmentFileCollection& seg_files,
+ InvertedIndexFilesInfo& idx_files_info)
+ : _context(context), _seg_files(seg_files),
_idx_files_info(idx_files_info) {}
SegmentFlusher::~SegmentFlusher() = default;
@@ -243,10 +244,11 @@ Status SegmentFlusher::_flush_segment_writer(
uint32_t segment_id = writer->segment_id();
SegmentStatistics segstat;
segstat.row_num = row_num;
- segstat.data_size = segment_size + writer->inverted_index_file_size();
- segstat.index_size = index_size + writer->inverted_index_file_size();
+ segstat.data_size = segment_size + writer->get_inverted_index_total_size();
+ segstat.index_size = index_size + writer->get_inverted_index_total_size();
segstat.key_bounds = key_bounds;
+ _idx_files_info.add_file_info(segment_id,
writer->get_inverted_index_file_info());
writer.reset();
RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat,
flush_schema));
@@ -288,10 +290,11 @@ Status
SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
uint32_t segment_id = writer->get_segment_id();
SegmentStatistics segstat;
segstat.row_num = row_num;
- segstat.data_size = segment_size + writer->get_inverted_index_file_size();
- segstat.index_size = index_size + writer->get_inverted_index_file_size();
+ segstat.data_size = segment_size + writer->get_inverted_index_total_size();
+ segstat.index_size = index_size + writer->get_inverted_index_total_size();
segstat.key_bounds = key_bounds;
+ _idx_files_info.add_file_info(segment_id,
writer->get_inverted_index_file_info());
writer.reset();
RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat,
flush_schema));
@@ -325,8 +328,9 @@ int64_t SegmentFlusher::Writer::max_row_to_add(size_t
row_avg_size_in_bytes) {
return _writer->max_row_to_add(row_avg_size_in_bytes);
}
-SegmentCreator::SegmentCreator(RowsetWriterContext& context,
SegmentFileCollection& seg_files)
- : _segment_flusher(context, seg_files) {}
+SegmentCreator::SegmentCreator(RowsetWriterContext& context,
SegmentFileCollection& seg_files,
+ InvertedIndexFilesInfo& idx_files_info)
+ : _segment_flusher(context, seg_files, idx_files_info) {}
Status SegmentCreator::add_block(const vectorized::Block* block) {
if (block->rows() == 0) {
diff --git a/be/src/olap/rowset/segment_creator.h
b/be/src/olap/rowset/segment_creator.h
index 97a8f177ad9..961e161853c 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -46,6 +46,7 @@ class VerticalSegmentWriter;
struct SegmentStatistics;
class BetaRowsetWriter;
class SegmentFileCollection;
+class InvertedIndexFilesInfo;
class FileWriterCreator {
public:
@@ -93,7 +94,8 @@ private:
class SegmentFlusher {
public:
- SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection&
seg_files);
+ SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection&
seg_files,
+ InvertedIndexFilesInfo& idx_files_info);
~SegmentFlusher();
@@ -158,6 +160,7 @@ private:
private:
RowsetWriterContext& _context;
SegmentFileCollection& _seg_files;
+ InvertedIndexFilesInfo& _idx_files_info;
// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
@@ -169,7 +172,8 @@ private:
class SegmentCreator {
public:
- SegmentCreator(RowsetWriterContext& context, SegmentFileCollection&
seg_files);
+ SegmentCreator(RowsetWriterContext& context, SegmentFileCollection&
seg_files,
+ InvertedIndexFilesInfo& idx_files_info);
~SegmentCreator() = default;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index 33fcd10ef36..e47189f9137 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -40,7 +40,6 @@ Status compact_column(int64_t index_id,
std::vector<lucene::store::Directory*>&
"debug point: index compaction error");
}
})
-
lucene::store::Directory* dir =
DorisFSDirectoryFactory::getDirectory(io::global_local_filesystem(),
tmp_path.data());
lucene::analysis::SimpleAnalyzer<char> analyzer;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 67c3ac5253f..7613df112ed 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -32,6 +32,7 @@
#include "CLucene/SharedHeader.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/tablet_schema.h"
+#include "util/debug_points.h"
namespace doris {
namespace io {
@@ -65,7 +66,7 @@ protected:
public:
CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
const int64_t length,
- const int32_t readBufferSize =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
+ const int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
CSIndexInput(const CSIndexInput& clone);
~CSIndexInput() override;
void close() override;
@@ -77,8 +78,8 @@ public:
};
CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t
fileOffset,
- const int64_t length, const int32_t _readBufferSize)
- : BufferedIndexInput(_readBufferSize) {
+ const int64_t length, const int32_t
read_buffer_size)
+ : BufferedIndexInput(read_buffer_size) {
this->base = base;
this->fileOffset = fileOffset;
this->_length = length;
@@ -110,25 +111,13 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) :
BufferedIndexInput(clone
void CSIndexInput::close() {}
-DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const
char* name,
- int32_t read_buffer_size, bool
open_idx_file_cache)
- : readBufferSize(read_buffer_size),
- dir(d),
- ram_dir(new lucene::store::RAMDirectory()),
- file_name(name),
- entries(_CLNEW EntriesType(true, true)) {
- bool success = false;
+DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
int32_t read_buffer_size)
+ : _ram_dir(new lucene::store::RAMDirectory()),
+ _stream(stream),
+ _entries(_CLNEW EntriesType(true, true)),
+ _read_buffer_size(read_buffer_size) {
try {
- if (dir->fileLength(name) == 0) {
- LOG(WARNING) << "CompoundReader open failed, index file " << name
<< " is empty.";
- _CLTHROWA(CL_ERR_IO,
- fmt::format("CompoundReader open failed, index file {}
is empty", name)
- .c_str());
- }
- stream = dir->openInput(name, readBufferSize);
- stream->setIdxFileCache(open_idx_file_cache);
-
- int32_t count = stream->readVInt();
+ int32_t count = _stream->readVInt();
ReaderFileEntry* entry = nullptr;
TCHAR tid[CL_MAX_PATH];
uint8_t buffer[BUFFER_LENGTH];
@@ -139,37 +128,50 @@
DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
entry->file_name = aid;
entry->offset = stream->readLong();
entry->length = stream->readLong();
- entries->put(aid, entry);
+ DBUG_EXECUTE_IF("construct_DorisCompoundReader_failed", {
+ CLuceneError err;
+ err.set(CL_ERR_IO, "construct_DorisCompoundReader_failed");
+ throw err;
+ })
+ _entries->put(aid, entry);
// read header file data
if (entry->offset < 0) {
copyFile(entry->file_name.c_str(), entry->length, buffer,
BUFFER_LENGTH);
}
}
-
- success = true;
- }
- _CLFINALLY(if (!success && (stream != nullptr)) {
+ } catch (...) {
try {
- stream->close();
- _CLDELETE(stream)
+ if (_stream != nullptr) {
+ _stream->close();
+ _CLDELETE(_stream)
+ }
+ if (_entries != nullptr) {
+ _entries->clear();
+ _CLDELETE(_entries);
+ }
+ if (_ram_dir) {
+ _ram_dir->close();
+ _CLDELETE(_ram_dir)
+ }
} catch (CLuceneError& err) {
if (err.number() != CL_ERR_IO) {
throw err;
}
}
- })
+ throw;
+ }
}
void DorisCompoundReader::copyFile(const char* file, int64_t file_length,
uint8_t* buffer,
int64_t buffer_length) {
- std::unique_ptr<lucene::store::IndexOutput>
output(ram_dir->createOutput(file));
+ std::unique_ptr<lucene::store::IndexOutput>
output(_ram_dir->createOutput(file));
int64_t start_ptr = output->getFilePointer();
int64_t remainder = file_length;
int64_t chunk = buffer_length;
while (remainder > 0) {
int64_t len = std::min(std::min(chunk, file_length), remainder);
- stream->readBytes(buffer, len);
+ _stream->readBytes(buffer, len);
output->writeBytes(buffer, len);
remainder -= len;
}
@@ -178,7 +180,7 @@ void DorisCompoundReader::copyFile(const char* file,
int64_t file_length, uint8_
swprintf(buf, CL_MAX_PATH + 100,
_T("Non-zero remainder length after copying")
_T(": %d (id: %s, length: %d, buffer size: %d)"),
- (int)remainder, file_name.c_str(), (int)file_length,
(int)chunk);
+ (int)remainder, file, (int)file_length, (int)chunk);
_CLTHROWT(CL_ERR_IO, buf);
}
@@ -203,7 +205,7 @@ DorisCompoundReader::~DorisCompoundReader() {
LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
}
}
- _CLDELETE(entries)
+ _CLDELETE(_entries)
}
const char* DorisCompoundReader::getClassName() {
@@ -214,26 +216,22 @@ const char* DorisCompoundReader::getObjectName() const {
}
bool DorisCompoundReader::list(std::vector<std::string>* names) const {
- for (EntriesType::const_iterator i = entries->begin(); i !=
entries->end(); i++) {
+ for (EntriesType::const_iterator i = _entries->begin(); i !=
_entries->end(); i++) {
names->push_back(i->first);
}
return true;
}
bool DorisCompoundReader::fileExists(const char* name) const {
- return entries->exists((char*)name);
-}
-
-lucene::store::Directory* DorisCompoundReader::getDirectory() {
- return dir;
+ return _entries->exists((char*)name);
}
int64_t DorisCompoundReader::fileModified(const char* name) const {
- return dir->fileModified(name);
+ return 0;
}
int64_t DorisCompoundReader::fileLength(const char* name) const {
- ReaderFileEntry* e = entries->get((char*)name);
+ ReaderFileEntry* e = _entries->get((char*)name);
if (e == nullptr) {
char buf[CL_MAX_PATH + 30];
strcpy(buf, "File ");
@@ -257,12 +255,12 @@ bool DorisCompoundReader::openInput(const char* name,
bool DorisCompoundReader::openInput(const char* name,
lucene::store::IndexInput*& ret,
CLuceneError& error, int32_t bufferSize) {
- if (stream == nullptr) {
+ if (_stream == nullptr) {
error.set(CL_ERR_IO, "Stream closed");
return false;
}
- const ReaderFileEntry* entry = entries->get((char*)name);
+ const ReaderFileEntry* entry = _entries->get((char*)name);
if (entry == nullptr) {
char buf[CL_MAX_PATH + 26];
snprintf(buf, CL_MAX_PATH + 26, "No sub-file with id %s found", name);
@@ -271,34 +269,30 @@ bool DorisCompoundReader::openInput(const char* name,
lucene::store::IndexInput*
}
// If file is in RAM, just return.
- if (ram_dir && ram_dir->fileExists(name)) {
- return ram_dir->openInput(name, ret, error, bufferSize);
+ if (_ram_dir && _ram_dir->fileExists(name)) {
+ return _ram_dir->openInput(name, ret, error, bufferSize);
}
if (bufferSize < 1) {
- bufferSize = readBufferSize;
+ bufferSize = _read_buffer_size;
}
- ret = _CLNEW CSIndexInput(stream, entry->offset, entry->length,
bufferSize);
+ ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length,
bufferSize);
return true;
}
void DorisCompoundReader::close() {
std::lock_guard<std::mutex> wlock(_this_lock);
- if (stream != nullptr) {
- stream->close();
- _CLDELETE(stream)
- }
- if (entries != nullptr) {
- entries->clear();
+ if (_stream != nullptr) {
+ _stream->close();
+ _CLDELETE(_stream)
}
- if (ram_dir) {
- ram_dir->close();
- _CLDELETE(ram_dir)
+ if (_entries != nullptr) {
+ _entries->clear();
}
- if (dir) {
- dir->close();
- _CLDECDELETE(dir)
+ if (_ram_dir) {
+ _ram_dir->close();
+ _CLDELETE(_ram_dir)
}
_closed = true;
}
@@ -324,12 +318,11 @@ lucene::store::IndexOutput*
DorisCompoundReader::createOutput(const char* /*name
}
std::string DorisCompoundReader::toString() const {
- return std::string("DorisCompoundReader@") + this->directory +
std::string("; file_name: ") +
- std::string(file_name);
+ return std::string("DorisCompoundReader@");
}
CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
- return stream;
+ return _stream;
}
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 1ca2d6ad371..a30c39f8a2f 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -65,16 +65,12 @@ using EntriesType =
lucene::util::Deletor::Object<ReaderFileEntry>>;
class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
private:
- int32_t readBufferSize;
- // base info
- lucene::store::Directory* dir = nullptr;
- lucene::store::RAMDirectory* ram_dir = nullptr;
- std::string directory;
- std::string file_name;
- CL_NS(store)::IndexInput* stream = nullptr;
- EntriesType* entries = nullptr;
+ lucene::store::RAMDirectory* _ram_dir = nullptr;
+ CL_NS(store)::IndexInput* _stream = nullptr;
+ EntriesType* _entries = nullptr;
std::mutex _this_lock;
bool _closed = false;
+ int32_t _read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE;
protected:
/** Removes an existing file in the directory-> */
@@ -83,10 +79,10 @@ protected:
public:
explicit DorisCompoundReader(
CL_NS(store)::IndexInput* stream, EntriesType* entries_clone,
- int32_t _readBufferSize =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE)
- : readBufferSize(_readBufferSize),
- stream(stream),
- entries(_CLNEW EntriesType(true, true)) {
+ int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE)
+ : _stream(stream),
+ _entries(_CLNEW EntriesType(true, true)),
+ _read_buffer_size(read_buffer_size) {
for (auto& e : *entries_clone) {
auto* origin_entry = e.second;
auto* entry = _CLNEW ReaderFileEntry();
@@ -94,17 +90,15 @@ public:
entry->file_name = origin_entry->file_name;
entry->offset = origin_entry->offset;
entry->length = origin_entry->length;
- entries->put(aid, entry);
+ _entries->put(aid, entry);
}
};
- DorisCompoundReader(lucene::store::Directory* dir, const char* name,
- int32_t _readBufferSize =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
- bool open_idx_file_cache = false);
+ DorisCompoundReader(CL_NS(store)::IndexInput* stream,
+ int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
~DorisCompoundReader() override;
void copyFile(const char* file, int64_t file_length, uint8_t* buffer,
int64_t buffer_length);
bool list(std::vector<std::string>* names) const override;
bool fileExists(const char* name) const override;
- lucene::store::Directory* getDirectory();
int64_t fileModified(const char* name) const override;
int64_t fileLength(const char* name) const override;
bool openInput(const char* name, lucene::store::IndexInput*& ret,
CLuceneError& err,
@@ -116,7 +110,6 @@ public:
lucene::store::IndexOutput* createOutput(const char* name) override;
void close() override;
std::string toString() const override;
- std::string getFileName() { return file_name; }
std::string getPath() const;
static const char* getClassName();
const char* getObjectName() const override;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index 09a6a62aaa6..e0c75922c98 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -47,34 +47,39 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t
read_buffer_size) {
std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
try {
- int64_t file_size = 0;
- Status st = _fs->file_size(index_file_full_path, &file_size);
- DBUG_EXECUTE_IF("inverted file read error: index file not found", {
- st = Status::Error<doris::ErrorCode::NOT_FOUND>("index file not
found");
- })
- if (st.code() == ErrorCode::NOT_FOUND) {
- return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
- "inverted index file {} is not found",
index_file_full_path);
- } else if (!st.ok()) {
- return st;
- }
- if (file_size == 0) {
- LOG(WARNING) << "inverted index file " << index_file_full_path <<
" is empty.";
- return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
- "inverted index file {} is empty", index_file_full_path);
- }
-
CLuceneError err;
CL_NS(store)::IndexInput* index_input = nullptr;
- auto ok = DorisFSDirectory::FSIndexInput::open(_fs,
index_file_full_path.c_str(),
- index_input, err,
read_buffer_size);
+
+ // 1. get file size from meta
+ int64_t file_size = -1;
+ if (_idx_file_info.has_index_size()) {
+ file_size = _idx_file_info.index_size();
+ }
+ file_size = file_size == 0 ? -1 : file_size;
+
+ DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
+ if (file_size == -1) {
+ return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError occur file size = -1, file is {}",
index_file_full_path);
+ }
+ })
+
+ // 2. open file
+ auto ok = DorisFSDirectory::FSIndexInput::open(
+ _fs, index_file_full_path.c_str(), index_input, err,
read_buffer_size, file_size);
if (!ok) {
+ if (err.number() == CL_ERR_FileNotFound) {
+ return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
+ "inverted index file {} is not found.",
index_file_full_path);
+ }
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when open idx file {}, error msg: {}",
index_file_full_path,
err.what());
}
index_input->setIdxFileCache(_open_idx_file_cache);
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
+
+ // 3. read file
int32_t version = _stream->readInt(); // Read version number
if (version == InvertedIndexStorageFormatPB::V2) {
DCHECK(version == _storage_format);
@@ -153,23 +158,49 @@ Result<std::unique_ptr<DorisCompoundReader>>
InvertedIndexFileReader::_open(
std::unique_ptr<DorisCompoundReader> compound_reader;
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
- DorisFSDirectory* dir = nullptr;
auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_id, index_suffix);
try {
- std::filesystem::path path(index_file_path);
- dir = DorisFSDirectoryFactory::getDirectory(_fs,
path.parent_path().c_str());
- compound_reader = std::make_unique<DorisCompoundReader>(
- dir, path.filename().c_str(), _read_buffer_size,
_open_idx_file_cache);
- } catch (CLuceneError& err) {
- if (dir != nullptr) {
- dir->close();
- _CLDELETE(dir)
+ CLuceneError err;
+ CL_NS(store)::IndexInput* index_input = nullptr;
+
+ // 1. get file size from meta
+ int64_t file_size = -1;
+ if (_idx_file_info.index_info_size() > 0) {
+ for (const auto& idx_info : _idx_file_info.index_info()) {
+ if (index_id == idx_info.index_id() &&
+ index_suffix == idx_info.index_suffix()) {
+ file_size = idx_info.index_file_size();
+ break;
+ }
+ }
}
- if (err.number() == CL_ERR_FileNotFound) {
- return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
- "inverted index path: {} not exist.",
index_file_path));
+ file_size = file_size == 0 ? -1 : file_size;
+ DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
+ if (file_size == -1) {
+ return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError occur file size = -1, file is {}",
index_file_path));
+ }
+ })
+
+ // 2. open file
+ auto ok = DorisFSDirectory::FSIndexInput::open(
+ _fs, index_file_path.c_str(), index_input, err,
_read_buffer_size, file_size);
+ if (!ok) {
+                // on open failure, index_input remains nullptr, so nothing to clean up
+ if (err.number() == CL_ERR_FileNotFound) {
+ return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
+ "inverted index file {} is not found.",
index_file_path));
+ }
+ return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError occur when open idx file {}, error msg:
{}", index_file_path,
+ err.what()));
}
+
+ // 3. read file in DorisCompoundReader
+ index_input->setIdxFileCache(_open_idx_file_cache);
+ compound_reader =
std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size);
+ } catch (CLuceneError& err) {
return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when open idx file {}, error msg: {}",
index_file_path,
err.what()));
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
index 1414f493e4b..8bc28b1882f 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
@@ -51,10 +51,12 @@ public:
std::map<std::pair<int64_t, std::string>,
std::unique_ptr<EntriesType>>;
InvertedIndexFileReader(io::FileSystemSPtr fs, std::string
index_path_prefix,
- InvertedIndexStorageFormatPB storage_format)
+ InvertedIndexStorageFormatPB storage_format,
+ InvertedIndexFileInfo idx_file_info =
InvertedIndexFileInfo())
: _fs(std::move(fs)),
_index_path_prefix(std::move(index_path_prefix)),
- _storage_format(storage_format) {}
+ _storage_format(storage_format),
+ _idx_file_info(idx_file_info) {}
Status init(int32_t read_buffer_size =
config::inverted_index_read_buffer_size,
bool open_idx_file_cache = false);
@@ -65,6 +67,8 @@ public:
Status index_file_exist(const TabletIndex* index_meta, bool* res) const;
Status has_null(const TabletIndex* index_meta, bool* res) const;
Result<InvertedIndexDirectoryMap> get_all_directories();
+    // V2 format only: _stream is initialized by _init_from_v2; returns 0 before init
+ int64_t get_inverted_file_size() const { return _stream == nullptr ? 0 :
_stream->length(); }
private:
Status _init_from_v2(int32_t read_buffer_size);
@@ -72,7 +76,7 @@ private:
const std::string&
index_suffix) const;
IndicesEntriesMap _indices_entries;
- std::unique_ptr<CL_NS(store)::IndexInput> _stream;
+ std::unique_ptr<CL_NS(store)::IndexInput> _stream = nullptr;
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
int32_t _read_buffer_size = -1;
@@ -80,6 +84,7 @@ private:
InvertedIndexStorageFormatPB _storage_format;
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
+ InvertedIndexFileInfo _idx_file_info;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index 6eb54878924..d11b9fa54d0 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -84,8 +84,8 @@ Status InvertedIndexFileWriter::delete_index(const
TabletIndex* index_meta) {
return Status::OK();
}
-size_t InvertedIndexFileWriter::headerLength() {
- size_t header_size = 0;
+int64_t InvertedIndexFileWriter::headerLength() {
+ int64_t header_size = 0;
header_size +=
sizeof(int32_t) * 2; // Account for the size of the version number
and number of indices
@@ -122,7 +122,7 @@ Status InvertedIndexFileWriter::close() {
})
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
try {
- _file_size = write_v1();
+ _total_file_size = write_v1();
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
// delete index path, which contains separated inverted index
files
@@ -137,7 +137,7 @@ Status InvertedIndexFileWriter::close() {
}
} else {
try {
- _file_size = write_v2();
+ _total_file_size = write_v2();
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
// delete index path, which contains separated inverted index
files
@@ -220,8 +220,8 @@ void InvertedIndexFileWriter::copyFile(const char*
fileName, lucene::store::Dire
input->close();
}
-size_t InvertedIndexFileWriter::write_v1() {
- size_t total_size = 0;
+int64_t InvertedIndexFileWriter::write_v1() {
+ int64_t total_size = 0;
for (const auto& entry : _indices_dirs) {
const int64_t index_id = entry.first.first;
const auto& index_suffix = entry.first.second;
@@ -330,6 +330,12 @@ size_t InvertedIndexFileWriter::write_v1() {
output->close();
//LOG(INFO) << (idx_path / idx_name).c_str() << " size:" <<
compound_file_size;
total_size += compound_file_size;
+ InvertedIndexFileInfo_IndexInfo index_info;
+ index_info.set_index_id(index_id);
+ index_info.set_index_suffix(index_suffix);
+ index_info.set_index_file_size(compound_file_size);
+ auto* new_index_info = _file_info.add_index_info();
+ *new_index_info = index_info;
} catch (CLuceneError& err) {
LOG(ERROR) << "CLuceneError occur when close idx file "
<<
InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix,
@@ -342,7 +348,7 @@ size_t InvertedIndexFileWriter::write_v1() {
return total_size;
}
-size_t InvertedIndexFileWriter::write_v2() {
+int64_t InvertedIndexFileWriter::write_v2() {
// Create the output stream to write the compound file
int64_t current_offset = headerLength();
@@ -434,6 +440,7 @@ size_t InvertedIndexFileWriter::write_v2() {
_CLDECDELETE(out_dir)
auto compound_file_size = compound_file_output->getFilePointer();
compound_file_output->close();
+ _file_info.set_index_size(compound_file_size);
return compound_file_size;
}
} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index 024c1dec986..2aceb671d80 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -19,6 +19,7 @@
#include <CLucene.h> // IWYU pragma: keep
#include <CLucene/store/IndexInput.h>
+#include <gen_cpp/olap_common.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <string>
@@ -60,11 +61,12 @@ public:
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
~InvertedIndexFileWriter() = default;
- size_t write_v2();
- size_t write_v1();
+ int64_t write_v2();
+ int64_t write_v1();
Status close();
- size_t headerLength();
- size_t get_index_file_size() const { return _file_size; }
+ int64_t headerLength();
+ InvertedIndexFileInfo get_index_file_info() const { return _file_info; }
+ int64_t get_index_file_total_size() const { return _total_file_size; }
const io::FileSystemSPtr& get_fs() const { return _fs; }
void sort_files(std::vector<FileInfo>& file_infos);
void copyFile(const char* fileName, lucene::store::Directory* dir,
@@ -80,10 +82,14 @@ private:
std::string _rowset_id;
int64_t _seg_id;
InvertedIndexStorageFormatPB _storage_format;
- size_t _file_size = 0;
+    // v1: total size of all per-index compound files written for this segment
+    // v2: size of the single merged index file
+ int64_t _total_file_size = 0;
// write to disk or stream
io::FileWriterPtr _idx_v2_writer;
io::FileWriterOptions _opts;
+
+ InvertedIndexFileInfo _file_info;
};
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 27e03b43da2..f752c530020 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -118,7 +118,7 @@ public:
bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const
char* path,
IndexInput*& ret, CLuceneError&
error,
- int32_t buffer_size) {
+ int32_t buffer_size, int64_t
file_size) {
CND_PRECONDITION(path != nullptr, "path is NULL");
if (buffer_size == -1) {
@@ -130,21 +130,26 @@ bool DorisFSDirectory::FSIndexInput::open(const
io::FileSystemSPtr& fs, const ch
reader_options.cache_type = config::enable_file_cache ?
io::FileCachePolicy::FILE_BLOCK_CACHE
:
io::FileCachePolicy::NO_CACHE;
reader_options.is_doris_table = true;
+ reader_options.file_size = file_size;
Status st = fs->open_file(path, &h->_reader, &reader_options);
DBUG_EXECUTE_IF("inverted file read error: index file not found",
{ st = Status::Error<doris::ErrorCode::NOT_FOUND>("index
file not found"); })
if (st.code() == ErrorCode::NOT_FOUND) {
- error.set(CL_ERR_FileNotFound, "File does not exist");
+ error.set(CL_ERR_FileNotFound, fmt::format("File does not exist, file
is {}", path).data());
} else if (st.code() == ErrorCode::IO_ERROR) {
- error.set(CL_ERR_IO, "File open io error");
+ error.set(CL_ERR_IO, fmt::format("File open io error, file is {}",
path).data());
} else if (st.code() == ErrorCode::PERMISSION_DENIED) {
- error.set(CL_ERR_IO, "File Access denied");
- } else {
- error.set(CL_ERR_IO, "Could not open file");
+ error.set(CL_ERR_IO, fmt::format("File Access denied, file is {}",
path).data());
+ } else if (!st.ok()) {
+ error.set(CL_ERR_IO, fmt::format("Could not open file, file is {}",
path).data());
}
//Check if a valid handle was retrieved
if (st.ok() && h->_reader) {
+ if (h->_reader->size() == 0) {
+ // may be an empty file
+ LOG(INFO) << "Opened inverted index file is empty, file is " <<
path;
+ }
//Store the file length
h->_length = h->_reader->size();
h->_fpos = 0;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index 357ac65c678..59ae6db1a96 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -189,7 +189,7 @@ protected:
public:
static bool open(const io::FileSystemSPtr& fs, const char* path,
IndexInput*& ret,
- CLuceneError& error, int32_t bufferSize = -1);
+ CLuceneError& error, int32_t bufferSize = -1, int64_t
file_size = -1);
~FSIndexInput() override;
IndexInput* clone() const override;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 0208ed635e1..54c77c8afc4 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -90,11 +90,12 @@ std::string file_cache_key_str(const std::string& seg_path)
{
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t
segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
- const io::FileReaderOptions& reader_options,
- std::shared_ptr<Segment>* output) {
+ const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output,
+ InvertedIndexFileInfo idx_file_info) {
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
- std::shared_ptr<Segment> segment(new Segment(segment_id, rowset_id,
std::move(tablet_schema)));
+ std::shared_ptr<Segment> segment(
+ new Segment(segment_id, rowset_id, std::move(tablet_schema),
idx_file_info));
segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
auto st = segment->_open();
@@ -136,11 +137,13 @@ Status Segment::open(io::FileSystemSPtr fs, const
std::string& path, uint32_t se
return Status::OK();
}
-Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema)
+Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema,
+ InvertedIndexFileInfo idx_file_info)
: _segment_id(segment_id),
_meta_mem_usage(0),
_rowset_id(rowset_id),
- _tablet_schema(std::move(tablet_schema)) {
+ _tablet_schema(std::move(tablet_schema)),
+ _idx_file_info(idx_file_info) {
g_total_segment_num << 1;
}
@@ -184,7 +187,7 @@ Status Segment::_open_inverted_index() {
_fs,
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(
_file_reader->path().native())},
- _tablet_schema->get_inverted_index_storage_format());
+ _tablet_schema->get_inverted_index_storage_format(),
_idx_file_info);
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index 2baeadcaf07..dd61e7eb831 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -82,7 +82,7 @@ public:
static Status open(io::FileSystemSPtr fs, const std::string& path,
uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
- std::shared_ptr<Segment>* output);
+ std::shared_ptr<Segment>* output, InvertedIndexFileInfo
idx_file_info = {});
static io::UInt128Wrapper file_cache_key(std::string_view rowset_id,
uint32_t seg_id);
io::UInt128Wrapper file_cache_key() const {
@@ -195,7 +195,8 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(Segment);
- Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema);
+ Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema,
+ InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo());
// open segment file and read the minimum amount of necessary information
(footer)
Status _open();
Status _parse_footer(SegmentFooterPB* footer);
@@ -271,6 +272,8 @@ private:
// inverted index file reader
std::shared_ptr<InvertedIndexFileReader> _inverted_index_file_reader;
DorisCallOnce<Status> _inverted_index_file_reader_open;
+
+ InvertedIndexFileInfo _idx_file_info;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index f20af3df80a..42e625746f3 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1093,13 +1093,6 @@ uint64_t SegmentWriter::estimate_segment_size() {
return size;
}
-size_t SegmentWriter::try_get_inverted_index_file_size() {
- if (_inverted_index_file_writer != nullptr) {
- return _inverted_index_file_writer->get_index_file_size();
- }
- return 0;
-}
-
Status SegmentWriter::finalize_columns_data() {
if (_has_key) {
_row_count = _num_rows_written;
@@ -1166,8 +1159,8 @@ Status SegmentWriter::finalize_footer(uint64_t*
segment_file_size) {
}
if (_inverted_index_file_writer != nullptr) {
RETURN_IF_ERROR(_inverted_index_file_writer->close());
+ _inverted_index_file_info =
_inverted_index_file_writer->get_index_file_info();
}
- _inverted_index_file_size = try_get_inverted_index_file_size();
return Status::OK();
}
@@ -1400,5 +1393,12 @@ Status SegmentWriter::_generate_short_key_index(
return Status::OK();
}
+int64_t SegmentWriter::get_inverted_index_total_size() {
+ if (_inverted_index_file_writer != nullptr) {
+ return _inverted_index_file_writer->get_index_file_total_size();
+ }
+ return 0;
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 41c3d5da3a7..32723e72fb0 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -102,9 +102,10 @@ public:
int64_t max_row_to_add(size_t row_avg_size_in_bytes);
uint64_t estimate_segment_size();
- size_t try_get_inverted_index_file_size();
- size_t get_inverted_index_file_size() const { return
_inverted_index_file_size; }
+ InvertedIndexFileInfo get_inverted_index_file_info() const { return
_inverted_index_file_info; }
+ int64_t get_inverted_index_total_size();
+
uint32_t num_rows_written() const { return _num_rows_written; }
// for partial update
@@ -197,7 +198,7 @@ private:
SegmentFooterPB _footer;
size_t _num_key_columns;
size_t _num_short_key_columns;
- size_t _inverted_index_file_size;
+ InvertedIndexFileInfo _inverted_index_file_info;
std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder;
std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 3e23b1fda52..ecb248b7fe9 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -1010,13 +1010,6 @@ uint64_t
VerticalSegmentWriter::_estimated_remaining_size() {
return size;
}
-size_t VerticalSegmentWriter::_calculate_inverted_index_file_size() {
- if (_inverted_index_file_writer != nullptr) {
- return _inverted_index_file_writer->get_index_file_size();
- }
- return 0;
-}
-
Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
uint64_t index_start = _file_writer->bytes_appended();
RETURN_IF_ERROR(_write_ordinal_index());
@@ -1035,7 +1028,10 @@ Status
VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
}
- _inverted_index_file_size = _calculate_inverted_index_file_size();
+
+ if (_inverted_index_file_writer != nullptr) {
+ _inverted_index_file_info =
_inverted_index_file_writer->get_index_file_info();
+ }
// reset all column writers and data_conveter
clear();
@@ -1199,5 +1195,12 @@ void VerticalSegmentWriter::_set_max_key(const Slice&
key) {
_max_key.append(key.get_data(), key.get_size());
}
+int64_t VerticalSegmentWriter::get_inverted_index_total_size() {
+ if (_inverted_index_file_writer != nullptr) {
+ return _inverted_index_file_writer->get_index_file_total_size();
+ }
+ return 0;
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index c52deea40a0..831747712b0 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -99,7 +99,9 @@ public:
[[nodiscard]] std::string data_dir_path() const {
return _data_dir == nullptr ? "" : _data_dir->path();
}
- [[nodiscard]] size_t inverted_index_file_size() const { return
_inverted_index_file_size; }
+ [[nodiscard]] InvertedIndexFileInfo get_inverted_index_file_info() const {
+ return _inverted_index_file_info;
+ }
[[nodiscard]] uint32_t num_rows_written() const { return
_num_rows_written; }
// for partial update
@@ -120,13 +122,14 @@ public:
TabletSchemaSPtr flush_schema() const { return _flush_schema; };
+ int64_t get_inverted_index_total_size();
+
void clear();
private:
void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const
TabletColumn& column);
Status _create_column_writer(uint32_t cid, const TabletColumn& column,
const TabletSchemaSPtr& schema);
- size_t _calculate_inverted_index_file_size();
uint64_t _estimated_remaining_size();
Status _write_ordinal_index();
Status _write_zone_map();
@@ -171,7 +174,7 @@ private:
SegmentFooterPB _footer;
size_t _num_key_columns;
size_t _num_short_key_columns;
- size_t _inverted_index_file_size;
+ InvertedIndexFileInfo _inverted_index_file_info;
std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder;
std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index ee687d18edc..ced0fb880c4 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -205,8 +205,10 @@ Status VerticalBetaRowsetWriter<T>::final_flush() {
LOG(WARNING) << "Fail to finalize segment footer, " << st;
return st;
}
- this->_total_data_size += segment_size +
segment_writer->get_inverted_index_file_size();
- this->_total_index_size +=
segment_writer->get_inverted_index_file_size();
+ this->_total_data_size += segment_size +
segment_writer->get_inverted_index_total_size();
+ this->_total_index_size +=
segment_writer->get_inverted_index_total_size();
+ this->_idx_files_info.add_file_info(segment_writer->get_segment_id(),
+
segment_writer->get_inverted_index_file_info());
segment_writer.reset();
}
return Status::OK();
diff --git a/be/src/olap/task/index_builder.cpp
b/be/src/olap/task/index_builder.cpp
index e4a3332ad17..38a52d1d211 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -310,7 +310,7 @@ Status
IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
LOG(ERROR) << "close inverted_index_writer error:" << st;
return st;
}
- inverted_index_size +=
inverted_index_writer->get_index_file_size();
+ inverted_index_size +=
inverted_index_writer->get_index_file_total_size();
}
_inverted_index_file_writers.clear();
output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size() +
@@ -465,7 +465,7 @@ Status
IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
LOG(ERROR) << "close inverted_index_writer error:" << st;
return st;
}
- inverted_index_size +=
inverted_index_file_writer->get_index_file_size();
+ inverted_index_size +=
inverted_index_file_writer->get_index_file_total_size();
}
_inverted_index_builders.clear();
_inverted_index_file_writers.clear();
diff --git a/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp
b/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp
index 7ebf89300da..0482ae7e1b5 100644
--- a/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp
+++ b/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp
@@ -59,9 +59,12 @@ public:
const std::string kTestDir = "./ut_dir/inverted_index_array_test";
void check_terms_stats(string dir_str, string file_str) {
- auto fs = io::global_local_filesystem();
- std::unique_ptr<DorisCompoundReader> reader =
std::make_unique<DorisCompoundReader>(
- DorisFSDirectoryFactory::getDirectory(fs, dir_str.c_str()),
file_str.c_str(), 4096);
+ CLuceneError err;
+ CL_NS(store)::IndexInput* index_input = nullptr;
+ DorisFSDirectory::FSIndexInput::open(io::global_local_filesystem(),
file_str.c_str(),
+ index_input, err, 4096);
+ std::unique_ptr<DorisCompoundReader> reader =
+ std::make_unique<DorisCompoundReader>(index_input, 4096);
std::cout << "Term statistics for " << file_str << std::endl;
std::cout << "==================================" << std::endl;
lucene::store::Directory* dir = reader.get();
diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto
index e60aa7603fc..a305f2ce460 100644
--- a/gensrc/proto/olap_common.proto
+++ b/gensrc/proto/olap_common.proto
@@ -62,4 +62,14 @@ message PTopNCounter {
enum FileType {
SEGMENT_FILE = 1;
INVERTED_INDEX_FILE = 2;
-}
\ No newline at end of file
+}
+
+message InvertedIndexFileInfo {
+ message IndexInfo {
+ required int64 index_id = 1;
+ required int64 index_file_size = 2 [default = -1];
+ optional string index_suffix = 3;
+ }
+ repeated IndexInfo index_info = 1; // for inverted index v1
+ optional int64 index_size = 2; // for inverted index v2
+}
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 69ac88d4a72..9f5dad5886e 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -131,6 +131,9 @@ message RowsetMetaPB {
// the segments_file_size maybe is empty or error
optional bool enable_segments_file_size = 1004;
optional bool has_variant_type_in_schema = 1005;
+
+ optional bool enable_inverted_index_file_info = 1006;
+ repeated InvertedIndexFileInfo inverted_index_file_info = 1007;
}
message SchemaDictKeyList {
@@ -214,6 +217,9 @@ message RowsetMetaCloudPB {
optional bool has_variant_type_in_schema = 104;
// dict key lists for compress schema info
optional SchemaDictKeyList schema_dict_key_list = 105;
+
+ optional bool enable_inverted_index_file_info = 106;
+ repeated InvertedIndexFileInfo inverted_index_file_info = 107;
}
message SegmentStatisticsPB {
diff --git
a/regression-test/data/inverted_index_p0/test_compound_reader_fault_injection.out
b/regression-test/data/inverted_index_p0/test_compound_reader_fault_injection.out
new file mode 100644
index 00000000000..cc8db5dd8b6
--- /dev/null
+++
b/regression-test/data/inverted_index_p0/test_compound_reader_fault_injection.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+5
+
diff --git
a/regression-test/data/inverted_index_p0/test_inverted_index_file_size.out
b/regression-test/data/inverted_index_p0/test_inverted_index_file_size.out
new file mode 100644
index 00000000000..37d0b96d8a1
--- /dev/null
+++ b/regression-test/data/inverted_index_p0/test_inverted_index_file_size.out
@@ -0,0 +1,49 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
+-- !sql --
+3860
+
+-- !sql --
+125
+
diff --git
a/regression-test/data/inverted_index_p0/test_inverted_index_v2_file_size.out
b/regression-test/data/inverted_index_p0/test_inverted_index_v2_file_size.out
new file mode 100644
index 00000000000..bdb27e98fa9
--- /dev/null
+++
b/regression-test/data/inverted_index_p0/test_inverted_index_v2_file_size.out
@@ -0,0 +1,85 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 andy andy love apple 100
+1 bason bason hate pear 99
+2 andy andy love apple 100
+2 bason bason hate pear 99
+3 andy andy love apple 100
+3 bason bason hate pear 99
+
+-- !sql --
+1 andy andy love apple 100
+2 andy andy love apple 100
+3 andy andy love apple 100
+
+-- !sql --
+1 bason bason hate pear 99
+2 bason bason hate pear 99
+3 bason bason hate pear 99
+
+-- !sql --
+1 bason bason hate pear 99
+2 bason bason hate pear 99
+3 bason bason hate pear 99
+
+-- !sql --
+1 andy andy love apple 100
+1 bason bason hate pear 99
+2 andy andy love apple 100
+2 bason bason hate pear 99
+3 andy andy love apple 100
+3 bason bason hate pear 99
+
+-- !sql --
+1 andy andy love apple 100
+2 andy andy love apple 100
+3 andy andy love apple 100
+
+-- !sql --
+1 bason bason hate pear 99
+2 bason bason hate pear 99
+3 bason bason hate pear 99
+
+-- !sql --
+1 bason bason hate pear 99
+2 bason bason hate pear 99
+3 bason bason hate pear 99
+
+-- !sql --
+1 andy andy love apple 100
+1 andy andy love apple 100
+1 bason bason hate pear 99
+1 bason bason hate pear 99
+2 andy andy love apple 100
+2 andy andy love apple 100
+2 bason bason hate pear 99
+2 bason bason hate pear 99
+3 andy andy love apple 100
+3 andy andy love apple 100
+3 bason bason hate pear 99
+3 bason bason hate pear 99
+
+-- !sql --
+1 andy andy love apple 100
+1 andy andy love apple 100
+2 andy andy love apple 100
+2 andy andy love apple 100
+3 andy andy love apple 100
+3 andy andy love apple 100
+
+-- !sql --
+1 bason bason hate pear 99
+1 bason bason hate pear 99
+2 bason bason hate pear 99
+2 bason bason hate pear 99
+3 bason bason hate pear 99
+3 bason bason hate pear 99
+
+-- !sql --
+1 bason bason hate pear 99
+1 bason bason hate pear 99
+2 bason bason hate pear 99
+2 bason bason hate pear 99
+3 bason bason hate pear 99
+3 bason bason hate pear 99
+
diff --git
a/regression-test/suites/inverted_index_p0/test_compound_reader_fault_injection.groovy
b/regression-test/suites/inverted_index_p0/test_compound_reader_fault_injection.groovy
new file mode 100644
index 00000000000..8cf49742fe3
--- /dev/null
+++
b/regression-test/suites/inverted_index_p0/test_compound_reader_fault_injection.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_compound_reader_fault_injection", "nonConcurrent") {
+ // define a sql table
+ def testTable = "httplogs"
+
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ sql """
+ CREATE TABLE ${testTable} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` string NULL COMMENT "",
+ `request` string NULL COMMENT "",
+ `status` string NULL COMMENT "",
+ `size` string NULL COMMENT "",
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "inverted_index_storage_format" = "V1"
+ );
+ """
+
+ sql """ INSERT INTO ${testTable} VALUES (893964617, '40.135.0.0', 'GET
/images/hm_bg.jpg HTTP/1.0', 200, 24736); """
+ sql """ INSERT INTO ${testTable} VALUES (893964653, '232.0.0.0', 'GET
/images/hm_bg.jpg HTTP/1.0', 200, 3781); """
+ sql """ INSERT INTO ${testTable} VALUES (893964672, '26.1.0.0', 'GET
/images/hm_bg.jpg HTTP/1.0', 304, 0); """
+ sql """ INSERT INTO ${testTable} VALUES (893964672, '26.1.0.0', 'GET
/images/hm_bg.jpg HTTP/1.0', 304, 0); """
+ sql """ INSERT INTO ${testTable} VALUES (893964653, '232.0.0.0', 'GET
/images/hm_bg.jpg HTTP/1.0', 200, 3781); """
+
+ sql 'sync'
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("construct_DorisCompoundReader_failed")
+ try {
+ sql """ select count() from ${testTable} where (request match
'HTTP'); """
+ } catch (Exception e) {
+ log.info(e.getMessage())
+
assertTrue(e.getMessage().contains("construct_DorisCompoundReader_failed"))
+ }
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("construct_DorisCompoundReader_failed")
+ qt_sql """ select count() from ${testTable} where (request match
'HTTP'); """
+ }
+}
diff --git
a/regression-test/suites/inverted_index_p0/test_inverted_index_file_size.groovy
b/regression-test/suites/inverted_index_p0/test_inverted_index_file_size.groovy
new file mode 100644
index 00000000000..a2748cb93c9
--- /dev/null
+++
b/regression-test/suites/inverted_index_p0/test_inverted_index_file_size.groovy
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_inverted_index_file_size", "nonConcurrent"){
+ def tableName = "test_inverted_index_file_size"
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_config = { key, value ->
+
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ def load_data = {
+ // load the json data
+ streamLoad {
+ table "${tableName}"
+
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ file 'documents-1000.json' // import json file
+ time 10000 // limit inflight 10s
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def run_compaction_and_wait = {
+
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
+ def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
+
+ // trigger compactions for all tablets in ${tableName}
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ if (compactJson.status.toLowerCase() == "fail") {
+ logger.info("Compaction was done automatically!")
+ } else {
+ assertEquals("success", compactJson.status.toLowerCase())
+ }
+ }
+
+ // wait for all compactions done
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+ }
+
+ def test_table = { format ->
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE ${tableName} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` varchar(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED
PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED
PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
+ INDEX status_idx (`status`) USING INVERTED COMMENT '',
+ INDEX size_idx (`size`) USING INVERTED COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "inverted_index_storage_format" = "${format}"
+ );
+ """
+
+ load_data.call()
+ load_data.call()
+ load_data.call()
+ load_data.call()
+ load_data.call()
+
+ qt_sql """ select count() from ${tableName} where clientip match
'17.0.0.0' and request match 'GET' and status match '200' and size > 200 """
+ qt_sql """ select count() from ${tableName} where clientip
match_phrase '17.0.0.0' and request match_phrase 'GET' and status match '200'
and size > 200 """
+ run_compaction_and_wait.call()
+ qt_sql """ select count() from ${tableName} where clientip match
'17.0.0.0' and request match 'GET' and status match '200' and size > 200 """
+ qt_sql """ select count() from ${tableName} where clientip
match_phrase '17.0.0.0' and request match_phrase 'GET' and status match '200'
and size > 200 """
+
+ }
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("file_size_not_in_rowset_meta")
+ set_be_config.call("inverted_index_compaction_enable", "true")
+ test_table.call("V1")
+ test_table.call("V2")
+ set_be_config.call("inverted_index_compaction_enable", "false")
+ test_table.call("V1")
+ test_table.call("V2")
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("file_size_not_in_rowset_meta")
+ set_be_config.call("inverted_index_compaction_enable", "true")
+ }
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/inverted_index_p0/test_inverted_index_v2_file_size.groovy
b/regression-test/suites/inverted_index_p0/test_inverted_index_v2_file_size.groovy
new file mode 100644
index 00000000000..1484b7284a3
--- /dev/null
+++
b/regression-test/suites/inverted_index_p0/test_inverted_index_v2_file_size.groovy
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_index_index_V2_file_size", "nonConcurrent") {
+ def isCloudMode = isCloudMode()
+ def tableName = "test_index_index_V2_file_size"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ def trigger_full_compaction_on_tablets = { tablets ->
+ for (def tablet : tablets) {
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ int times = 1
+
+ String compactionStatus;
+ do{
+ def (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
+ ++times
+ sleep(2000)
+ compactionStatus = parseJson(out.trim()).status.toLowerCase();
+ } while (compactionStatus!="success" && times<=10)
+
+
+ if (compactionStatus == "fail") {
+ logger.info("Compaction was done automatically!")
+ }
+ }
+ }
+
+ def wait_full_compaction_done = { tablets ->
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ def (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+ }
+
+ def get_rowset_count = { tablets ->
+ int rowsetCount = 0
+ for (def tablet in tablets) {
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ rowsetCount +=((List<String>) tabletJson.rowsets).size()
+ }
+ return rowsetCount
+ }
+
+ boolean invertedIndexCompactionEnable = false
+ try {
+ String backend_id;
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "inverted_index_compaction_enable")
{
+ invertedIndexCompactionEnable =
Boolean.parseBoolean(((List<String>) ele)[2])
+ logger.info("inverted_index_compaction_enable:
${((List<String>) ele)[2]}")
+ }
+ }
+ set_be_config.call("inverted_index_compaction_enable", "true")
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1", "disable_auto_compaction" =
"true");
+ """
+
+
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
+ def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
+
+ sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate
pear", 99); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "bason", "bason hate
pear", 99); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "bason", "bason hate
pear", 99); """
+
+
GetDebugPoint().enableDebugPointForAllBEs("match.invert_index_not_support_execute_match")
+
+ qt_sql """ select * from ${tableName} order by id, name, hobbies,
score """
+ qt_sql """ select * from ${tableName} where name match "andy" order by
id, name, hobbies, score """
+ qt_sql """ select * from ${tableName} where hobbies match "pear" order
by id, name, hobbies, score """
+ qt_sql """ select * from ${tableName} where score < 100 order by id,
name, hobbies, score """
+
+ // trigger full compactions for all tablets in ${tableName}
+ trigger_full_compaction_on_tablets.call(tablets)
+
+ // wait for full compaction done
+ wait_full_compaction_done.call(tablets)
+
+ def dedup_tablets = deduplicate_tablets(tablets)
+
+ // In the p0 testing environment, there are no expected operations
such as scaling down BE (backend) services
+ // if tablets or dedup_tablets is empty, exception is thrown, and case
fail
+ int replicaNum = Math.floor(tablets.size() / dedup_tablets.size())
+ if (replicaNum != 1 && replicaNum != 3)
+ {
+ assert(false);
+ }
+
+ // after full compaction, there is only 1 rowset.
+ def count = get_rowset_count.call(tablets);
+ if (isCloudMode) {
+ assert (count == (1 + 1) * replicaNum)
+ } else {
+ assert (count == 1 * replicaNum)
+ }
+
+ qt_sql """ select * from ${tableName} order by id, name, hobbies,
score """
+ qt_sql """ select * from ${tableName} where name match "andy" order by
id, name, hobbies, score """
+ qt_sql """ select * from ${tableName} where hobbies match "pear" order
by id, name, hobbies, score """
+ qt_sql """ select * from ${tableName} where score < 100 order by id,
name, hobbies, score """
+
+ // insert more data and trigger full compaction again
+ sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate
pear", 99); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "bason", "bason hate
pear", 99); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "bason", "bason hate
pear", 99); """
+
+ set_be_config.call("inverted_index_compaction_enable", "false")
+ // trigger full compactions for all tablets in ${tableName}
+ trigger_full_compaction_on_tablets.call(tablets)
+
+ // wait for full compaction done
+ wait_full_compaction_done.call(tablets)
+
+ // after full compaction, there is only 1 rowset.
+ count = get_rowset_count.call(tablets);
+ if (isCloudMode) {
+ assert (count == (1 + 1) * replicaNum)
+ } else {
+ assert (count == 1 * replicaNum)
+ }
+
+ qt_sql """ select * from ${tableName} order by id, name, hobbies,
score """
+ qt_sql """ select * from ${tableName} where name match "andy" order by
id, name, hobbies, score """
+ qt_sql """ select * from ${tableName} where hobbies match "pear" order
by id, name, hobbies, score """
+ qt_sql """ select * from ${tableName} where score < 100 order by id,
name, hobbies, score """
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("match.invert_index_not_support_execute_match")
+ set_be_config.call("inverted_index_compaction_enable",
invertedIndexCompactionEnable.toString())
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]