This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b6babf3af4 [pipelineX](sink) support jdbc table sink (#24970)
b6babf3af4 is described below
commit b6babf3af43a2bc8c0f2202aede6df64d0883376
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Sep 28 14:39:32 2023 +0800
[pipelineX](sink) support jdbc table sink (#24970)
* [pipelineX](sink) support jdbc table sink
---
be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 76 ++++++++++++++++++++++
be/src/pipeline/exec/jdbc_table_sink_operator.h | 71 ++++++++++++++++++++
be/src/pipeline/pipeline_x/operator.cpp | 6 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 14 ++++
4 files changed, 166 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
new file mode 100644
index 0000000000..a551762027
--- /dev/null
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jdbc_table_sink_operator.h"
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/operator.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+class DataSink;
+} // namespace doris
+
+namespace doris::pipeline {
+
+// The DataSinkOperatorX base is constructed with 0 (presumably the sink's
+// operator id -- confirm against DataSinkOperatorX). Both arguments are bound
+// to reference members rather than copied: init() later reads the thrift
+// exprs and prepare() reads the row descriptor, so the caller's objects must
+// outlive those calls.
+JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc,
+                                               const std::vector<TExpr>& t_output_expr)
+        : DataSinkOperatorX<JdbcTableSinkLocalState>(0),
+          _row_desc(row_desc),
+          _t_output_expr(t_output_expr) {}
+
+// Runs the generic sink initialization first, then builds the output
+// expression contexts (_output_vexpr_ctxs) from the thrift exprs that were
+// captured by the constructor.
+Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) {
+    using SinkBase = DataSinkOperatorX<JdbcTableSinkLocalState>;
+    RETURN_IF_ERROR(SinkBase::init(thrift_sink));
+
+    // Thrift expressions -> real expr trees.
+    RETURN_IF_ERROR(
+            vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
+    return Status::OK();
+}
+
+// Prepares the base sink, then prepares the output expr contexts created in
+// init() against the row descriptor supplied at construction.
+Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
+    using SinkBase = DataSinkOperatorX<JdbcTableSinkLocalState>;
+    RETURN_IF_ERROR(SinkBase::prepare(state));
+
+    // Prepare the output exprs against _row_desc.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
+    return Status::OK();
+}
+
+// Opens the base sink, then opens the output expr contexts that were built in
+// init() and prepared in prepare().
+Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state));
+    // Open the exprs to run (preparation already happened in prepare()).
+    RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
+    return Status::OK();
+}
+
+// Forwards one input block (and the upstream source state) to the
+// JdbcTableSinkLocalState. Time spent here is charged to the local state's
+// profile total_time_counter.
+Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
+                                    SourceState source_state) {
+    // Declares `local_state`; per the macro name it presumably returns the
+    // error status when the local state cannot be obtained.
+    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    SCOPED_TIMER(local_state.profile()->total_time_counter());
+
+    RETURN_IF_ERROR(local_state.sink(state, block, source_state));
+    return Status::OK();
+}
+
+// Reports what this sink is currently waiting on by asking the local state's
+// write_blocked_by(). The macro below declares `local_state`; judging by its
+// name it returns nullptr when the local state is unavailable (confirm).
+WriteDependency* JdbcTableSinkOperatorX::wait_for_dependency(RuntimeState* state) {
+    CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+    return local_state.write_blocked_by();
+}
+
+// Looks up the sink local state registered under this operator's id, views it
+// as a JdbcTableSinkLocalState, and delegates the pending-finish check to it.
+bool JdbcTableSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+    return state->get_sink_local_state(id())
+            ->cast<JdbcTableSinkLocalState>()
+            .is_pending_finish();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h
new file mode 100644
index 0000000000..6db9c38065
--- /dev/null
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/sink/vresult_sink.h"
+#include "vec/sink/writer/vjdbc_table_writer.h"
+
+namespace doris {
+class DataSink;
+
+namespace pipeline {
+
+class JdbcTableSinkOperatorX;
+// Sink-side local state for the JDBC table sink. It adds no behavior of its
+// own: everything comes from AsyncWriterSink, instantiated with
+// vectorized::VJdbcTableWriter as the writer and JdbcTableSinkOperatorX as the
+// parent operator type.
+class JdbcTableSinkLocalState final
+        : public AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX> {
+    ENABLE_FACTORY_CREATOR(JdbcTableSinkLocalState);
+
+public:
+    using Base = AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
+
+    JdbcTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+private:
+    // Grants JdbcTableSinkOperatorX access to private members.
+    friend class JdbcTableSinkOperatorX;
+};
+
+// Pipeline-X sink operator created for TDataSinkType::JDBC_TABLE_SINK. It owns
+// the output expression contexts; per-block work is delegated to
+// JdbcTableSinkLocalState (an AsyncWriterSink over vectorized::VJdbcTableWriter).
+class JdbcTableSinkOperatorX final : public DataSinkOperatorX<JdbcTableSinkLocalState> {
+public:
+    // `row_desc` and `t_output_expr` are stored by reference, not copied. The
+    // caller must keep them alive at least until init() (which reads the
+    // thrift exprs) and prepare() (which reads the row descriptor) have run.
+    JdbcTableSinkOperatorX(const RowDescriptor& row_desc,
+                           const std::vector<TExpr>& t_output_expr);
+
+    // Builds _output_vexpr_ctxs from the thrift output exprs.
+    Status init(const TDataSink& thrift_sink) override;
+    // Prepares the output exprs against the row descriptor.
+    Status prepare(RuntimeState* state) override;
+    // Opens the output exprs.
+    Status open(RuntimeState* state) override;
+
+    // Hands `in_block` (with the upstream source state) to the local state.
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    // Returns the local state's write_blocked_by() dependency.
+    WriteDependency* wait_for_dependency(RuntimeState* state) override;
+    // Delegates to the local state's is_pending_finish().
+    bool is_pending_finish(RuntimeState* state) const override;
+
+private:
+    friend class JdbcTableSinkLocalState;
+    template <typename Writer, typename Parent>
+    friend class AsyncWriterSink;
+
+    // Non-owning; see the constructor comment for the lifetime requirement.
+    const RowDescriptor& _row_desc;
+    const std::vector<TExpr>& _t_output_expr;
+    // Created in init(), prepared in prepare(), opened in open().
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 47d7731344..c6331b04fb 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -35,6 +35,7 @@
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
+#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
@@ -510,7 +511,8 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
return Status::OK();
}
COUNTER_SET(_wait_for_dependency_timer,
_async_writer_dependency->write_watcher_elapse_time());
- if (_writer->need_normal_close()) {
+ // if the init failed, the _writer may be nullptr. so here need check
+ if (_writer && _writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
}
@@ -535,6 +537,7 @@ bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class
DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
+DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
@@ -602,5 +605,6 @@ template class PipelineXSinkLocalState<MultiCastDependency>;
template class PipelineXLocalState<PartitionSortDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter,
ResultFileSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter,
JdbcTableSinkOperatorX>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7c67b1a07b..64ea086e9a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -58,6 +58,7 @@
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
+#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
@@ -247,6 +248,19 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
_sink.reset(new ResultSinkOperatorX(row_desc, output_exprs,
thrift_sink.result_sink));
break;
}
+ case TDataSinkType::JDBC_TABLE_SINK: {
+ if (!thrift_sink.__isset.jdbc_table_sink) {
+ return Status::InternalError("Missing data jdbc sink.");
+ }
+ if (config::enable_java_support) {
+ _sink.reset(new JdbcTableSinkOperatorX(row_desc, output_exprs));
+ } else {
+ return Status::InternalError(
+ "Jdbc table sink is not enabled, you can change be config "
+ "enable_java_support to true and restart be.");
+ }
+ break;
+ }
case TDataSinkType::RESULT_FILE_SINK: {
if (!thrift_sink.__isset.result_file_sink) {
return Status::InternalError("Missing result file sink.");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]