This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 11fecafb749 [fix](move-memtable) fallback if target table contains
inverted index (#25498)
11fecafb749 is described below
commit 11fecafb749bd1d4421830455944310ce5afec43
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Oct 18 22:11:59 2023 +0800
[fix](move-memtable) fallback if target table contains inverted index
(#25498)
---
be/src/exec/data_sink.cpp | 21 +++++++++++++++++++--
be/src/exec/data_sink.h | 4 ++++
be/src/pipeline/pipeline_fragment_context.cpp | 19 ++++++++++++++++++-
be/src/pipeline/pipeline_fragment_context.h | 1 +
4 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c63137420ed..e160e9ebeec 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -147,7 +147,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
- if (state->query_options().enable_memtable_on_sink_node) {
+ if (state->query_options().enable_memtable_on_sink_node &&
+ _has_inverted_index(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs, &status));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc,
output_exprs, false));
@@ -294,7 +295,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
- if (state->query_options().enable_memtable_on_sink_node) {
+ if (state->query_options().enable_memtable_on_sink_node &&
+ _has_inverted_index(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs, &status));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc,
output_exprs, false));
@@ -348,4 +350,19 @@ Status DataSink::prepare(RuntimeState* state) {
return Status::OK();
}
+bool DataSink::_has_inverted_index(TOlapTableSink sink) {
+ OlapTableSchemaParam schema;
+ if (!schema.init(sink.schema).ok()) {
+ return false;
+ }
+ for (const auto& index_schema : schema.indexes()) {
+ for (const auto& index : index_schema->indexes) {
+ if (index->index_type() == INVERTED) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
} // namespace doris
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 778ee2da83a..254bd7e18ad 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -42,6 +42,7 @@ class QueryStatistics;
class TDataSink;
class TExpr;
class TPipelineFragmentParams;
+class TOlapTableSink;
namespace vectorized {
class Block;
@@ -114,6 +115,9 @@ public:
virtual bool can_write() { return true; }
+private:
+ static bool _has_inverted_index(TOlapTableSink sink);
+
protected:
// Set to true after close() has been called. subclasses should check and
set this in
// close().
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index bf0c1b80df5..a9eedb3a489 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -757,7 +757,9 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
case TDataSinkType::OLAP_TABLE_SINK: {
- if (state->query_options().enable_memtable_on_sink_node) {
+ DCHECK(thrift_sink.__isset.olap_table_sink);
+ if (state->query_options().enable_memtable_on_sink_node &&
+ _has_inverted_index(thrift_sink.olap_table_sink)) {
sink_ =
std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(),
_sink.get());
} else {
@@ -887,4 +889,19 @@ Status PipelineFragmentContext::send_report(bool done) {
shared_from_this());
}
+bool PipelineFragmentContext::_has_inverted_index(TOlapTableSink sink) {
+ OlapTableSchemaParam schema;
+ if (!schema.init(sink.schema).ok()) {
+ return false;
+ }
+ for (const auto& index_schema : schema.indexes()) {
+ for (const auto& index : index_schema->indexes) {
+ if (index->index_type() == INVERTED) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 80c38880bf3..760df4c4d7d 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -217,6 +217,7 @@ protected:
report_status_callback _report_status_cb;
private:
+ static bool _has_inverted_index(TOlapTableSink sink);
std::vector<std::unique_ptr<PipelineTask>> _tasks;
bool _group_commit;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]