This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba87f7d3a31 [fix](pipelineX) add table sink and some fix in pipelineX (#25314)
ba87f7d3a31 is described below

commit ba87f7d3a31d75271d1063684f41a3be4a051d0e
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 11 20:18:08 2023 +0800

    [fix](pipelineX) add table sink and some fix in pipelineX (#25314)
---
 ...ink_operator.h => olap_table_sink_operator.cpp} | 51 +++++++-------
 be/src/pipeline/exec/olap_table_sink_operator.h    | 80 +++++++++++++++++++++-
 .../pipeline/exec/partition_sort_sink_operator.cpp |  7 +-
 .../pipeline/exec/partition_sort_sink_operator.h   |  2 +
 .../exec/partition_sort_source_operator.cpp        |  8 +++
 be/src/pipeline/pipeline_x/operator.cpp            | 15 ++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 12 +++-
 .../main/java/org/apache/doris/qe/Coordinator.java |  2 +-
 .../jdbc/test_jdbc_query_mysql.out                 | 14 ++--
 .../jdbc/test_jdbc_query_mysql.groovy              |  2 +-
 .../insert/test_insert_move_memtable.groovy        |  1 +
 11 files changed, 152 insertions(+), 42 deletions(-)

diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.cpp
similarity index 52%
copy from be/src/pipeline/exec/olap_table_sink_operator.h
copy to be/src/pipeline/exec/olap_table_sink_operator.cpp
index e3bab01faf4..999bf921d54 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp
@@ -15,35 +15,40 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+#include "olap_table_sink_operator.h"
 
-#include "operator.h"
-#include "vec/sink/vtablet_sink.h"
+#include "common/status.h"
 
 namespace doris {
+class DataSink;
+} // namespace doris
 
-namespace pipeline {
-
-class OlapTableSinkOperatorBuilder final
-        : public DataSinkOperatorBuilder<vectorized::VOlapTableSink> {
-public:
-    OlapTableSinkOperatorBuilder(int32_t id, DataSink* sink)
-            : DataSinkOperatorBuilder(id, "OlapTableSinkOperator", sink) {}
-
-    OperatorPtr build_operator() override;
-};
-
-class OlapTableSinkOperator final : public DataSinkOperator<OlapTableSinkOperatorBuilder> {
-public:
-    OlapTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
-            : DataSinkOperator(operator_builder, sink) {}
-
-    bool can_write() override { return true; } // TODO: need use mem_limit
-};
+namespace doris::pipeline {
 
 OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
     return std::make_shared<OlapTableSinkOperator>(this, _sink);
 }
 
-} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+    auto& p = _parent->cast<Parent>();
+    RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
+    return Status::OK();
+}
+
+Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    if (Base::_closed) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_close_timer);
+    SCOPED_TIMER(profile()->total_time_counter());
+    if (_closed) {
+        return _close_status;
+    }
+    _close_status = Base::close(state, exec_status);
+    return _close_status;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h
index e3bab01faf4..244480273e2 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/sink/vtablet_sink.h"
 
 namespace doris {
@@ -41,9 +42,82 @@ public:
     bool can_write() override { return true; } // TODO: need use mem_limit
 };
 
-OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
-    return std::make_shared<OlapTableSinkOperator>(this, _sink);
-}
+class OlapTableSinkOperatorX;
+
+class OlapTableSinkLocalState final
+        : public AsyncWriterSink<vectorized::VTabletWriter, OlapTableSinkOperatorX> {
+public:
+    using Base = AsyncWriterSink<vectorized::VTabletWriter, OlapTableSinkOperatorX>;
+    using Parent = OlapTableSinkOperatorX;
+    ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState);
+    OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {};
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override {
+        SCOPED_TIMER(profile()->total_time_counter());
+        SCOPED_TIMER(_open_timer);
+        return Base::open(state);
+    }
+
+    Status close(RuntimeState* state, Status exec_status) override;
+    friend class OlapTableSinkOperatorX;
+
+private:
+    Status _close_status = Status::OK();
+};
+class OlapTableSinkOperatorX final : public DataSinkOperatorX<OlapTableSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<OlapTableSinkLocalState>;
+    OlapTableSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
+                           const std::vector<TExpr>& t_output_expr, bool group_commit)
+            : Base(0),
+              _row_desc(row_desc),
+              _t_output_expr(t_output_expr),
+              _group_commit(group_commit),
+              _pool(pool) {};
+
+    Status init(const TDataSink& thrift_sink) override {
+        // From the thrift expressions create the real exprs.
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
+        return Status::OK();
+    }
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
+        return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
+    }
+
+    Status open(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::open(state));
+        return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+    }
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override {
+        CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        SCOPED_TIMER(local_state.profile()->total_time_counter());
+        COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+        return local_state.sink(state, in_block, source_state);
+    }
+
+    FinishDependency* finish_blocked_by(RuntimeState* state) const override {
+        auto& local_state = state->get_sink_local_state(id())->cast<OlapTableSinkLocalState>();
+        return local_state._finish_dependency->finish_blocked_by();
+    };
+
+    WriteDependency* wait_for_dependency(RuntimeState* state) override {
+        CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+        return local_state.write_blocked_by();
+    }
+
+private:
+    friend class OlapTableSinkLocalState;
+    template <typename Writer, typename Parent>
+    friend class AsyncWriterSink;
+    const RowDescriptor& _row_desc;
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+    const std::vector<TExpr>& _t_output_expr;
+    const bool _group_commit;
+    ObjectPool* _pool;
+};
 
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 2bc19bf391c..fcdbb685358 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -55,7 +55,8 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, const T
         : DataSinkOperatorX(tnode.node_id),
           _pool(pool),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
-          _limit(tnode.limit) {}
+          _limit(tnode.limit),
+          _topn_phase(tnode.partition_sort_node.ptopn_phase) {}
 
 Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
@@ -106,7 +107,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc());
         } else {
             //just simply use partition num to check
-            if (local_state._num_partition > config::partition_topn_partition_threshold &&
+            //if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded.
+            if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
+                local_state._num_partition > config::partition_topn_partition_threshold &&
                 local_state.child_input_rows < 10000 * local_state._num_partition) {
                 {
                     std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 7dbe616fd6f..6c124bf3b19 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -110,6 +110,8 @@ private:
     int _partition_exprs_num = 0;
     vectorized::VExprContextSPtrs _partition_expr_ctxs;
 
+    TPartTopNPhase::type _topn_phase;
+
     // Expressions and parameters used for build _sort_description
     vectorized::VSortExecExprs _vsort_exec_exprs;
     std::vector<bool> _is_asc_order;
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 1382bf716b8..a67728de4f3 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -53,6 +53,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
             local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
             //if buffer have no data, block reading and wait for signal again
+            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
+                                                                   output_block->columns()));
             if (local_state._shared_state->blocks_buffer.empty()) {
                 local_state._dependency->block_reading();
             }
@@ -61,7 +63,13 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     }
 
     // is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read()
+    // notice: must output block from _blocks_buffer firstly, and then get_sorted_block.
+    // as when the child is eos, then set _can_read = true, and _partition_sorts have push_back sorter.
+    // if we move the _blocks_buffer output at last(behind 286 line),
+    // it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
     RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
+                                                           output_block->columns()));
     {
         std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
         if (local_state._shared_state->blocks_buffer.empty() &&
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 191dabd66ef..85598e6874f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -42,6 +42,7 @@
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
+#include "pipeline/exec/olap_table_sink_operator.h"
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
@@ -588,11 +589,15 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
     }
     COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->write_watcher_elapse_time());
     // if the init failed, the _writer may be nullptr. so here need check
