This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bea1618da8d [refactor](minor) Delete unused code (#38848)
bea1618da8d is described below
commit bea1618da8d94134d7ffa193032360c54fb9fa17
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 5 22:15:16 2024 +0800
[refactor](minor) Delete unused code (#38848)
## Proposed changes
Issue Number: close #xxx
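Summary of changes (derived from the diff below):
- Remove the `AsyncWriterDependency` class; the async writer sink now uses a plain `Dependency` named "AsyncWriterDependency".
- Remove `AsyncResultWriter::set_dependency()`; `AsyncResultWriter` and all of its subclasses now take the write dependency and the finish dependency as `std::shared_ptr<pipeline::Dependency>` constructor arguments.
- Replace the `if (_dependency ...)` / `if (_finish_dependency)` null checks in `AsyncResultWriter` with `DCHECK`s.
- Remove the unused `_writer_thread_closed` flag from `AsyncResultWriter` and the unused `_cv` condition variable from `SharedHashTableController`.
- Change the "Unsupported exec type in pipelineX" error message to say "pipeline".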
---
be/src/pipeline/dependency.h | 9 -----
be/src/pipeline/exec/operator.cpp | 8 ++---
be/src/pipeline/exec/operator.h | 3 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 6 ++--
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
.../vec/runtime/shared_hash_table_controller.cpp | 2 --
be/src/vec/runtime/shared_hash_table_controller.h | 1 -
be/src/vec/sink/writer/async_result_writer.cpp | 40 +++++++++-------------
be/src/vec/sink/writer/async_result_writer.h | 15 +++-----
.../sink/writer/iceberg/viceberg_table_writer.cpp | 6 ++--
.../sink/writer/iceberg/viceberg_table_writer.h | 4 ++-
be/src/vec/sink/writer/vfile_result_writer.cpp | 23 +++++++------
be/src/vec/sink/writer/vfile_result_writer.h | 8 +++--
be/src/vec/sink/writer/vhive_table_writer.cpp | 6 ++--
be/src/vec/sink/writer/vhive_table_writer.h | 4 ++-
be/src/vec/sink/writer/vjdbc_table_writer.cpp | 7 ++--
be/src/vec/sink/writer/vjdbc_table_writer.h | 4 ++-
be/src/vec/sink/writer/vmysql_table_writer.cpp | 6 ++--
be/src/vec/sink/writer/vmysql_table_writer.h | 4 ++-
be/src/vec/sink/writer/vodbc_table_writer.cpp | 7 ++--
be/src/vec/sink/writer/vodbc_table_writer.h | 4 ++-
be/src/vec/sink/writer/vtablet_writer.cpp | 6 ++--
be/src/vec/sink/writer/vtablet_writer.h | 4 ++-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 ++--
be/src/vec/sink/writer/vtablet_writer_v2.h | 4 ++-
25 files changed, 100 insertions(+), 89 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 1e29cf904c7..171947ba1c7 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -649,15 +649,6 @@ public:
std::mutex sink_eos_lock;
};
-class AsyncWriterDependency final : public Dependency {
-public:
- using SharedState = BasicSharedState;
- ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
- AsyncWriterDependency(int id, int node_id)
- : Dependency(id, node_id, "AsyncWriterDependency", true) {}
- ~AsyncWriterDependency() override = default;
-};
-
using SetHashTableVariants =
std::variant<std::monostate,
vectorized::MethodSerialized<HashMap<StringRef,
RowRefListWithFlags>>,
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 0928b32f41d..07e0c3cf640 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -621,10 +621,10 @@ template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
- _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
- _async_writer_dependency =
- AsyncWriterDependency::create_shared(_parent->operator_id(),
_parent->node_id());
- _writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
+ _async_writer_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+
"AsyncWriterDependency", true);
+ _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs,
_async_writer_dependency,
+ _finish_dependency));
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _async_writer_dependency->name()
+ "]Time", 1);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 2db981ba88e..9d549690461 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -871,8 +871,7 @@ protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;
- std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;
-
+ std::shared_ptr<Dependency> _async_writer_dependency;
std::shared_ptr<Dependency> _finish_dependency;
};
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 029bea7494e..0ba727543cd 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -105,7 +105,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type,
state->fragment_instance_id(),
_output_vexpr_ctxs, _sender, nullptr,
state->return_object_data_as_binary(),
- p._output_row_descriptor));
+ p._output_row_descriptor, _async_writer_dependency,
_finish_dependency));
} else {
// init channel
_output_block = vectorized::Block::create_unique(
@@ -113,7 +113,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type,
state->fragment_instance_id(),
_output_vexpr_ctxs, nullptr, _output_block.get(),
- state->return_object_data_as_binary(),
p._output_row_descriptor));
+ state->return_object_data_as_binary(),
p._output_row_descriptor,
+ _async_writer_dependency, _finish_dependency));
std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < p._dests.size(); ++i) {
@@ -129,7 +130,6 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
RETURN_IF_ERROR(_channel->init_stub(state));
}
}
- _writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
_writer->set_header_info(p._header_type, p._header);
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index c4a2073c911..a2f26ac0a00 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1470,7 +1470,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
default:
- return Status::InternalError("Unsupported exec type in pipelineX: {}",
+ return Status::InternalError("Unsupported exec type in pipeline: {}",
print_plan_node_type(tnode.node_type));
}
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index a416ba6349e..4b77b1ed8a3 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -52,7 +52,6 @@ void SharedHashTableController::signal(int my_node_id) {
for (auto& dep : _dependencies[my_node_id]) {
dep->set_ready();
}
- _cv.notify_all();
}
void SharedHashTableController::signal_finish(int my_node_id) {
@@ -60,7 +59,6 @@ void SharedHashTableController::signal_finish(int my_node_id)
{
for (auto& dep : _finish_dependencies[my_node_id]) {
dep->set_ready();
}
- _cv.notify_all();
}
TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int
my_node_id) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index ec3c616bca8..b04b1cdba06 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -95,7 +95,6 @@ private:
std::map<int /*node id*/,
std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
std::map<int /*node id*/,
std::vector<std::shared_ptr<pipeline::Dependency>>>
_finish_dependencies;
- std::condition_variable _cv;
std::map<int /*node id*/, TUniqueId /*fragment instance id*/>
_builder_fragment_ids;
std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
};
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 82c5f4ab288..e5fe8f589d5 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -33,16 +33,10 @@ class TExpr;
namespace vectorized {
-AsyncResultWriter::AsyncResultWriter(const
doris::vectorized::VExprContextSPtrs& output_expr_ctxs)
- : _vec_output_expr_ctxs(output_expr_ctxs),
- _dependency(nullptr),
- _finish_dependency(nullptr) {}
-
-void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
- pipeline::Dependency* finish_dep) {
- _dependency = dep;
- _finish_dependency = finish_dep;
-}
+AsyncResultWriter::AsyncResultWriter(const
doris::vectorized::VExprContextSPtrs& output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(dep),
_finish_dependency(fin_dep) {}
Status AsyncResultWriter::sink(Block* block, bool eos) {
auto rows = block->rows();
@@ -58,12 +52,13 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
return _writer_status.status();
}
- if (_dependency && _is_finished()) {
+ DCHECK(_dependency);
+ if (_is_finished()) {
_dependency->set_ready();
}
if (rows) {
_data_queue.emplace_back(std::move(add_block));
- if (_dependency && !_data_queue_is_available() && !_is_finished()) {
+ if (!_data_queue_is_available() && !_is_finished()) {
_dependency->block();
}
}
@@ -81,7 +76,8 @@ std::unique_ptr<Block>
AsyncResultWriter::_get_block_from_queue() {
DCHECK(!_data_queue.empty());
auto block = std::move(_data_queue.front());
_data_queue.pop_front();
- if (_dependency && _data_queue_is_available()) {
+ DCHECK(_dependency);
+ if (_data_queue_is_available()) {
_dependency->set_ready();
}
return block;
@@ -89,10 +85,8 @@ std::unique_ptr<Block>
AsyncResultWriter::_get_block_from_queue() {
Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile*
profile) {
// Should set to false here, to
- _writer_thread_closed = false;
- if (_finish_dependency) {
- _finish_dependency->block();
- }
+ DCHECK(_finish_dependency);
+ _finish_dependency->block();
// This is a async thread, should lock the task ctx, to make sure
runtimestate and profile
// not deconstructed before the thread exit.
auto task_ctx = state->get_task_execution_context();
@@ -113,6 +107,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
force_close(status);
}
+ DCHECK(_dependency);
if (_writer_status.ok()) {
while (true) {
if (!_eos && _data_queue.empty() && _writer_status.ok()) {
@@ -133,7 +128,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status.update(status);
- if (_dependency && _is_finished()) {
+ if (_is_finished()) {
_dependency->set_ready();
}
break;
@@ -174,16 +169,14 @@ void AsyncResultWriter::process_block(RuntimeState*
state, RuntimeProfile* profi
if (_writer_status.ok()) {
_writer_status.update(close_st);
}
- _writer_thread_closed = true;
}
// should set _finish_dependency first, as close function maybe blocked by
wait_close of execution_timeout
_set_ready_to_finish();
}
void AsyncResultWriter::_set_ready_to_finish() {
- if (_finish_dependency) {
- _finish_dependency->set_ready();
- }
+ DCHECK(_finish_dependency);
+ _finish_dependency->set_ready();
}
Status AsyncResultWriter::_projection_block(doris::vectorized::Block&
input_block,
@@ -201,7 +194,8 @@ Status
AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc
void AsyncResultWriter::force_close(Status s) {
std::lock_guard l(_m);
_writer_status.update(s);
- if (_dependency && _is_finished()) {
+ DCHECK(_dependency);
+ if (_is_finished()) {
_dependency->set_ready();
}
_cv.notify_one();
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 5e21dc13e12..36bca48358a 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -33,7 +33,6 @@ class TDataSink;
class TExpr;
namespace pipeline {
-class AsyncWriterDependency;
class Dependency;
class PipelineTask;
@@ -55,9 +54,9 @@ class Block;
*/
class AsyncResultWriter : public ResultWriter {
public:
- AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs);
-
- void set_dependency(pipeline::AsyncWriterDependency* dep,
pipeline::Dependency* finish_dep);
+ AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
void force_close(Status s);
@@ -96,13 +95,9 @@ private:
// Default value is ok
AtomicStatus _writer_status;
bool _eos = false;
- // The writer is not started at the beginning. If prepare failed but not
open, the the writer
- // is not started, so should not pending finish on it.
- bool _writer_thread_closed = true;
- // Used by pipelineX
- pipeline::AsyncWriterDependency* _dependency;
- pipeline::Dependency* _finish_dependency;
+ std::shared_ptr<pipeline::Dependency> _dependency;
+ std::shared_ptr<pipeline::Dependency> _finish_dependency;
moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
};
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 070dbad3d78..898b71d1d9a 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -33,8 +33,10 @@ namespace doris {
namespace vectorized {
VIcebergTableWriter::VIcebergTableWriter(const TDataSink& t_sink,
- const VExprContextSPtrs&
output_expr_ctxs)
- : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+ const VExprContextSPtrs&
output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency>
dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) {
DCHECK(_t_sink.__isset.iceberg_table_sink);
}
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index e2e582e04ad..ae53d3af98e 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -46,7 +46,9 @@ struct ColumnWithTypeAndName;
class VIcebergTableWriter final : public AsyncResultWriter {
public:
- VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
~VIcebergTableWriter() = default;
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index ce8f2d18e07..16491311c17 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -60,17 +60,18 @@
namespace doris::vectorized {
-VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs)
- : AsyncResultWriter(output_exprs) {}
-
-VFileResultWriter::VFileResultWriter(const pipeline::ResultFileOptions*
file_opts,
- const TStorageBackendType::type
storage_type,
- const TUniqueId fragment_instance_id,
- const VExprContextSPtrs&
output_vexpr_ctxs,
- std::shared_ptr<BufferControlBlock>
sinker,
- Block* output_block, bool
output_object_data,
- const RowDescriptor&
output_row_descriptor)
- : AsyncResultWriter(output_vexpr_ctxs),
+VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : AsyncResultWriter(output_exprs, dep, fin_dep) {}
+
+VFileResultWriter::VFileResultWriter(
+ const pipeline::ResultFileOptions* file_opts, const
TStorageBackendType::type storage_type,
+ const TUniqueId fragment_instance_id, const VExprContextSPtrs&
output_vexpr_ctxs,
+ std::shared_ptr<BufferControlBlock> sinker, Block* output_block, bool
output_object_data,
+ const RowDescriptor& output_row_descriptor,
std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep)
+ : AsyncResultWriter(output_vexpr_ctxs, dep, fin_dep),
_file_opts(file_opts),
_storage_type(storage_type),
_fragment_instance_id(fragment_instance_id),
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 42753a5e261..bf0a5d3e9e2 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -56,9 +56,13 @@ public:
const TUniqueId fragment_instance_id,
const VExprContextSPtrs& _output_vexpr_ctxs,
std::shared_ptr<BufferControlBlock> sinker, Block*
output_block,
- bool output_object_data, const RowDescriptor&
output_row_descriptor);
+ bool output_object_data, const RowDescriptor&
output_row_descriptor,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
- VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
Status write(RuntimeState* state, Block& block) override;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index f90c7134ccd..53f70b6b31a 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -30,8 +30,10 @@ namespace doris {
namespace vectorized {
VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
- const VExprContextSPtrs& output_expr_ctxs)
- : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+ const VExprContextSPtrs& output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) {
DCHECK(_t_sink.__isset.hive_table_sink);
}
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h
b/be/src/vec/sink/writer/vhive_table_writer.h
index 6c8b972f280..9361fdbc408 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -39,7 +39,9 @@ struct ColumnWithTypeAndName;
class VHiveTableWriter final : public AsyncResultWriter {
public:
- VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
~VHiveTableWriter() override = default;
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index b7c8d1f78dd..8c24f4746ad 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -57,8 +57,11 @@ JdbcConnectorParam
VJdbcTableWriter::create_connect_param(const doris::TDataSink
}
VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
- const VExprContextSPtrs& output_expr_ctxs)
- : AsyncResultWriter(output_expr_ctxs),
JdbcConnector(create_connect_param(t_sink)) {}
+ const VExprContextSPtrs& output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
+ JdbcConnector(create_connect_param(t_sink)) {}
Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Block output_block;
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index b8216c3bcd6..aa957d2495b 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -36,7 +36,9 @@ class VJdbcTableWriter final : public AsyncResultWriter,
public JdbcConnector {
public:
static JdbcConnectorParam create_connect_param(const TDataSink&);
- VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
// connect to jdbc server
Status open(RuntimeState* state, RuntimeProfile* profile) override {
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index 45afe8ce019..a0d47ffec1e 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -61,8 +61,10 @@ std::string MysqlConnInfo::debug_string() const {
}
VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink,
- const VExprContextSPtrs& output_expr_ctxs)
- : AsyncResultWriter(output_expr_ctxs) {
+ const VExprContextSPtrs& output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : AsyncResultWriter(output_expr_ctxs, dep, fin_dep) {
const auto& t_mysql_sink = t_sink.mysql_table_sink;
_conn_info.host = t_mysql_sink.host;
_conn_info.port = t_mysql_sink.port;
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h
b/be/src/vec/sink/writer/vmysql_table_writer.h
index 072885b176b..04efabf3ffb 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -46,7 +46,9 @@ class Block;
class VMysqlTableWriter final : public AsyncResultWriter {
public:
- VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
// connect to mysql server
Status open(RuntimeState* state, RuntimeProfile* profile) override;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp
b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index c70bdd4ca19..19cb2e50109 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -42,8 +42,11 @@ ODBCConnectorParam
VOdbcTableWriter::create_connect_param(const doris::TDataSink
}
VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink,
- const VExprContextSPtrs& output_expr_ctxs)
- : AsyncResultWriter(output_expr_ctxs),
ODBCConnector(create_connect_param(t_sink)) {}
+ const VExprContextSPtrs& output_expr_ctxs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency>
fin_dep)
+ : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
+ ODBCConnector(create_connect_param(t_sink)) {}
Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Block output_block;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h
b/be/src/vec/sink/writer/vodbc_table_writer.h
index fa4dc47b77f..9638dea684a 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -36,7 +36,9 @@ class VOdbcTableWriter final : public AsyncResultWriter,
public ODBCConnector {
public:
static ODBCConnectorParam create_connect_param(const TDataSink&);
- VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
// connect to odbc server
Status open(RuntimeState* state, RuntimeProfile* profile) override {
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index d3d6c35fc42..99eac0c1e51 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -976,8 +976,10 @@ void VNodeChannel::mark_close(bool hang_wait) {
_eos_is_produced = true;
}
-VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs)
- : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
+VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep)
+ : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
_transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index b9fbc4d0873..993f9781955 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -542,7 +542,9 @@ namespace doris::vectorized {
// write result to file
class VTabletWriter final : public AsyncResultWriter {
public:
- VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
Status write(RuntimeState* state, Block& block) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 8bf0520aba0..a73cd5b4444 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -55,8 +55,10 @@
namespace doris::vectorized {
-VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs)
- : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
+VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep)
+ : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
DCHECK(t_sink.__isset.olap_table_sink);
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 363dea54c3b..c3be80ce93e 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -102,7 +102,9 @@ using RowsForTablet = std::unordered_map<int64_t, Rows>;
class VTabletWriterV2 final : public AsyncResultWriter {
public:
// Construct from thrift struct which is generated by FE.
- VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs,
+ std::shared_ptr<pipeline::Dependency> dep,
+ std::shared_ptr<pipeline::Dependency> fin_dep);
~VTabletWriterV2() override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]