This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new bf51d5b82a1 [feature](move-memtable) support for inverted index file (#35891)
bf51d5b82a1 is described below
commit bf51d5b82a193d3515d730d673e99585e4867b24
Author: Sun Chenyang <[email protected]>
AuthorDate: Fri Jul 5 10:03:11 2024 +0800
[feature](move-memtable) support for inverted index file (#35891)
    Support writing the inverted index file when move-memtable (memtable on sink node) is enabled:
    the index data is sent to the backend over the load stream together with the segment file, as
    sketched below.
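    A condensed view of the new write path, assuming the names introduced in this diff
    (SegmentFlusher, FileWriterCreator, FileType); this is an illustration of the flow, not a
    standalone program:

        // In SegmentFlusher::_create_segment_writer: besides the segment file writer,
        // optionally create a second writer for the inverted index file. With move-memtable
        // both writers are StreamSinkFileWriters, so the index bytes are streamed to the
        // backend tagged with FileType::INVERTED_INDEX_FILE and the receiving
        // LoadStreamWriter can route them into a separate file.
        io::FileWriterPtr segment_file_writer;
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer));

        io::FileWriterPtr inverted_file_writer;
        if (_context.tablet_schema->has_inverted_index() &&
            _context.tablet_schema->get_inverted_index_storage_format() >=
                    InvertedIndexStorageFormatPB::V2 &&
            _context.memtable_on_sink_support_index_v2) {
            RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer,
                                                                 FileType::INVERTED_INDEX_FILE));
        }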
---
be/src/io/fs/stream_sink_file_writer.cpp | 12 +-
be/src/io/fs/stream_sink_file_writer.h | 5 +-
be/src/olap/delta_writer_v2.cpp | 1 +
be/src/olap/rowset/beta_rowset_writer.cpp | 17 +-
be/src/olap/rowset/beta_rowset_writer.h | 3 +-
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 6 +-
be/src/olap/rowset/beta_rowset_writer_v2.h | 3 +-
be/src/olap/rowset/rowset_writer.h | 4 +-
be/src/olap/rowset/rowset_writer_context.h | 3 +
be/src/olap/rowset/segment_creator.cpp | 40 +++--
be/src/olap/rowset/segment_creator.h | 10 +-
.../segment_v2/inverted_index_file_writer.cpp | 12 +-
.../rowset/segment_v2/inverted_index_file_writer.h | 9 +-
.../segment_v2/inverted_index_fs_directory.cpp | 101 +++++++++++
.../segment_v2/inverted_index_fs_directory.h | 3 +
be/src/olap/rowset/segment_v2/segment_writer.cpp | 11 +-
be/src/olap/rowset/segment_v2/segment_writer.h | 4 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 11 +-
.../rowset/segment_v2/vertical_segment_writer.h | 3 +-
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/runtime/load_stream.cpp | 14 +-
be/src/runtime/load_stream.h | 2 +-
be/src/runtime/load_stream_writer.cpp | 113 +++++++++----
be/src/runtime/load_stream_writer.h | 10 +-
be/src/util/thrift_util.cpp | 9 +-
be/src/util/thrift_util.h | 2 +-
be/src/vec/sink/load_stream_stub.cpp | 3 +-
be/src/vec/sink/load_stream_stub.h | 2 +-
be/test/io/fs/stream_sink_file_writer_test.cpp | 3 +-
.../org/apache/doris/planner/OlapTableSink.java | 1 +
gensrc/proto/internal_service.proto | 2 +
gensrc/proto/olap_common.proto | 5 +
gensrc/thrift/Descriptors.thrift | 1 +
.../data/inverted_index_p0/load/test_insert.out | 73 ++++++++
.../inverted_index_p0/load/test_stream_load.out | 45 +++++
.../test_index_lowercase_fault_injection.out | 0
.../test_stream_load_with_inverted_index.out | 43 +++++
..._writer_v2_back_pressure_fault_injection.groovy | 3 +
.../test_load_stream_fault_injection.groovy | 20 +--
.../inverted_index_p0/load/test_insert.groovy | 81 +++++++++
.../inverted_index_p0/load/test_spark_load.groovy | 174 +++++++++++++++++++
.../inverted_index_p0/load/test_stream_load.groovy | 150 +++++++++++++++++
.../test_index_lowercase_fault_injection.groovy | 2 +-
.../test_stream_load_with_inverted_index.groovy | 185 +++++++++++++++++++++
.../test_insert_into_index.groovy | 75 +++++++++
.../load_p0/http_stream/test_http_stream.groovy | 8 +-
.../load_p0/mysql_load/test_mysql_load.groovy | 4 +-
.../mysql_load/test_mysql_load_big_file.groovy | 4 +-
48 files changed, 1187 insertions(+), 107 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
index bca548be9a2..1d7f823af10 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -28,15 +28,17 @@
 namespace doris::io {
 void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t index_id,
-                                int64_t tablet_id, int32_t segment_id) {
+                                int64_t tablet_id, int32_t segment_id, FileType file_type) {
     VLOG_DEBUG << "init stream writer, load id(" << UniqueId(load_id).to_string()
                << "), partition id(" << partition_id << "), index id(" << index_id
-               << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")";
+               << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")"
+               << ", file_type(" << file_type << ")";
     _load_id = load_id;
     _partition_id = partition_id;
     _index_id = index_id;
     _tablet_id = tablet_id;
     _segment_id = segment_id;
+    _file_type = file_type;
 }
 Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
@@ -47,7 +49,7 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
     VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
-               << ", data_length: " << bytes_req;
+               << ", data_length: " << bytes_req << "file_type" << _file_type;
     std::span<const Slice> slices {data, data_cnt};
     size_t stream_index = 0;
@@ -67,7 +69,7 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
         });
         if (!skip_stream) {
             st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
-                                     _bytes_appended, slices);
+                                     _bytes_appended, slices, false, _file_type);
         }
         DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
             if (stream_index >= 2) {
@@ -140,7 +142,7 @@ Status StreamSinkFileWriter::_finalize() {
     bool ok = false;
     for (auto& stream : _streams) {
         auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
-                                      _bytes_appended, {}, true);
+                                      _bytes_appended, {}, true, _file_type);
         ok = ok || st.ok();
         if (!st.ok()) {
             LOG(WARNING) << "failed to send segment eos to backend " << stream->dst_id()
diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h
index 0d621e8b4c1..0950039077b 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -18,7 +18,7 @@
#pragma once
#include <brpc/stream.h>
-#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_common.pb.h>
#include <queue>
@@ -40,7 +40,7 @@ public:
: _streams(std::move(streams)) {}
     void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, int64_t tablet_id,
-              int32_t segment_id);
+              int32_t segment_id, FileType file_type = FileType::SEGMENT_FILE);
Status appendv(const Slice* data, size_t data_cnt) override;
@@ -69,6 +69,7 @@ private:
int32_t _segment_id;
size_t _bytes_appended = 0;
State _state {State::OPENED};
+ FileType _file_type {FileType::SEGMENT_FILE};
};
} // namespace io
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 3f2f7bf99fa..805f072f6e6 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -123,6 +123,7 @@ Status DeltaWriterV2::init() {
     context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id();
context.data_dir = nullptr;
context.partial_update_info = _partial_update_info;
+ context.memtable_on_sink_support_index_v2 = true;
_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index d418a89a361..17801ec16fd 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -831,10 +831,19 @@ Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
     return Status::OK();
 }
-Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id,
-                                                io::FileWriterPtr& file_writer) {
-    auto path = _context.segment_path(segment_id);
-    return _create_file_writer(path, file_writer);
+Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
+                                                FileType file_type) {
+    auto segment_path = _context.segment_path(segment_id);
+    if (file_type == FileType::INVERTED_INDEX_FILE) {
+        std::string prefix =
+                std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
+        std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
+        return _create_file_writer(index_path, file_writer);
+    } else if (file_type == FileType::SEGMENT_FILE) {
+        return _create_file_writer(segment_path, file_writer);
+    }
+    return Status::Error<ErrorCode::INTERNAL_ERROR>(
+            fmt::format("failed to create file = {}, file type = {}", segment_path, file_type));
 }
 Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index d729a15df32..98bb43c6092 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -98,7 +98,8 @@ public:
     Status add_rowset(RowsetSharedPtr rowset) override;
     Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override;
-    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override;
+    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer,
+                              FileType file_type = FileType::SEGMENT_FILE) override;
     Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
                        TabletSchemaSPtr flush_schema) override;
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index 1fee4e04034..3ebe331cfc1 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -69,14 +69,14 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context
     return Status::OK();
 }
-Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) {
+Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
+                                              FileType file_type) {
     auto partition_id = _context.partition_id;
     auto index_id = _context.index_id;
     auto tablet_id = _context.tablet_id;
     auto load_id = _context.load_id;
-
     auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
-    stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id);
+    stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id, file_type);
     file_writer = std::move(stream_writer);
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index e406a2037a7..89bd3045089 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -80,7 +80,8 @@ public:
"add_rowset_for_linked_schema_change is not implemented");
}
- Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer)
override;
+ Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer,
+ FileType file_type = FileType::SEGMENT_FILE)
override;
Status flush() override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>("flush is not
implemented");
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index 75a592cf98d..6861b8ab7e2 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/types.pb.h>
@@ -89,7 +90,8 @@ public:
// Precondition: the input `rowset` should have the same type of the
rowset we're building
virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset)
= 0;
- virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr&
writer) {
+ virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr&
writer,
+ FileType file_type =
FileType::SEGMENT_FILE) {
return Status::NotSupported("RowsetWriter does not support
create_file_writer");
}
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index 488030993e1..0130916bfb4 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -88,6 +88,9 @@ struct RowsetWriterContext {
     std::shared_ptr<FileWriterCreator> file_writer_creator;
     std::shared_ptr<SegmentCollector> segment_collector;
+    // memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file
+    bool memtable_on_sink_support_index_v2 = false;
+
     /// begin file cache opts
     bool write_file_cache = false;
     bool is_hot_data = false;
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index 738c6e2f9f9..82313f988cb 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -134,8 +134,17 @@ Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit
 Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                               int32_t segment_id, bool no_compression) {
-    io::FileWriterPtr file_writer;
-    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer));
+    io::FileWriterPtr segment_file_writer;
+    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer));
+
+    io::FileWriterPtr inverted_file_writer;
+    if (_context.tablet_schema->has_inverted_index() &&
+        _context.tablet_schema->get_inverted_index_storage_format() >=
+                InvertedIndexStorageFormatPB::V2 &&
+        _context.memtable_on_sink_support_index_v2) {
+        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer,
+                                                             FileType::INVERTED_INDEX_FILE));
+    }
     segment_v2::SegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
@@ -146,9 +155,10 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
     }
     writer = std::make_unique<segment_v2::SegmentWriter>(
-            file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
-            _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context);
-    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer)));
+            segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
+            _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context,
+            std::move(inverted_file_writer));
+    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer)));
     auto s = writer->init();
     if (!s.ok()) {
         LOG(WARNING) << "failed to init segment writer: " << s.to_string();
@@ -161,8 +171,17 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
 Status SegmentFlusher::_create_segment_writer(
         std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
         bool no_compression) {
-    io::FileWriterPtr file_writer;
-    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer));
+    io::FileWriterPtr segment_file_writer;
+    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer));
+
+    io::FileWriterPtr inverted_file_writer;
+    if (_context.tablet_schema->has_inverted_index() &&
+        _context.tablet_schema->get_inverted_index_storage_format() >=
+                InvertedIndexStorageFormatPB::V2 &&
+        _context.memtable_on_sink_support_index_v2) {
+        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer,
+                                                             FileType::INVERTED_INDEX_FILE));
+    }
     segment_v2::VerticalSegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
@@ -173,9 +192,10 @@ Status SegmentFlusher::_create_segment_writer(
     }
     writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
-            file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
-            _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context);
-    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer)));
+            segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
+            _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context,
+            std::move(inverted_file_writer));
+    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer)));
     auto s = writer->init();
     if (!s.ok()) {
         LOG(WARNING) << "failed to init segment writer: " << s.to_string();
diff --git a/be/src/olap/rowset/segment_creator.h
b/be/src/olap/rowset/segment_creator.h
index 3226ab0adf8..97a8f177ad9 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -17,9 +17,11 @@
#pragma once
+#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <string>
+#include <typeinfo>
#include <unordered_map>
#include <vector>
@@ -49,7 +51,8 @@ class FileWriterCreator {
public:
virtual ~FileWriterCreator() = default;
- virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer)
= 0;
+ virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
+ FileType file_type = FileType::SEGMENT_FILE) = 0;
};
template <class T>
@@ -57,8 +60,9 @@ class FileWriterCreatorT : public FileWriterCreator {
public:
explicit FileWriterCreatorT(T* t) : _t(t) {}
- Status create(uint32_t segment_id, io::FileWriterPtr& file_writer)
override {
- return _t->create_file_writer(segment_id, file_writer);
+ Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
+ FileType file_type = FileType::SEGMENT_FILE) override {
+ return _t->create_file_writer(segment_id, file_writer, file_type);
}
private:
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index cdd26fecf87..22d494e5132 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -341,9 +341,15 @@ size_t InvertedIndexFileWriter::write_v2() {
     io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
     auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str());
-
-    auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
-            out_dir->createOutput(index_path.filename().c_str()));
+    std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+    // idx v2 writer != nullptr means memtable on sink node now
+    if (_idx_v2_writer != nullptr) {
+        compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+                out_dir->createOutputV2(_idx_v2_writer.get()));
+    } else {
+        compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+                out_dir->createOutput(index_path.filename().c_str()));
+    }
     // Write the version number
     compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index 7ec71c0b38a..0d82504c07f 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -26,6 +26,7 @@
#include <vector>
#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
namespace doris {
@@ -46,12 +47,14 @@ class InvertedIndexFileWriter {
public:
InvertedIndexFileWriter(io::FileSystemSPtr fs, std::string
index_path_prefix,
std::string rowset_id, int64_t seg_id,
- InvertedIndexStorageFormatPB storage_format)
+ InvertedIndexStorageFormatPB storage_format,
+ io::FileWriterPtr file_writer = nullptr)
: _fs(std::move(fs)),
_index_path_prefix(std::move(index_path_prefix)),
_rowset_id(std::move(rowset_id)),
_seg_id(seg_id),
- _storage_format(storage_format) {}
+ _storage_format(storage_format),
+ _idx_v2_writer(std::move(file_writer)) {}
Result<DorisFSDirectory*> open(const TabletIndex* index_meta);
Status delete_index(const TabletIndex* index_meta);
@@ -76,6 +79,8 @@ private:
int64_t _seg_id;
InvertedIndexStorageFormatPB _storage_format;
size_t _file_size = 0;
+ // write to disk or stream
+ io::FileWriterPtr _idx_v2_writer;
};
} // namespace segment_v2
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 54d484d1199..0443bf345ba 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -98,6 +98,21 @@ public:
int64_t length() const override;
};
+class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOutput {
+private:
+ io::FileWriter* _index_v2_file_writer = nullptr;
+
+protected:
+ void flushBuffer(const uint8_t* b, const int32_t size) override;
+
+public:
+ FSIndexOutputV2() = default;
+ void init(io::FileWriter* file_writer);
+ ~FSIndexOutputV2() override;
+ void close() override;
+ int64_t length() const override;
+};
+
bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const
char* path,
IndexInput*& ret, CLuceneError&
error,
int32_t buffer_size) {
@@ -333,6 +348,86 @@ int64_t DorisFSDirectory::FSIndexOutput::length() const {
return _writer->bytes_appended();
}
+void DorisFSDirectory::FSIndexOutputV2::init(io::FileWriter* file_writer) {
+ _index_v2_file_writer = file_writer;
+ DBUG_EXECUTE_IF(
+
"DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
+ "init",
+ {
+ _CLTHROWA(CL_ERR_IO,
+ "debug point: test throw error in fsindexoutput init
mock error");
+ })
+}
+
+DorisFSDirectory::FSIndexOutputV2::~FSIndexOutputV2() {}
+
+void DorisFSDirectory::FSIndexOutputV2::flushBuffer(const uint8_t* b, const
int32_t size) {
+ if (_index_v2_file_writer != nullptr && b != nullptr && size > 0) {
+ Slice data {b, (size_t)size};
+ DBUG_EXECUTE_IF(
+
"DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_"
+ "flushBuffer",
+ { return; })
+ Status st = _index_v2_file_writer->append(data);
+ DBUG_EXECUTE_IF(
+
"DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer", {
+ st =
Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "flush buffer mock error");
+ })
+ if (!st.ok()) {
+ LOG(WARNING) << "File IO Write error: " << st.to_string();
+ _CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error");
+ }
+ } else {
+ if (_index_v2_file_writer == nullptr) {
+ LOG(WARNING) << "File writer is nullptr in
DorisFSDirectory::FSIndexOutputV2, "
+ "ignore flush.";
+ _CLTHROWA(CL_ERR_IO, "flushBuffer error, _index_v2_file_writer =
nullptr");
+ } else if (b == nullptr) {
+ LOG(WARNING) << "buffer is nullptr when flushBuffer in "
+ "DorisFSDirectory::FSIndexOutput";
+ }
+ }
+}
+
+void DorisFSDirectory::FSIndexOutputV2::close() {
+ try {
+ BufferedIndexOutput::close();
+ DBUG_EXECUTE_IF(
+
"DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_"
+ "close",
+ {
+ _CLTHROWA(CL_ERR_IO,
+ "debug point: test throw error in
bufferedindexoutput close");
+ })
+ } catch (CLuceneError& err) {
+ LOG(WARNING) << "FSIndexOutputV2 close, BufferedIndexOutput close
error: " << err.what();
+ if (err.number() == CL_ERR_IO) {
+ LOG(WARNING) << "FSIndexOutputV2 close, BufferedIndexOutput close
IO error: "
+ << err.what();
+ }
+ _CLTHROWA(err.number(), err.what());
+ }
+ if (_index_v2_file_writer) {
+ auto ret = _index_v2_file_writer->close();
+
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
+ { ret = Status::Error<INTERNAL_ERROR>("writer close
status error"); })
+ if (!ret.ok()) {
+ LOG(WARNING) << "FSIndexOutputV2 close, stream sink file writer
close error: "
+ << ret.to_string();
+ _CLTHROWA(CL_ERR_IO, ret.to_string().c_str());
+ }
+ } else {
+ LOG(WARNING) << "File writer is nullptr, ignore finalize and close.";
+ _CLTHROWA(CL_ERR_IO, "close file writer error, _index_v2_file_writer =
nullptr");
+ }
+}
+
+int64_t DorisFSDirectory::FSIndexOutputV2::length() const {
+ CND_PRECONDITION(_index_v2_file_writer != nullptr, "file is not open");
+ return _index_v2_file_writer->bytes_appended();
+}
+
DorisFSDirectory::DorisFSDirectory() {
filemode = 0644;
this->lockFactory = nullptr;
@@ -495,6 +590,12 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) {
     return ret;
 }
+lucene::store::IndexOutput* DorisFSDirectory::createOutputV2(io::FileWriter* file_writer) {
+    auto* ret = _CLNEW FSIndexOutputV2();
+    ret->init(file_writer);
+    return ret;
+}
+
std::string DorisFSDirectory::toString() const {
return std::string("DorisFSDirectory@") + this->directory;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index d9069d66ef2..b3e0352d7ad 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -60,9 +60,11 @@ protected:
public:
class FSIndexOutput;
+ class FSIndexOutputV2;
class FSIndexInput;
friend class DorisFSDirectory::FSIndexOutput;
+ friend class DorisFSDirectory::FSIndexOutputV2;
friend class DorisFSDirectory::FSIndexInput;
const io::FileSystemSPtr& getFileSystem() { return _fs; }
@@ -78,6 +80,7 @@ public:
void renameFile(const char* from, const char* to) override;
void touchFile(const char* name) override;
lucene::store::IndexOutput* createOutput(const char* name) override;
+ lucene::store::IndexOutput* createOutputV2(io::FileWriter* file_writer);
void close() override;
std::string toString() const override;
static const char* getClassName();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 729e2500384..d22e1060dd3 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -43,8 +43,9 @@
#include "olap/key_coder.h"
#include "olap/olap_common.h"
#include "olap/primary_key_index.h"
-#include "olap/row_cursor.h" // RowCursor // IWYU pragma:
keep
-#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/row_cursor.h" // RowCursor // IWYU pragma:
keep
+#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/rowset/segment_creator.h"
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
@@ -85,7 +86,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer,
uint32_t segment_id,
TabletSchemaSPtr tablet_schema, BaseTabletSPtr
tablet,
DataDir* data_dir, uint32_t max_row_per_segment,
const SegmentWriterOptions& opts,
- std::shared_ptr<MowContext> mow_context)
+ std::shared_ptr<MowContext> mow_context,
+ io::FileWriterPtr inverted_file_writer)
: _segment_id(segment_id),
_tablet_schema(std::move(tablet_schema)),
_tablet(std::move(tablet)),
@@ -140,7 +142,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer,
uint32_t segment_id,
std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(
file_writer->path().c_str())},
_opts.rowset_ctx->rowset_id.to_string(), segment_id,
- _tablet_schema->get_inverted_index_storage_format());
+ _tablet_schema->get_inverted_index_storage_format(),
+ std::move(inverted_file_writer));
}
}
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 92af12d4da6..9c667ee92fc 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -85,7 +85,8 @@ public:
explicit SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
TabletSchemaSPtr tablet_schema, BaseTabletSPtr
tablet, DataDir* data_dir,
uint32_t max_row_per_segment, const
SegmentWriterOptions& opts,
- std::shared_ptr<MowContext> mow_context);
+ std::shared_ptr<MowContext> mow_context,
+ io::FileWriterPtr inverted_file_writer = nullptr);
~SegmentWriter();
Status init();
@@ -197,7 +198,6 @@ private:
// Not owned. owned by RowsetWriter or SegmentFlusher
io::FileWriter* _file_writer = nullptr;
std::unique_ptr<InvertedIndexFileWriter> _inverted_index_file_writer;
-
SegmentFooterPB _footer;
size_t _num_key_columns;
size_t _num_short_key_columns;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 0930325d6d8..5d2d6ac0769 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -42,8 +42,9 @@
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
-#include "olap/row_cursor.h" // RowCursor // IWYU pragma:
keep
-#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/row_cursor.h" // RowCursor // IWYU pragma:
keep
+#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/rowset/segment_creator.h"
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
@@ -83,7 +84,8 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter*
file_writer, uint32
TabletSchemaSPtr tablet_schema,
BaseTabletSPtr tablet,
DataDir* data_dir, uint32_t
max_row_per_segment,
const
VerticalSegmentWriterOptions& opts,
- std::shared_ptr<MowContext>
mow_context)
+ std::shared_ptr<MowContext>
mow_context,
+ io::FileWriterPtr
inverted_file_writer)
: _segment_id(segment_id),
_tablet_schema(std::move(tablet_schema)),
_tablet(std::move(tablet)),
@@ -114,7 +116,8 @@
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(
_opts.rowset_ctx->segment_path(segment_id))},
_opts.rowset_ctx->rowset_id.to_string(), segment_id,
- _tablet_schema->get_inverted_index_storage_format());
+ _tablet_schema->get_inverted_index_storage_format(),
+ std::move(inverted_file_writer));
}
}
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index 3809a8301d5..8068b3e44be 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -82,7 +82,8 @@ public:
TabletSchemaSPtr tablet_schema,
BaseTabletSPtr tablet,
DataDir* data_dir, uint32_t
max_row_per_segment,
const VerticalSegmentWriterOptions& opts,
- std::shared_ptr<MowContext> mow_context);
+ std::shared_ptr<MowContext> mow_context,
+ io::FileWriterPtr inverted_file_writer =
nullptr);
~VerticalSegmentWriter();
VerticalSegmentWriter(const VerticalSegmentWriter&) = delete;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6980f0be3f2..8138c7594b8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -962,7 +962,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
     case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
     case TDataSinkType::OLAP_TABLE_SINK: {
         if (state->query_options().enable_memtable_on_sink_node &&
-            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
             !config::is_cloud_mode()) {
             _sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc,
                                                      output_exprs));
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index fc8a6cd1b61..0a35bf6008e 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -136,15 +136,23 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
     // Each sender sends data in one segment sequential, so we also do not
     // need a lock here.
     bool eos = header.segment_eos();
+    FileType file_type = header.file_type();
     uint32_t new_segid = mapping->at(segid);
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
-    auto flush_func = [this, new_segid, eos, buf, header]() {
+    auto flush_func = [this, new_segid, eos, buf, header, file_type]() {
         signal::set_signal_task_id(_load_id);
         g_load_stream_flush_running_threads << -1;
-        auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf);
+        auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
         if (eos && st.ok()) {
-            st = _load_stream_writer->close_segment(new_segid);
+            if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
+                st = _load_stream_writer->close_writer(new_segid, file_type);
+            } else {
+                st = Status::InternalError(
+                        "append data failed, file type error, file type = {}, "
+                        "segment_id={}",
+                        file_type, new_segid);
+            }
         }
         if (!st.ok() && _failed_st->ok()) {
             _failed_st = std::make_shared<Status>(st);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index b2635698379..9e6e0e36a4b 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -18,7 +18,7 @@
#pragma once
#include <bthread/mutex.h>
-#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_common.pb.h>
#include <condition_variable>
#include <memory>
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index bc02be98bc4..925229a43ce 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -94,28 +94,31 @@ Status LoadStreamWriter::init() {
return Status::OK();
}
-Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset,
butil::IOBuf buf) {
+Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset,
butil::IOBuf buf,
+ FileType file_type) {
SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* file_writer = nullptr;
+ auto& file_writers =
+ file_type == FileType::SEGMENT_FILE ? _segment_file_writers :
_inverted_file_writers;
{
std::lock_guard lock_guard(_lock);
DCHECK(_is_init);
- if (segid >= _segment_file_writers.size()) {
- for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
+ if (segid >= file_writers.size()) {
+ for (size_t i = file_writers.size(); i <= segid; i++) {
Status st;
io::FileWriterPtr file_writer;
- st = _rowset_writer->create_file_writer(i, file_writer);
+ st = _rowset_writer->create_file_writer(i, file_writer,
file_type);
if (!st.ok()) {
_is_canceled = true;
return st;
}
- _segment_file_writers.push_back(std::move(file_writer));
+ file_writers.push_back(std::move(file_writer));
g_load_stream_file_writer_cnt << 1;
}
}
// TODO: IOBuf to Slice
- file_writer = _segment_file_writers[segid].get();
+ file_writer = file_writers[segid].get();
}
DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", {
file_writer = nullptr; });
VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
@@ -130,25 +133,32 @@ Status LoadStreamWriter::append_data(uint32_t segid,
uint64_t offset, butil::IOB
return file_writer->append(buf.to_string());
}
-Status LoadStreamWriter::close_segment(uint32_t segid) {
+Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* file_writer = nullptr;
+ auto& file_writers =
+ file_type == FileType::SEGMENT_FILE ? _segment_file_writers :
_inverted_file_writers;
{
std::lock_guard lock_guard(_lock);
- DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", {
_is_init = false; });
+ DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.uninited_writer", {
_is_init = false; });
if (!_is_init) {
- return Status::Corruption("close_segment failed, LoadStreamWriter
is not inited");
+ return Status::Corruption("close_writer failed, LoadStreamWriter
is not inited");
}
- DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid",
- { segid = _segment_file_writers.size(); });
- if (segid >= _segment_file_writers.size()) {
- return Status::Corruption("close_segment failed, segment {} is
never opened", segid);
+ DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
+ { segid = file_writers.size(); });
+ if (segid >= file_writers.size()) {
+ return Status::Corruption(
+ "close_writer failed, file {} is never opened, file type
is {}", segid,
+ file_type);
}
- file_writer = _segment_file_writers[segid].get();
+ file_writer = file_writers[segid].get();
}
- DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", {
file_writer = nullptr; });
+
+ DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", {
file_writer = nullptr; });
if (file_writer == nullptr) {
- return Status::Corruption("close_segment failed, file writer {} is
destoryed", segid);
+ return Status::Corruption(
+ "close_writer failed, file writer {} is destoryed, fiel type
is {}", segid,
+ file_type);
}
auto st = file_writer->close();
if (!st.ok()) {
@@ -156,10 +166,12 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
return st;
}
g_load_stream_file_writer_cnt << -1;
- LOG(INFO) << "segment " << segid << " path " <<
file_writer->path().native()
- << "closed, written " << file_writer->bytes_appended() << "
bytes";
+ LOG(INFO) << "file " << segid << " path " << file_writer->path().native()
<< "closed, written "
+ << file_writer->bytes_appended() << " bytes"
+ << ", file type is " << file_type;
if (file_writer->bytes_appended() == 0) {
- return Status::Corruption("segment {} closed with 0 bytes",
file_writer->path().native());
+ return Status::Corruption("file {} closed with 0 bytes, file type is
{}",
+ file_writer->path().native(), file_type);
}
return Status::OK();
}
@@ -167,35 +179,62 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics&
stat,
TabletSchemaSPtr flush_schema) {
SCOPED_ATTACH_TASK(_query_thread_context);
- io::FileWriter* file_writer = nullptr;
+ size_t segment_file_size = 0;
+ size_t inverted_file_size = 0;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", {
_is_init = false; });
if (!_is_init) {
return Status::Corruption("add_segment failed, LoadStreamWriter is
not inited");
}
+ if (_inverted_file_writers.size() > 0 &&
+ _inverted_file_writers.size() != _segment_file_writers.size()) {
+ return Status::Corruption(
+ "add_segment failed, inverted file writer size is {},"
+ "segment file writer size is {}",
+ _inverted_file_writers.size(),
_segment_file_writers.size());
+ }
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
{ segid = _segment_file_writers.size(); });
- if (segid >= _segment_file_writers.size()) {
- return Status::Corruption("add_segment failed, segment {} is never
opened", segid);
+ RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE,
&segment_file_size));
+ if (_inverted_file_writers.size() > 0) {
+ RETURN_IF_ERROR(
+ _calc_file_size(segid, FileType::INVERTED_INDEX_FILE,
&inverted_file_size));
}
- file_writer = _segment_file_writers[segid].get();
}
- DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.null_file_writer", {
file_writer = nullptr; });
+
+ if (segment_file_size + inverted_file_size != stat.data_size) {
+ return Status::Corruption(
+ "add_segment failed, segment stat {} does not match, file
size={}, inverted file "
+ "size={}, stat.data_size={}, tablet id={}",
+ segid, segment_file_size, inverted_file_size, stat.data_size,
_req.tablet_id);
+ }
+
+ return _rowset_writer->add_segment(segid, stat, flush_schema);
+}
+
+Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type,
size_t* file_size) {
+ io::FileWriter* file_writer = nullptr;
+ auto& file_writers =
+ (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers :
_inverted_file_writers;
+
+ if (segid >= file_writers.size()) {
+ return Status::Corruption("calc file size failed, file {} is never
opened, file type is {}",
+ segid, file_type);
+ }
+ file_writer = file_writers[segid].get();
+ DBUG_EXECUTE_IF("LoadStreamWriter.calc_file_size.null_file_writer", {
file_writer = nullptr; });
if (file_writer == nullptr) {
- return Status::Corruption("add_segment failed, file writer {} is
destoryed", segid);
+ return Status::Corruption(
+ "calc file size failed, file writer {} is destoryed, file type
is {}", segid,
+ file_type);
}
if (file_writer->state() != io::FileWriter::State::CLOSED) {
- return Status::Corruption("add_segment failed, segment {} is not
closed",
+ return Status::Corruption("calc file size failed, file {} is not
closed",
file_writer->path().native());
}
- if (file_writer->bytes_appended() != stat.data_size) {
- return Status::Corruption(
- "add_segment failed, segment stat {} does not match, file
size={}, "
- "stat.data_size={}",
- file_writer->path().native(), file_writer->bytes_appended(),
stat.data_size);
- }
- return _rowset_writer->add_segment(segid, stat, flush_schema);
+ *file_size = file_writer->bytes_appended();
+ return Status::OK();
}
Status LoadStreamWriter::close() {
@@ -224,6 +263,14 @@ Status LoadStreamWriter::close() {
}
}
+ for (const auto& writer : _inverted_file_writers) {
+ if (writer->state() != io::FileWriter::State::CLOSED) {
+ return Status::Corruption(
+ "LoadStreamWriter close failed, inverted file {} is not
closed",
+ writer->path().native());
+ }
+ }
+
RETURN_IF_ERROR(_rowset_builder->build_rowset());
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index 9e3fce3c7db..b22817cb85c 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -17,6 +17,8 @@
#pragma once
+#include <gen_cpp/internal_service.pb.h>
+
#include <atomic>
#include <memory>
#include <mutex>
@@ -61,12 +63,15 @@ public:
Status init();
- Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf);
+ Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
+ FileType file_type = FileType::SEGMENT_FILE);
- Status close_segment(uint32_t segid);
+ Status close_writer(uint32_t segid, FileType file_type);
Status add_segment(uint32_t segid, const SegmentStatistics& stat,
TabletSchemaSPtr flush_chema);
+ Status _calc_file_size(uint32_t segid, FileType file_type, size_t*
file_size);
+
// wait for all memtables to be flushed.
Status close();
@@ -81,6 +86,7 @@ private:
std::unordered_map<uint32_t /*segid*/, SegmentStatisticsSharedPtr>
_segment_stat_map;
std::mutex _segment_stat_map_lock;
std::vector<io::FileWriterPtr> _segment_file_writers;
+ std::vector<io::FileWriterPtr> _inverted_file_writers;
QueryThreadContext _query_thread_context;
};
diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp
index 395c01ec390..2efb012aa20 100644
--- a/be/src/util/thrift_util.cpp
+++ b/be/src/util/thrift_util.cpp
@@ -156,7 +156,7 @@ std::string to_string(const TUniqueId& id) {
return std::to_string(id.hi).append(std::to_string(id.lo));
}
-bool _has_inverted_index_or_partial_update(TOlapTableSink sink) {
+bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink) {
OlapTableSchemaParam schema;
if (!schema.init(sink.schema).ok()) {
return false;
@@ -167,7 +167,12 @@ bool _has_inverted_index_or_partial_update(TOlapTableSink sink) {
for (const auto& index_schema : schema.indexes()) {
for (const auto& index : index_schema->indexes) {
if (index->index_type() == INVERTED) {
- return true;
+ if (sink.schema.inverted_index_file_storage_format ==
+ TInvertedIndexFileStorageFormat::V1) {
+ return true;
+ } else {
+ return false;
+ }
}
}
}
diff --git a/be/src/util/thrift_util.h b/be/src/util/thrift_util.h
index 9f4792ff64b..a7d6620d5d3 100644
--- a/be/src/util/thrift_util.h
+++ b/be/src/util/thrift_util.h
@@ -177,6 +177,6 @@ bool t_network_address_comparator(const TNetworkAddress& a,
const TNetworkAddres
PURE std::string to_string(const TUniqueId& id);
-PURE bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+PURE bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink);
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index caebb381db6..63f91678989 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -206,7 +206,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
// APPEND_DATA
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t segment_id, uint64_t offset,
std::span<const Slice> data,
- bool segment_eos) {
+ bool segment_eos, FileType file_type) {
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@@ -217,6 +217,7 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
header.set_segment_eos(segment_eos);
header.set_offset(offset);
header.set_opcode(doris::PStreamHeader::APPEND_DATA);
+ header.set_file_type(file_type);
return _encode_and_send(header, data);
}
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index a7d34ff8569..dd15eb7bf4c 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -137,7 +137,7 @@ public:
Status
append_data(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
int64_t segment_id, uint64_t offset, std::span<const
Slice> data,
- bool segment_eos = false);
+ bool segment_eos = false, FileType file_type =
FileType::SEGMENT_FILE);
// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index b9b0e0818cf..69f286b205b 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -60,7 +60,8 @@ class StreamSinkFileWriterTest : public testing::Test {
// APPEND_DATA
virtual Status append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t segment_id, uint64_t offset,
std::span<const Slice> data,
- bool segment_eos = false) override {
+ bool segment_eos = false,
+ FileType file_type =
FileType::SEGMENT_FILE) override {
EXPECT_EQ(PARTITION_ID, partition_id);
EXPECT_EQ(INDEX_ID, index_id);
EXPECT_EQ(TABLET_ID, tablet_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 8c40e467338..996f9d2fc1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -327,6 +327,7 @@ public class OlapTableSink extends DataSink {
                 }
             }
         }
+        schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat());
         return schemaParam;
     }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 9d8a72e01cc..4457c50917b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -24,6 +24,7 @@ option java_package = "org.apache.doris.proto";
import "data.proto";
import "descriptors.proto";
import "types.proto";
+import "olap_common.proto";
import "olap_file.proto";
option cc_generic_services = true;
@@ -909,6 +910,7 @@ message PStreamHeader {
repeated PTabletID tablets = 10;
optional TabletSchemaPB flush_schema = 11;
optional uint64 offset = 12;
+ optional FileType file_type = 13;
}
message PGetWalQueueSizeRequest{
diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto
index a452e0ff6a6..e60aa7603fc 100644
--- a/gensrc/proto/olap_common.proto
+++ b/gensrc/proto/olap_common.proto
@@ -58,3 +58,8 @@ message PTopNCounter {
required uint32 space_expand_rate = 2;
repeated PCounter counter = 3;
}
+
+enum FileType {
+ SEGMENT_FILE = 1;
+ INVERTED_INDEX_FILE = 2;
+}
\ No newline at end of file
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 2b8a74afd66..cb844c93361 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -249,6 +249,7 @@ struct TOlapTableSchemaParam {
10: optional bool is_strict_mode = false
11: optional string auto_increment_column
12: optional i32 auto_increment_column_unique_id = -1
+  13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1
}
struct TTabletLocation {
diff --git a/regression-test/data/inverted_index_p0/load/test_insert.out
b/regression-test/data/inverted_index_p0/load/test_insert.out
new file mode 100644
index 00000000000..b8f7f12afbc
--- /dev/null
+++ b/regression-test/data/inverted_index_p0/load/test_insert.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
diff --git a/regression-test/data/inverted_index_p0/load/test_stream_load.out
b/regression-test/data/inverted_index_p0/load/test_stream_load.out
new file mode 100644
index 00000000000..5723e4218d2
--- /dev/null
+++ b/regression-test/data/inverted_index_p0/load/test_stream_load.out
@@ -0,0 +1,45 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
diff --git
a/regression-test/data/fault_injection_p0/test_index_lowercase_fault_injection.out
b/regression-test/data/inverted_index_p0/test_index_lowercase_fault_injection.out
similarity index 100%
rename from
regression-test/data/fault_injection_p0/test_index_lowercase_fault_injection.out
rename to
regression-test/data/inverted_index_p0/test_index_lowercase_fault_injection.out
diff --git
a/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out
b/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out
new file mode 100644
index 00000000000..5c6d903a9b4
--- /dev/null
+++
b/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
+-- !sql_2 --
+2 {"a":18811,"b":"hello world","c":1181111}
+3 {"a":18811,"b":"hello wworld","c":11111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2 {"a":18811,"b":"hello world","c":1181111}
+4 {"a":1234,"b":"hello xxx world","c":8181111}
+
diff --git
a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
index ea9e9ffb8bb..fb04b128822 100644
---
a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
@@ -96,8 +96,11 @@ suite("test_delta_writer_v2_back_pressure_fault_injection",
"nonConcurrent") {
logger.info(res.toString())
}
}
+
} catch(Exception e) {
logger.info(e.getMessage())
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
}
sql """ DROP TABLE IF EXISTS `baseall` """
diff --git
a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
index 65e68f3d3fa..6a6aa0efd43 100644
---
a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -143,22 +143,22 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LocalFileSystem.create_file_impl.open_file_failed",
"")
// LoadStreamWriter append_data meet null file writer error
load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
- // LoadStreamWriter close_segment meet not inited error
- load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "")
- // LoadStreamWriter close_segment meet not bad segid error
- load_with_injection("LoadStreamWriter.close_segment.bad_segid", "")
- // LoadStreamWriter close_segment meet null file writer error
- load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
- // LoadStreamWriter close_segment meet file writer failed to close error
+ // LoadStreamWriter close_writer meet not inited error
+ load_with_injection("LoadStreamWriter.close_writer.uninited_writer", "")
+ // LoadStreamWriter close_writer meet not bad segid error
+ load_with_injection("LoadStreamWriter.close_writer.bad_segid", "")
+ // LoadStreamWriter close_writer meet null file writer error
+ load_with_injection("LoadStreamWriter.close_writer.null_file_writer", "")
+ // LoadStreamWriter close_writer meet file writer failed to close error
load_with_injection("LocalFileWriter.close.failed", "")
- // LoadStreamWriter close_segment meet bytes_appended and real file size
not match error
- load_with_injection("FileWriter.close_segment.zero_bytes_appended", "")
+ // LoadStreamWriter close_writer meet bytes_appended and real file size
not match error
+ load_with_injection("FileWriter.close_writer.zero_bytes_appended", "")
// LoadStreamWriter add_segment meet not inited error
load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "")
// LoadStreamWriter add_segment meet not bad segid error
load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
// LoadStreamWriter add_segment meet null file writer error
- load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "")
+ load_with_injection("LoadStreamWriter.calc_file_size.null_file_writer", "")
// LoadStreamWriter add_segment meet bytes_appended and real file size not
match error
load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
// LoadStream init failed coz LoadStreamWriter init failed
diff --git a/regression-test/suites/inverted_index_p0/load/test_insert.groovy
b/regression-test/suites/inverted_index_p0/load/test_insert.groovy
new file mode 100644
index 00000000000..97b3ca07937
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/load/test_insert.groovy
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_insert_with_index", "p0") {
+
+ def set_be_config = { key, value ->
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
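+    // For a given inverted_index_storage_format (V1 or V2): create a source and a
+    // destination table with an inverted index on the variant column, insert rows
+    // and run MATCH queries, then copy the data via insert-select and query again.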
+ def test = { format ->
+ def srcName = "src_table"
+ def dstName = "dst_table"
+ sql """ DROP TABLE IF EXISTS ${srcName}; """
+ sql """
+ CREATE TABLE ${srcName} (
+ k bigint,
+ v variant,
+ INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k`) BUCKETS 2
+ PROPERTIES ( "replication_num" = "1",
"inverted_index_storage_format" = ${format});
+ """
+
+ sql """insert into ${srcName} values(1, '{"a" : 123, "b" : "xxxyyy",
"c" : 111999111}')"""
+ sql """insert into ${srcName} values(2, '{"a" : 18811, "b" : "hello
world", "c" : 1181111}')"""
+ sql """insert into ${srcName} values(3, '{"a" : 18811, "b" : "hello
wworld", "c" : 11111}')"""
+ sql """insert into ${srcName} values(4, '{"a" : 1234, "b" : "hello xxx
world", "c" : 8181111}')"""
+ qt_sql_2 """select * from ${srcName} where cast(v["a"] as smallint) >
123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024
order by k"""
+ sql """insert into ${srcName} values(5, '{"a" : 123456789, "b" :
123456, "c" : 8181111}')"""
+ qt_sql_3 """select * from ${srcName} where cast(v["a"] as int) > 123
and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order
by k"""
+
+ sql """ DROP TABLE IF EXISTS ${dstName}; """
+ sql """
+ CREATE TABLE ${dstName} (
+ k bigint,
+ v variant,
+ INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k`) BUCKETS 2
+ PROPERTIES ( "replication_num" = "1",
"inverted_index_storage_format" = ${format});
+ """
+ sql """ insert into ${dstName} select * from ${srcName}"""
+ qt_sql_2 """select * from ${srcName} where cast(v["a"] as smallint) >
123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024
order by k"""
+ qt_sql_3 """select * from ${srcName} where cast(v["a"] as int) > 123
and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order
by k"""
+ sql """ DROP TABLE IF EXISTS ${dstName}; """
+ sql """ DROP TABLE IF EXISTS ${srcName}; """
+ }
+
+ set_be_config("inverted_index_ram_dir_enable", "true")
+ test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "false")
+ test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "true")
+}
\ No newline at end of file
diff --git
a/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy
b/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy
new file mode 100644
index 00000000000..0fd0ca35627
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_spark_load_with_index_p0", "p0") {
+
+ def set_be_config = { key, value ->
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
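+    // For a given inverted_index_storage_format (V1 or V2): create two tables with
+    // inverted indexes, submit a SPARK LOAD from HDFS, poll SHOW LOAD until the job
+    // finishes, and select the loaded rows (requires enableHdfs() and a Spark cluster).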
+ def test = { format ->
+        // Needs a Spark cluster; uploads the data files to HDFS
+ def testTable = "tbl_test_spark_load"
+ def testTable2 = "tbl_test_spark_load2"
+ def testResource = "spark_resource"
+ def yarnAddress = "master:8032"
+ def hdfsAddress = "hdfs://master:9000"
+ def hdfsWorkingDir = "hdfs://master:9000/doris"
+        brokerName = getBrokerName()
+ hdfsUser = getHdfsUser()
+ hdfsPasswd = getHdfsPasswd()
+
+ def create_test_table = {testTablex ->
+ def result1 = sql """
+ CREATE TABLE IF NOT EXISTS ${testTablex} (
+ c_int int(11) NULL,
+ c_char char(15) NULL,
+ c_varchar varchar(100) NULL,
+ c_bool boolean NULL,
+ c_tinyint tinyint(4) NULL,
+ c_smallint smallint(6) NULL,
+ c_bigint bigint(20) NULL,
+ c_largeint largeint(40) NULL,
+ c_float float NULL,
+ c_double double NULL,
+ c_decimal decimal(6, 3) NULL,
+ c_decimalv3 decimal(6, 3) NULL,
+ c_date date NULL,
+ c_datev2 date NULL,
+ c_datetime datetime NULL,
+ c_datetimev2 datetime NULL,
+ INDEX idx_c_varchar(c_varchar) USING INVERTED,
+ INDEX idx_c_datetime(c_datetime) USING INVERTED
+ )
+ DISTRIBUTED BY HASH(c_int) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "inverted_index_storage_format" = ${format}
+ )
+ """
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+ }
+
+ def create_spark_resource = {sparkType, sparkMaster, sparkQueue ->
+ def result1 = sql """
+ CREATE EXTERNAL RESOURCE "${testResource}"
+ PROPERTIES
+ (
+ "type" = "spark",
+ "spark.master" = "yarn",
+ "spark.submit.deployMode" = "cluster",
+ "spark.executor.memory" = "1g",
+ "spark.yarn.queue" = "default",
+ "spark.hadoop.yarn.resourcemanager.address" =
"${yarnAddress}",
+ "spark.hadoop.fs.defaultFS" = "${hdfsAddress}",
+ "working_dir" = "${hdfsWorkingDir}",
+ "broker" = "${brokerName}",
+ "broker.username" = "${hdfsUser}",
+ "broker.password" = "${hdfsPasswd}"
+ );
+ """
+
+            // DDL/DML returns 1 row; the only value is the updated row count
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Create resource should update 0
rows")
+ }
+
+ def load_from_hdfs_use_spark = {testTablex, testTablex2, label,
hdfsFilePath1, hdfsFilePath2 ->
+ def result1= sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("${hdfsFilePath1}")
+ INTO TABLE ${testTablex}
+ COLUMNS TERMINATED BY ",",
+ DATA INFILE("${hdfsFilePath2}")
+ INTO TABLE ${testTablex2}
+ COLUMNS TERMINATED BY "|"
+ )
+ WITH RESOURCE '${testResource}'
+ (
+ "spark.executor.memory" = "2g",
+ "spark.shuffle.compress" = "true"
+ )
+ PROPERTIES
+ (
+ "timeout" = "3600"
+ );
+ """
+
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+ }
+
+ def check_load_result = {checklabel, testTablex, testTablex2 ->
+ max_try_milli_secs = 10000
+ while(max_try_milli_secs) {
+ result = sql "show load where label = '${checklabel}'"
+ if(result[0][2] == "FINISHED") {
+ sql "sync"
+ qt_select "select * from ${testTablex} order by c_int"
+ qt_select "select * from ${testTablex2} order by c_int"
+ break
+ } else {
+ sleep(1000) // wait 1 second every time
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertEquals(1, 2)
+ }
+ }
+ }
+ }
+
+    // run only if 'enableHdfs' in regression-conf.groovy has been set to true
+ if (enableHdfs()) {
+ def hdfs_txt_file_path1 = uploadToHdfs
"load_p0/spark_load/all_types1.txt"
+ def hdfs_txt_file_path2 = uploadToHdfs
"load_p0/spark_load/all_types2.txt"
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ sql "DROP TABLE IF EXISTS ${testTable2}"
+ create_test_table.call(testTable)
+ create_test_table.call(testTable2)
+ def test_load_label =
UUID.randomUUID().toString().replaceAll("-", "")
+            load_from_hdfs_use_spark.call(testTable, testTable2, test_load_label, hdfs_txt_file_path1, hdfs_txt_file_path2)
+ check_load_result.call(test_load_label, testTable, testTable2)
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ try_sql("DROP TABLE IF EXISTS ${testTable2}")
+ }
+ }
+ }
+
+ set_be_config("inverted_index_ram_dir_enable", "true")
+ test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "false")
+ test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "true")
+}
diff --git
a/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy
b/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy
new file mode 100644
index 00000000000..f29ff3b3512
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_stream_load_with_inverted_index_p0", "nonCurrent") {
+
+ def set_be_config = { key, value ->
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ def tableName = "test_stream_load_with_inverted_index"
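+    // Queries the BE /api/calc_crc endpoint for a tablet; the returned JSON carries
+    // the version range and rowset count asserted on after the stream loads.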
+ def calc_file_crc_on_tablet = { ip, port, tablet ->
+ return curl("GET",
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
+ }
+
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ boolean disableAutoCompaction = true
+ boolean has_update_be_config = false
+ try {
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+ disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
+ }
+ }
+ set_be_config.call("disable_auto_compaction", "true")
+ has_update_be_config = true
+
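+        // For each storage format: create the table, stream-load the same JSON file
+        // 10 times with memtable_on_sink_node=true, verify every tablet reports 11
+        // rowsets via calc_crc, and run a match_phrase_prefix query on the index.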
+ def test = { format ->
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE ${tableName} (
+ k bigint,
+ v variant,
+ INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k`) BUCKETS 10
+ PROPERTIES ( "replication_num" = "1",
"inverted_index_storage_format" = ${format});
+ """
+
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+
+
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
calc_file_crc_on_tablet(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run calc file: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def resultJson = parseJson(out.trim())
+ assertEquals(resultJson.start_version, "0")
+ assertEquals(resultJson.end_version, "11")
+ assertEquals(resultJson.rowset_count, "11")
+ }
+ qt_sql_1 """
+ select cast(v["repo"]["name"] as string) from ${tableName} where
cast(v["repo"]["name"] as string) match_phrase_prefix "davesbingrewardsbot";
+ """
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ }
+
+ set_be_config("inverted_index_ram_dir_enable", "true")
+ // test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "false")
+ // test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "true")
+
+ } finally {
+ if (has_update_be_config) {
+ set_be_config.call("disable_auto_compaction",
disableAutoCompaction.toString())
+ }
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy
b/regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy
similarity index 99%
rename from
regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy
rename to
regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy
index e18060a7d68..2ed2a04a93b 100644
---
a/regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy
+++
b/regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy
@@ -77,4 +77,4 @@ suite("test_index_lowercase_fault_injection",
"nonConcurrent") {
qt_sql """ select count() from ${testTable} where (request match
'http'); """
} finally {
}
-}
\ No newline at end of file
+ }
diff --git
a/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy
b/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy
new file mode 100644
index 00000000000..a68870e80d6
--- /dev/null
+++
b/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_stream_load_with_inverted_index", "p2") {
+ def tableName = "test_stream_load_with_inverted_index"
+
+ def set_be_config = { key, value ->
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ def calc_file_crc_on_tablet = { ip, port, tablet ->
+ return curl("GET",
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
+ }
+
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ boolean disableAutoCompaction = true
+ boolean has_update_be_config = false
+ try {
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+ disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
+ }
+ }
+ set_be_config.call("disable_auto_compaction", "true")
+ has_update_be_config = true
+
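+        // For each storage format: create a two-replica table, stream-load the JSON
+        // file 5 times, and check that all replicas report identical calc_crc results;
+        // then repeat the load via INSERT statements and check the replicas again.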
+ def test = { format ->
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE ${tableName} (
+ k bigint,
+ v variant,
+ INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "2",
"inverted_index_storage_format" = ${format});
+ """
+
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+
+ String first_backend_id;
+ List<String> other_backend_id = new ArrayList<>()
+
+ String tablet_id = tablets[0].TabletId
+ def tablet_info = sql_return_maparray """ show tablet
${tablet_id}; """
+ logger.info("tablet: " + tablet_info)
+ for (def tablet in tablets) {
+ first_backend_id = tablet.BackendId
+ other_backend_id.add(tablet.BackendId)
+ }
+ other_backend_id.remove(first_backend_id)
+
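+            // Compare the calc_crc result of the first replica against every other
+            // replica: crc_value, version range, file_count and rowset_count must match.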
+ def checkTabletFileCrc = {
+ def (first_code, first_out, first_err) =
calc_file_crc_on_tablet(backendId_to_backendIP[first_backend_id],
backendId_to_backendHttpPort[first_backend_id], tablet_id)
+ logger.info("Run calc_file_crc_on_tablet: ip=" +
backendId_to_backendIP[first_backend_id] + " code=" + first_code + ", out=" +
first_out + ", err=" + first_err)
+
+ for (String backend: other_backend_id) {
+ def (other_code, other_out, other_err) =
calc_file_crc_on_tablet(backendId_to_backendIP[backend],
backendId_to_backendHttpPort[backend], tablet_id)
+ logger.info("Run calc_file_crc_on_tablet: ip=" +
backendId_to_backendIP[backend] + " code=" + other_code + ", out=" + other_out
+ ", err=" + other_err)
+ assertTrue(parseJson(first_out.trim()).crc_value ==
parseJson(other_out.trim()).crc_value)
+ assertTrue(parseJson(first_out.trim()).start_version ==
parseJson(other_out.trim()).start_version)
+ assertTrue(parseJson(first_out.trim()).end_version ==
parseJson(other_out.trim()).end_version)
+ assertTrue(parseJson(first_out.trim()).file_count ==
parseJson(other_out.trim()).file_count)
+ assertTrue(parseJson(first_out.trim()).rowset_count ==
parseJson(other_out.trim()).rowset_count)
+ }
+ }
+
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(tableName, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+
+
+ // check
+ checkTabletFileCrc.call()
+
+ qt_sql_1 """
+ select cast(v["repo"]["name"] as string) from ${tableName} where
cast(v["repo"]["name"] as string) match "davesbingrewardsbot";
+ """
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE ${tableName} (
+ k bigint,
+ v variant,
+ INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "2",
"inverted_index_storage_format" = ${format});
+ """
+
+ sql """insert into ${tableName} values(1, '{"a" : 123, "b" :
"xxxyyy", "c" : 111999111}')"""
+ sql """insert into ${tableName} values(2, '{"a" : 18811, "b" :
"hello world", "c" : 1181111}')"""
+ sql """insert into ${tableName} values(3, '{"a" : 18811, "b" :
"hello wworld", "c" : 11111}')"""
+ sql """insert into ${tableName} values(4, '{"a" : 1234, "b" :
"hello xxx world", "c" : 8181111}')"""
+ qt_sql_2 """select * from ${tableName} where cast(v["a"] as
smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as
int) > 1024 order by k"""
+ sql """insert into ${tableName} values(5, '{"a" : 123456789, "b" :
123456, "c" : 8181111}')"""
+ qt_sql_3 """select * from ${tableName} where cast(v["a"] as int) >
123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111
order by k"""
+
+ // check
+ checkTabletFileCrc.call()
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ }
+ set_be_config("inverted_index_ram_dir_enable", "true")
+ // test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "false")
+ // test.call("V1")
+ test.call("V2")
+ set_be_config("inverted_index_ram_dir_enable", "true")
+ } finally {
+ if (has_update_be_config) {
+ set_be_config.call("disable_auto_compaction",
disableAutoCompaction.toString())
+ }
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy
b/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy
new file mode 100644
index 00000000000..055cff66c09
--- /dev/null
+++ b/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_into_with_inverted_index", "p2"){
+ def src_table = "srcTable"
+ def dst_table = "dstTable"
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${src_table} (
+ k bigint,
+ v variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 2
+ properties("replication_num" = "1", "disable_auto_compaction" =
"true");
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${dst_table} (
+ k bigint,
+ v variant,
+ INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english")
COMMENT ''
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 2
+ properties("replication_num" = "1", "disable_auto_compaction" =
"true");
+ """
+
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
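+
+    // Load the gharchive sample 100 times into the plain source table, then insert-select
+    // into the indexed destination table so the inverted index is built during the insert.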
+ for (int i = 1; i <= 100; i++) {
+ load_json_data.call(src_table, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ }
+
+ sql """ insert into ${dst_table} select * from ${src_table}"""
+}
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
index 5411224c200..77114904f18 100644
--- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -34,7 +34,9 @@ suite("test_http_stream", "p0") {
dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
- dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+ dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP,
+ INDEX idx_dt_2 (`dt_2`) USING INVERTED,
+ INDEX idx_dt_3 (`dt_3`) USING INVERTED
)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
@@ -298,7 +300,9 @@ suite("test_http_stream", "p0") {
sex TINYINT,
phone LARGEINT,
address VARCHAR(500),
- register_time DATETIME
+ register_time DATETIME,
+ INDEX idx_username (`username`) USING INVERTED,
+ INDEX idx_address (`address`) USING INVERTED
)
DUPLICATE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
index ff239e5fef1..9eb948bf55a 100644
--- a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
@@ -36,7 +36,9 @@ suite("test_mysql_load", "p0") {
`v9` date REPLACE_IF_NOT_NULL NULL,
`v10` char(10) REPLACE_IF_NOT_NULL NULL,
`v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
- `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+ `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL,
+ INDEX idx_k1 (`k1`) USING INVERTED,
+ INDEX idx_k2 (`k2`) USING INVERTED
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
diff --git
a/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
b/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
index b63c8711301..79c3fecd69e 100644
--- a/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
@@ -28,7 +28,9 @@ suite("test_mysql_load_big_file", "p0") {
`v1` tinyint(4) NULL,
`v2` string NULL,
`v3` date NULL,
- `v4` datetime NULL
+ `v4` datetime NULL,
+ INDEX idx_v2 (`v2`) USING INVERTED,
+ INDEX idx_v3 (`v3`) USING INVERTED
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]