-    if (_writer && _writer->need_normal_close()) {
-        if (exec_status.ok() && !state->is_cancelled()) {
-            RETURN_IF_ERROR(_writer->commit_trans());
+    if (_writer) {
+        if (_writer->need_normal_close()) {
+            if (exec_status.ok() && !state->is_cancelled()) {
+                RETURN_IF_ERROR(_writer->commit_trans());
+            }
+            RETURN_IF_ERROR(_writer->close(exec_status));
+        } else {
+            RETURN_IF_ERROR(_writer->get_writer_status());
         }
-        RETURN_IF_ERROR(_writer->close(exec_status));
     }
     return PipelineXSinkLocalState<>::close(state, exec_status);
 }
@@ -610,6 +615,7 @@ DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
 DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
 DECLARE_OPERATOR_X(ResultFileSinkLocalState)
+DECLARE_OPERATOR_X(OlapTableSinkLocalState)
 DECLARE_OPERATOR_X(AnalyticSinkLocalState)
 DECLARE_OPERATOR_X(SortSinkLocalState)
 DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
@@ -685,5 +691,6 @@ template class PipelineXLocalState<SetDependency>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1191653637f..2c852eecac9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -65,6 +65,7 @@
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
+#include "pipeline/exec/olap_table_sink_operator.h"
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
@@ -264,6 +265,15 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink));
         break;
     }
+    case TDataSinkType::OLAP_TABLE_SINK: {
+        if (state->query_options().enable_memtable_on_sink_node) {
+            return Status::InternalError(
+                    "Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node ");
+        } else {
+            _sink.reset(new OlapTableSinkOperatorX(pool, row_desc, output_exprs, false));
+        }
+        break;
+    }
     case TDataSinkType::JDBC_TABLE_SINK: {
         if (!thrift_sink.__isset.jdbc_table_sink) {
             return Status::InternalError("Missing data jdbc sink.");
@@ -394,7 +404,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
 
         _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
         _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
-
+        _runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx], _total_tasks++,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c6ea704aa72..5247973f57a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -323,7 +323,7 @@ public class Coordinator implements CoordInterface {
         this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine()
                 && (fragments.size() > 0);
         this.enablePipelineXEngine = context.getSessionVariable().getEnablePipelineXEngine()
-                && (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink);
+                && (fragments.size() > 0);
 
         initQueryOptions(context);
 
diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
index f1fb0d33517..9a2ff768a13 100644
--- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
@@ -1152,14 +1152,14 @@ abc     \N
 -- !sql14 --
 \N     342
 0      136
-16     1
-17     1
-28     1
+1      1
+11     1
+13     1
+14     1
+2      1
 4      1
-52     1
-58     1
-61     1
-89     1
+7      1
+8      1
 
 -- !sql15 --
 1025   1
diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
index b30c5bc103d..32c50512524 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
@@ -733,7 +733,7 @@ suite("test_jdbc_query_mysql", "p0,external,mysql,external_docker,external_docke
         order_qt_sql13 """ SELECT k2, sum(CAST(NULL AS BIGINT)) FROM $jdbcMysql57Table1 GROUP BY k2 """
         order_qt_sql14 """ SELECT `key`, COUNT(*) as c FROM (
                             SELECT CASE WHEN k8 % 3 = 0 THEN NULL WHEN k8 % 5 = 0 THEN 0 ELSE k8 END AS `key`
-                            FROM $jdbcMysql57Table1) as a GROUP BY `key` order by c desc limit 10"""
+                            FROM $jdbcMysql57Table1) as a GROUP BY `key` order by c desc , `key` asc limit 10"""
         order_qt_sql15 """ SELECT lines, COUNT(*) as c FROM (SELECT k7, COUNT(*) lines FROM $jdbcMysql57Table1 GROUP BY k7) U GROUP BY lines order by c"""
         order_qt_sql16 """ SELECT COUNT(DISTINCT k8 + 1) FROM $jdbcMysql57Table1 """
         order_qt_sql17 """ SELECT COUNT(*) FROM (SELECT DISTINCT k8 + 1 FROM $jdbcMysql57Table1) t """
diff --git 
a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy 
b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
index bc9db5add0b..189db974fbf 100644
--- a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
+++ b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
@@ -17,6 +17,7 @@
 
 suite("test_insert_move_memtable") {
     sql """ set enable_memtable_on_sink_node=true """
+    sql """ set experimental_enable_pipeline_x_engine=false """
     // todo: test insert, such as insert values, insert select, insert txn
     sql "show load"
     def test_baseall = "test_query_db.baseall";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to