This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba87f7d3a31 [fix](pipelineX) add table sink and some fix in pipelineX
(#25314)
ba87f7d3a31 is described below
commit ba87f7d3a31d75271d1063684f41a3be4a051d0e
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 11 20:18:08 2023 +0800
[fix](pipelineX) add table sink and some fix in pipelineX (#25314)
---
...ink_operator.h => olap_table_sink_operator.cpp} | 51 +++++++-------
be/src/pipeline/exec/olap_table_sink_operator.h | 80 +++++++++++++++++++++-
.../pipeline/exec/partition_sort_sink_operator.cpp | 7 +-
.../pipeline/exec/partition_sort_sink_operator.h | 2 +
.../exec/partition_sort_source_operator.cpp | 8 +++
be/src/pipeline/pipeline_x/operator.cpp | 15 ++--
.../pipeline_x/pipeline_x_fragment_context.cpp | 12 +++-
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +-
.../jdbc/test_jdbc_query_mysql.out | 14 ++--
.../jdbc/test_jdbc_query_mysql.groovy | 2 +-
.../insert/test_insert_move_memtable.groovy | 1 +
11 files changed, 152 insertions(+), 42 deletions(-)
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h
b/be/src/pipeline/exec/olap_table_sink_operator.cpp
similarity index 52%
copy from be/src/pipeline/exec/olap_table_sink_operator.h
copy to be/src/pipeline/exec/olap_table_sink_operator.cpp
index e3bab01faf4..999bf921d54 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp
@@ -15,35 +15,40 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+#include "olap_table_sink_operator.h"
-#include "operator.h"
-#include "vec/sink/vtablet_sink.h"
+#include "common/status.h"
namespace doris {
+class DataSink;
+} // namespace doris
-namespace pipeline {
-
-class OlapTableSinkOperatorBuilder final
- : public DataSinkOperatorBuilder<vectorized::VOlapTableSink> {
-public:
- OlapTableSinkOperatorBuilder(int32_t id, DataSink* sink)
- : DataSinkOperatorBuilder(id, "OlapTableSinkOperator", sink) {}
-
- OperatorPtr build_operator() override;
-};
-
-class OlapTableSinkOperator final : public
DataSinkOperator<OlapTableSinkOperatorBuilder> {
-public:
- OlapTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink*
sink)
- : DataSinkOperator(operator_builder, sink) {}
-
- bool can_write() override { return true; } // TODO: need use mem_limit
-};
+namespace doris::pipeline {
OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
return std::make_shared<OlapTableSinkOperator>(this, _sink);
}
-} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<Parent>();
+ RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
+ return Status::OK();
+}
+
+Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status)
{
+ if (Base::_closed) {
+ return Status::OK();
+ }
+ SCOPED_TIMER(_close_timer);
+ SCOPED_TIMER(profile()->total_time_counter());
+ if (_closed) {
+ return _close_status;
+ }
+ _close_status = Base::close(state, exec_status);
+ return _close_status;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h
b/be/src/pipeline/exec/olap_table_sink_operator.h
index e3bab01faf4..244480273e2 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -18,6 +18,7 @@
#pragma once
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/vtablet_sink.h"
namespace doris {
@@ -41,9 +42,82 @@ public:
bool can_write() override { return true; } // TODO: need use mem_limit
};
-OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
- return std::make_shared<OlapTableSinkOperator>(this, _sink);
-}
+class OlapTableSinkOperatorX;
+
+class OlapTableSinkLocalState final
+ : public AsyncWriterSink<vectorized::VTabletWriter,
OlapTableSinkOperatorX> {
+public:
+ using Base = AsyncWriterSink<vectorized::VTabletWriter,
OlapTableSinkOperatorX>;
+ using Parent = OlapTableSinkOperatorX;
+ ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState);
+ OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+ : Base(parent, state) {};
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override {
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ return Base::open(state);
+ }
+
+ Status close(RuntimeState* state, Status exec_status) override;
+ friend class OlapTableSinkOperatorX;
+
+private:
+ Status _close_status = Status::OK();
+};
+class OlapTableSinkOperatorX final : public
DataSinkOperatorX<OlapTableSinkLocalState> {
+public:
+ using Base = DataSinkOperatorX<OlapTableSinkLocalState>;
+ OlapTableSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
+ const std::vector<TExpr>& t_output_expr, bool
group_commit)
+ : Base(0),
+ _row_desc(row_desc),
+ _t_output_expr(t_output_expr),
+ _group_commit(group_commit),
+ _pool(pool) {};
+
+ Status init(const TDataSink& thrift_sink) override {
+ // From the thrift expressions create the real exprs.
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr,
_output_vexpr_ctxs));
+ return Status::OK();
+ }
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
+ return vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc);
+ }
+
+ Status open(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::open(state));
+ return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+ }
+ Status sink(RuntimeState* state, vectorized::Block* in_block,
+ SourceState source_state) override {
+ CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ SCOPED_TIMER(local_state.profile()->total_time_counter());
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
+ return local_state.sink(state, in_block, source_state);
+ }
+
+ FinishDependency* finish_blocked_by(RuntimeState* state) const override {
+ auto& local_state =
state->get_sink_local_state(id())->cast<OlapTableSinkLocalState>();
+ return local_state._finish_dependency->finish_blocked_by();
+ };
+
+ WriteDependency* wait_for_dependency(RuntimeState* state) override {
+ CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+ return local_state.write_blocked_by();
+ }
+
+private:
+ friend class OlapTableSinkLocalState;
+ template <typename Writer, typename Parent>
+ friend class AsyncWriterSink;
+ const RowDescriptor& _row_desc;
+ vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+ const std::vector<TExpr>& _t_output_expr;
+ const bool _group_commit;
+ ObjectPool* _pool;
+};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 2bc19bf391c..fcdbb685358 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -55,7 +55,8 @@
PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, const T
: DataSinkOperatorX(tnode.node_id),
_pool(pool),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
- _limit(tnode.limit) {}
+ _limit(tnode.limit),
+ _topn_phase(tnode.partition_sort_node.ptopn_phase) {}
Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
@@ -106,7 +107,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._value_places[0]->append_whole_block(input_block,
_child_x->row_desc());
} else {
//just simply use partition num to check
- if (local_state._num_partition >
config::partition_topn_partition_threshold &&
+ // if the phase is TWO_PHASE_GLOBAL, all data must be sorted even though the partition
num threshold has been exceeded.
+ if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
+ local_state._num_partition >
config::partition_topn_partition_threshold &&
local_state.child_input_rows < 10000 *
local_state._num_partition) {
{
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 7dbe616fd6f..6c124bf3b19 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -110,6 +110,8 @@ private:
int _partition_exprs_num = 0;
vectorized::VExprContextSPtrs _partition_expr_ctxs;
+ TPartTopNPhase::type _topn_phase;
+
// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
std::vector<bool> _is_asc_order;
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 1382bf716b8..a67728de4f3 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -53,6 +53,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
//if buffer have no data, block reading and wait for signal again
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
output_block->columns()));
if (local_state._shared_state->blocks_buffer.empty()) {
local_state._dependency->block_reading();
}
@@ -61,7 +63,13 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
}
// is_ready_for_read: this is set by sink node using:
local_state._dependency->set_ready_for_read()
+ // Note: blocks from _blocks_buffer must be output first, and only then
get_sorted_block.
+ // When the child reaches eos, _can_read is set to true and
the sorters have been pushed back into _partition_sorts.
+ // If the _blocks_buffer output were moved to the end (after line 286),
+ // eos could be reported before all data is output: when _blocks_buffer.empty() and
_can_read = false (here _sort_idx and _partition_sorts.size() are both 0)
RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
output_block->columns()));
{
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
if (local_state._shared_state->blocks_buffer.empty() &&
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 191dabd66ef..85598e6874f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -42,6 +42,7 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
+#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
@@ -588,11 +589,15 @@ Status AsyncWriterSink<Writer,
Parent>::close(RuntimeState* state, Status exec_s
}
COUNTER_SET(_wait_for_dependency_timer,
_async_writer_dependency->write_watcher_elapse_time());
// if the init failed, the _writer may be nullptr. so here need check
- if (_writer && _writer->need_normal_close()) {
- if (exec_status.ok() && !state->is_cancelled()) {
- RETURN_IF_ERROR(_writer->commit_trans());
+ if (_writer) {
+ if (_writer->need_normal_close()) {
+ if (exec_status.ok() && !state->is_cancelled()) {
+ RETURN_IF_ERROR(_writer->commit_trans());
+ }
+ RETURN_IF_ERROR(_writer->close(exec_status));
+ } else {
+ RETURN_IF_ERROR(_writer->get_writer_status());
}
- RETURN_IF_ERROR(_writer->close(exec_status));
}
return PipelineXSinkLocalState<>::close(state, exec_status);
}
@@ -610,6 +615,7 @@ DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
+DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
@@ -685,5 +691,6 @@ template class PipelineXLocalState<SetDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter,
ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter,
JdbcTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VTabletWriter,
OlapTableSinkOperatorX>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1191653637f..2c852eecac9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -65,6 +65,7 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
+#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
@@ -264,6 +265,15 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
_sink.reset(new ResultSinkOperatorX(row_desc, output_exprs,
thrift_sink.result_sink));
break;
}
+ case TDataSinkType::OLAP_TABLE_SINK: {
+ if (state->query_options().enable_memtable_on_sink_node) {
+ return Status::InternalError(
+ "Unsuported OLAP_TABLE_SINK with
enable_memtable_on_sink_node ");
+ } else {
+ _sink.reset(new OlapTableSinkOperatorX(pool, row_desc,
output_exprs, false));
+ }
+ break;
+ }
case TDataSinkType::JDBC_TABLE_SINK: {
if (!thrift_sink.__isset.jdbc_table_sink) {
return Status::InternalError("Missing data jdbc sink.");
@@ -394,7 +404,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
-
+
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx],
_total_tasks++,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c6ea704aa72..5247973f57a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -323,7 +323,7 @@ public class Coordinator implements CoordInterface {
this.enablePipelineEngine =
context.getSessionVariable().getEnablePipelineEngine()
&& (fragments.size() > 0);
this.enablePipelineXEngine =
context.getSessionVariable().getEnablePipelineXEngine()
- && (fragments.size() > 0 && fragments.get(0).getSink()
instanceof ResultSink);
+ && (fragments.size() > 0);
initQueryOptions(context);
diff --git
a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
index f1fb0d33517..9a2ff768a13 100644
--- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out
@@ -1152,14 +1152,14 @@ abc \N
-- !sql14 --
\N 342
0 136
-16 1
-17 1
-28 1
+1 1
+11 1
+13 1
+14 1
+2 1
4 1
-52 1
-58 1
-61 1
-89 1
+7 1
+8 1
-- !sql15 --
1025 1
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
index b30c5bc103d..32c50512524 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy
@@ -733,7 +733,7 @@ suite("test_jdbc_query_mysql",
"p0,external,mysql,external_docker,external_docke
order_qt_sql13 """ SELECT k2, sum(CAST(NULL AS BIGINT)) FROM
$jdbcMysql57Table1 GROUP BY k2 """
order_qt_sql14 """ SELECT `key`, COUNT(*) as c FROM (
SELECT CASE WHEN k8 % 3 = 0 THEN NULL WHEN k8 % 5
= 0 THEN 0 ELSE k8 END AS `key`
- FROM $jdbcMysql57Table1) as a GROUP BY `key` order
by c desc limit 10"""
+ FROM $jdbcMysql57Table1) as a GROUP BY `key` order
by c desc , `key` asc limit 10"""
order_qt_sql15 """ SELECT lines, COUNT(*) as c FROM (SELECT k7,
COUNT(*) lines FROM $jdbcMysql57Table1 GROUP BY k7) U GROUP BY lines order by
c"""
order_qt_sql16 """ SELECT COUNT(DISTINCT k8 + 1) FROM
$jdbcMysql57Table1 """
order_qt_sql17 """ SELECT COUNT(*) FROM (SELECT DISTINCT k8 + 1 FROM
$jdbcMysql57Table1) t """
diff --git
a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
index bc9db5add0b..189db974fbf 100644
--- a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
+++ b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
@@ -17,6 +17,7 @@
suite("test_insert_move_memtable") {
sql """ set enable_memtable_on_sink_node=true """
+ sql """ set experimental_enable_pipeline_x_engine=false """
// todo: test insert, such as insert values, insert select, insert txn
sql "show load"
def test_baseall = "test_query_db.baseall";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]