This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 11fecafb749 [fix](move-memtable) fallback if target table contains 
inverted index (#25498)
11fecafb749 is described below

commit 11fecafb749bd1d4421830455944310ce5afec43
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Oct 18 22:11:59 2023 +0800

    [fix](move-memtable) fallback if target table contains inverted index 
(#25498)
---
 be/src/exec/data_sink.cpp                     | 21 +++++++++++++++++++--
 be/src/exec/data_sink.h                       |  4 ++++
 be/src/pipeline/pipeline_fragment_context.cpp | 19 ++++++++++++++++++-
 be/src/pipeline/pipeline_fragment_context.h   |  1 +
 4 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c63137420ed..e160e9ebeec 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -147,7 +147,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
     case TDataSinkType::OLAP_TABLE_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
-        if (state->query_options().enable_memtable_on_sink_node) {
+        if (state->query_options().enable_memtable_on_sink_node &&
+            _has_inverted_index(thrift_sink.olap_table_sink)) {
             sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, &status));
         } else {
             sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, false));
@@ -294,7 +295,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
     case TDataSinkType::OLAP_TABLE_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
-        if (state->query_options().enable_memtable_on_sink_node) {
+        if (state->query_options().enable_memtable_on_sink_node &&
+            _has_inverted_index(thrift_sink.olap_table_sink)) {
             sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, &status));
         } else {
             sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, false));
@@ -348,4 +350,19 @@ Status DataSink::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
+bool DataSink::_has_inverted_index(TOlapTableSink sink) {
+    OlapTableSchemaParam schema;
+    if (!schema.init(sink.schema).ok()) {
+        return false;
+    }
+    for (const auto& index_schema : schema.indexes()) {
+        for (const auto& index : index_schema->indexes) {
+            if (index->index_type() == INVERTED) {
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 } // namespace doris
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 778ee2da83a..254bd7e18ad 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -42,6 +42,7 @@ class QueryStatistics;
 class TDataSink;
 class TExpr;
 class TPipelineFragmentParams;
+class TOlapTableSink;
 
 namespace vectorized {
 class Block;
@@ -114,6 +115,9 @@ public:
 
     virtual bool can_write() { return true; }
 
+private:
+    static bool _has_inverted_index(TOlapTableSink sink);
+
 protected:
     // Set to true after close() has been called. subclasses should check and 
set this in
     // close().
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index bf0c1b80df5..a9eedb3a489 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -757,7 +757,9 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
     }
     case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
     case TDataSinkType::OLAP_TABLE_SINK: {
-        if (state->query_options().enable_memtable_on_sink_node) {
+        DCHECK(thrift_sink.__isset.olap_table_sink);
+        if (state->query_options().enable_memtable_on_sink_node &&
+            _has_inverted_index(thrift_sink.olap_table_sink)) {
             sink_ = 
std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(),
                                                                      
_sink.get());
         } else {
@@ -887,4 +889,19 @@ Status PipelineFragmentContext::send_report(bool done) {
             shared_from_this());
 }
 
+bool PipelineFragmentContext::_has_inverted_index(TOlapTableSink sink) {
+    OlapTableSchemaParam schema;
+    if (!schema.init(sink.schema).ok()) {
+        return false;
+    }
+    for (const auto& index_schema : schema.indexes()) {
+        for (const auto& index : index_schema->indexes) {
+            if (index->index_type() == INVERTED) {
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 80c38880bf3..760df4c4d7d 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -217,6 +217,7 @@ protected:
     report_status_callback _report_status_cb;
 
 private:
+    static bool _has_inverted_index(TOlapTableSink sink);
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
     bool _group_commit;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to