This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c74ca15753 [pipeline](sink) Support Async Writer Sink of result file
sink and memory scratch sink (#23589)
c74ca15753 is described below
commit c74ca157530a8b10bf24324f957df67e273599a4
Author: HappenLee <[email protected]>
AuthorDate: Thu Aug 31 22:44:25 2023 +0800
[pipeline](sink) Support Async Writer Sink of result file sink and memory
scratch sink (#23589)
---
be/src/exec/data_sink.cpp | 22 +++----
be/src/exec/data_sink.h | 4 +-
be/src/runtime/record_batch_queue.h | 2 +
be/src/runtime/result_writer.h | 11 +---
.../runtime/vfile_writer_wrapper.h} | 35 ++++++++--
be/src/vec/runtime/vparquet_writer.h | 30 +--------
.../sink/{vtable_sink.h => async_writer_sink.h} | 34 ++++------
be/src/vec/sink/multi_cast_data_stream_sink.h | 6 +-
be/src/vec/sink/vdata_stream_sender.cpp | 2 -
be/src/vec/sink/vdata_stream_sender.h | 2 -
be/src/vec/sink/vmemory_scratch_sink.cpp | 4 ++
be/src/vec/sink/vmemory_scratch_sink.h | 2 +-
be/src/vec/sink/vresult_file_sink.cpp | 77 ++++++----------------
be/src/vec/sink/vresult_file_sink.h | 25 +++----
be/src/vec/sink/vresult_sink.cpp | 1 +
be/src/vec/sink/vresult_sink.h | 4 +-
be/src/vec/sink/vtablet_sink.h | 5 --
be/src/vec/sink/vtablet_sink_v2.h | 5 --
be/src/vec/sink/writer/async_result_writer.h | 4 +-
.../writer}/vfile_result_writer.cpp | 51 +++++++-------
.../{runtime => sink/writer}/vfile_result_writer.h | 34 +++++++---
be/src/vec/sink/writer/vjdbc_table_writer.h | 2 +
be/src/vec/sink/writer/vmysql_table_writer.cpp | 4 +-
be/src/vec/sink/writer/vmysql_table_writer.h | 4 +-
be/src/vec/sink/writer/vodbc_table_writer.h | 3 +-
25 files changed, 159 insertions(+), 214 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 53f44dd590..5c40475eeb 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -30,12 +30,12 @@
#include <string>
#include "common/config.h"
+#include "vec/sink/async_writer_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmemory_scratch_sink.h"
#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vresult_sink.h"
-#include "vec/sink/vtable_sink.h"
#include "vec/sink/vtablet_sink.h"
#include "vec/sink/vtablet_sink_v2.h"
@@ -92,9 +92,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
params.destinations,
send_query_statistics_with_every_batch, output_exprs,
desc_tbl));
} else {
- sink->reset(new doris::vectorized::VResultFileSink(
- state, pool, row_desc, thrift_sink.result_file_sink,
- send_query_statistics_with_every_batch, output_exprs));
+ sink->reset(new doris::vectorized::VResultFileSink(row_desc,
output_exprs));
}
break;
}
@@ -112,7 +110,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}
vectorized::VMysqlTableSink* vmysql_tbl_sink =
- new vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
+ new vectorized::VMysqlTableSink(row_desc, output_exprs);
sink->reset(vmysql_tbl_sink);
break;
#else
@@ -124,7 +122,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
- sink->reset(new vectorized::VOdbcTableSink(pool, row_desc,
output_exprs));
+ sink->reset(new vectorized::VOdbcTableSink(row_desc, output_exprs));
break;
}
@@ -133,7 +131,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
return Status::InternalError("Missing data jdbc sink.");
}
if (config::enable_java_support) {
- sink->reset(new vectorized::VJdbcTableSink(pool, row_desc,
output_exprs));
+ sink->reset(new vectorized::VJdbcTableSink(row_desc,
output_exprs));
} else {
return Status::InternalError(
"Jdbc table sink is not enabled, you can change be config "
@@ -234,9 +232,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
params.destinations,
send_query_statistics_with_every_batch, output_exprs,
desc_tbl));
} else {
- sink->reset(new doris::vectorized::VResultFileSink(
- state, pool, row_desc, thrift_sink.result_file_sink,
- send_query_statistics_with_every_batch, output_exprs));
+ sink->reset(new doris::vectorized::VResultFileSink(row_desc,
output_exprs));
}
break;
}
@@ -254,7 +250,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}
vectorized::VMysqlTableSink* vmysql_tbl_sink =
- new vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
+ new vectorized::VMysqlTableSink(row_desc, output_exprs);
sink->reset(vmysql_tbl_sink);
break;
#else
@@ -266,7 +262,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
- sink->reset(new vectorized::VOdbcTableSink(pool, row_desc,
output_exprs));
+ sink->reset(new vectorized::VOdbcTableSink(row_desc, output_exprs));
break;
}
@@ -275,7 +271,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
return Status::InternalError("Missing data jdbc sink.");
}
if (config::enable_java_support) {
- sink->reset(new vectorized::VJdbcTableSink(pool, row_desc,
output_exprs));
+ sink->reset(new vectorized::VJdbcTableSink(row_desc,
output_exprs));
} else {
return Status::InternalError(
"Jdbc table sink is not enabled, you can change be config "
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 8200ff5d4c..778ee2da83 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -104,7 +104,7 @@ public:
DescriptorTbl& desc_tbl);
// Returns the runtime profile for the sink.
- virtual RuntimeProfile* profile() = 0;
+ RuntimeProfile* profile() { return _profile; }
virtual void set_query_statistics(std::shared_ptr<QueryStatistics>
statistics) {
_query_statistics = statistics;
@@ -121,6 +121,8 @@ protected:
std::string _name;
const RowDescriptor& _row_desc;
+ RuntimeProfile* _profile = nullptr; // Allocated from _pool
+
// Maybe this will be transferred to BufferControlBlock.
std::shared_ptr<QueryStatistics> _query_statistics;
diff --git a/be/src/runtime/record_batch_queue.h
b/be/src/runtime/record_batch_queue.h
index 7ababc9b61..7528b85f09 100644
--- a/be/src/runtime/record_batch_queue.h
+++ b/be/src/runtime/record_batch_queue.h
@@ -59,6 +59,8 @@ public:
// Shut down the queue. Wakes up all threads waiting on blocking_get or
blocking_put.
void shutdown();
+ size_t size() { return _queue.get_size(); }
+
private:
BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
SpinLock _status_lock;
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index a1458f1a71..b6cdd10c3a 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -34,7 +34,6 @@ class RuntimeState;
class ResultWriter {
public:
ResultWriter() = default;
- ResultWriter(bool output_object_data) :
_output_object_data(output_object_data) {}
virtual ~ResultWriter() = default;
virtual Status init(RuntimeState* state) = 0;
@@ -43,7 +42,7 @@ public:
virtual int64_t get_written_rows() const { return _written_rows; }
- virtual bool output_object_data() const { return _output_object_data; }
+ bool output_object_data() const { return _output_object_data; }
virtual Status append_block(vectorized::Block& block) = 0;
@@ -53,17 +52,9 @@ public:
_output_object_data = output_object_data;
}
- static const std::string NULL_IN_CSV;
- virtual void set_header_info(const std::string& header_type, const
std::string& header) {
- _header_type = header_type;
- _header = header;
- }
-
protected:
int64_t _written_rows = 0; // number of rows written
bool _output_object_data = false;
- std::string _header_type;
- std::string _header;
};
} // namespace doris
diff --git a/be/src/runtime/result_writer.cpp
b/be/src/vec/runtime/vfile_writer_wrapper.h
similarity index 50%
rename from be/src/runtime/result_writer.cpp
rename to be/src/vec/runtime/vfile_writer_wrapper.h
index b5537e486c..e418a14ffa 100644
--- a/be/src/runtime/result_writer.cpp
+++ b/be/src/vec/runtime/vfile_writer_wrapper.h
@@ -15,12 +15,37 @@
// specific language governing permissions and limitations
// under the License.
-#include "runtime/result_writer.h"
+#pragma once
-namespace doris {
+#include <memory>
+#include <vector>
-const std::string ResultWriter::NULL_IN_CSV = "\\N";
+#include "common/status.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_fwd.h"
-}
+namespace doris::vectorized {
-/* vim: set ts=4 sw=4 sts=4 tw=100 expandtab : */
+class VFileWriterWrapper {
+public:
+ VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool
output_object_data)
+ : _output_vexpr_ctxs(output_vexpr_ctxs),
+ _cur_written_rows(0),
+ _output_object_data(output_object_data) {}
+
+ virtual ~VFileWriterWrapper() = default;
+
+ virtual Status prepare() = 0;
+
+ virtual Status write(const Block& block) = 0;
+
+ virtual Status close() = 0;
+
+ virtual int64_t written_len() = 0;
+
+protected:
+ const VExprContextSPtrs& _output_vexpr_ctxs;
+ int64_t _cur_written_rows;
+ bool _output_object_data;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_writer.h
b/be/src/vec/runtime/vparquet_writer.h
index 22410b5d06..36514c9fe8 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -26,12 +26,7 @@
#include <parquet/types.h>
#include <stdint.h>
-#include <memory>
-#include <vector>
-
-#include "common/status.h"
-#include "vec/core/block.h"
-#include "vec/exprs/vexpr_fwd.h"
+#include "vfile_writer_wrapper.h"
namespace doris {
namespace io {
@@ -90,29 +85,6 @@ public:
const TypeDescriptor& type_desc);
};
-class VFileWriterWrapper {
-public:
- VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool
output_object_data)
- : _output_vexpr_ctxs(output_vexpr_ctxs),
- _cur_written_rows(0),
- _output_object_data(output_object_data) {}
-
- virtual ~VFileWriterWrapper() = default;
-
- virtual Status prepare() = 0;
-
- virtual Status write(const Block& block) = 0;
-
- virtual Status close() = 0;
-
- virtual int64_t written_len() = 0;
-
-protected:
- const VExprContextSPtrs& _output_vexpr_ctxs;
- int64_t _cur_written_rows;
- bool _output_object_data;
-};
-
// a wrapper of parquet output stream
class VParquetWriterWrapper final : public VFileWriterWrapper {
public:
diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/async_writer_sink.h
similarity index 75%
rename from be/src/vec/sink/vtable_sink.h
rename to be/src/vec/sink/async_writer_sink.h
index fc41faa175..8eb177ce8e 100644
--- a/be/src/vec/sink/vtable_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -42,10 +42,10 @@ namespace vectorized {
class Block;
template <typename Writer, const char* Name>
-class VTableSink : public DataSink {
+class AsyncWriterSink : public DataSink {
public:
- VTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const
std::vector<TExpr>& t_exprs)
- : DataSink(row_desc), _pool(pool), _t_output_expr(t_exprs) {
+ AsyncWriterSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
t_exprs)
+ : DataSink(row_desc), _t_output_expr(t_exprs) {
_name = Name;
}
@@ -87,14 +87,12 @@ public:
return _writer->sink(block, eos);
}
- RuntimeProfile* profile() override { return _profile; }
-
bool can_write() override { return _writer->can_write(); }
Status close(RuntimeState* state, Status exec_status) override {
if (_writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
- RETURN_IF_ERROR(_writer->finish_trans());
+ RETURN_IF_ERROR(_writer->commit_trans());
}
RETURN_IF_ERROR(_writer->close());
}
@@ -111,37 +109,31 @@ public:
bool is_close_done() override { return !_writer->is_pending_finish(); }
protected:
- // owned by RuntimeState
- ObjectPool* _pool;
const std::vector<TExpr>& _t_output_expr;
VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;
- RuntimeProfile* _profile;
};
inline constexpr char VJDBC_TABLE_SINK_NAME[] = "VJdbcTableSink";
inline constexpr char VODBC_TABLE_SINK_NAME[] = "VOdbcTableSink";
inline constexpr char VMYSQL_TABLE_SINK_NAME[] = "VMysqlTableSink";
-class VJdbcTableSink : public VTableSink<VJdbcTableWriter,
VJDBC_TABLE_SINK_NAME> {
+class VJdbcTableSink : public AsyncWriterSink<VJdbcTableWriter,
VJDBC_TABLE_SINK_NAME> {
public:
- VJdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
- const std::vector<TExpr>& t_exprs)
- : VTableSink<VJdbcTableWriter, VJDBC_TABLE_SINK_NAME>(pool,
row_desc, t_exprs) {};
+ VJdbcTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
t_exprs)
+ : AsyncWriterSink<VJdbcTableWriter,
VJDBC_TABLE_SINK_NAME>(row_desc, t_exprs) {};
};
-class VOdbcTableSink : public VTableSink<VOdbcTableWriter,
VODBC_TABLE_SINK_NAME> {
+class VOdbcTableSink : public AsyncWriterSink<VOdbcTableWriter,
VODBC_TABLE_SINK_NAME> {
public:
- VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
- const std::vector<TExpr>& t_exprs)
- : VTableSink<VOdbcTableWriter, VODBC_TABLE_SINK_NAME>(pool,
row_desc, t_exprs) {};
+ VOdbcTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
t_exprs)
+ : AsyncWriterSink<VOdbcTableWriter,
VODBC_TABLE_SINK_NAME>(row_desc, t_exprs) {};
};
-class VMysqlTableSink : public VTableSink<VMysqlTableWriter,
VMYSQL_TABLE_SINK_NAME> {
+class VMysqlTableSink : public AsyncWriterSink<VMysqlTableWriter,
VMYSQL_TABLE_SINK_NAME> {
public:
- VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
- const std::vector<TExpr>& t_exprs)
- : VTableSink<VMysqlTableWriter, VMYSQL_TABLE_SINK_NAME>(pool,
row_desc, t_exprs) {};
+ VMysqlTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
t_exprs)
+ : AsyncWriterSink<VMysqlTableWriter,
VMYSQL_TABLE_SINK_NAME>(row_desc, t_exprs) {};
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h
b/be/src/vec/sink/multi_cast_data_stream_sink.h
index df210d74ff..364586cad0 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -25,7 +25,9 @@ namespace doris::vectorized {
class MultiCastDataStreamSink : public DataSink {
public:
MultiCastDataStreamSink(std::shared_ptr<pipeline::MultiCastDataStreamer>&
streamer)
- : DataSink(streamer->row_desc()),
_multi_cast_data_streamer(streamer) {};
+ : DataSink(streamer->row_desc()),
_multi_cast_data_streamer(streamer) {
+ _profile = _multi_cast_data_streamer->profile();
+ };
~MultiCastDataStreamSink() override = default;
@@ -39,8 +41,6 @@ public:
// use sink to check can_write, now always true after we support spill to
disk
bool can_write() override { return _multi_cast_data_streamer->can_write();
}
- RuntimeProfile* profile() override { return
_multi_cast_data_streamer->profile(); }
-
std::shared_ptr<pipeline::MultiCastDataStreamer>&
get_multi_cast_data_streamer() {
return _multi_cast_data_streamer;
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 4c547c7840..23194db6d9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -300,7 +300,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state,
ObjectPool* pool, int
_pool(pool),
_current_channel_idx(0),
_part_type(sink.output_partition.type),
- _profile(nullptr),
_serialize_batch_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_send_timer(nullptr),
@@ -365,7 +364,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state,
ObjectPool* pool, int
_pool(pool),
_current_channel_idx(0),
_part_type(TPartitionType::UNPARTITIONED),
- _profile(nullptr),
_serialize_batch_timer(nullptr),
_compress_timer(nullptr),
_brpc_send_timer(nullptr),
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index a7cceb1385..a3e4ccf2a6 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -117,7 +117,6 @@ public:
Status send(RuntimeState* state, Block* block, bool eos = false) override;
Status try_close(RuntimeState* state, Status exec_status) override;
Status close(RuntimeState* state, Status exec_status) override;
- RuntimeProfile* profile() override { return _profile; }
RuntimeState* state() { return _state; }
@@ -192,7 +191,6 @@ protected:
std::vector<Channel<VDataStreamSender>*> _channels;
std::vector<std::shared_ptr<Channel<VDataStreamSender>>>
_channel_shared_ptrs;
- RuntimeProfile* _profile; // Allocated from _pool
RuntimeProfile::Counter* _serialize_batch_timer;
RuntimeProfile::Counter* _compress_timer;
RuntimeProfile::Counter* _brpc_send_timer;
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp
b/be/src/vec/sink/vmemory_scratch_sink.cpp
index 6e9f5ca743..9b65838a3a 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -90,6 +90,10 @@ Status MemoryScratchSink::open(RuntimeState* state) {
return VExpr::open(_output_vexpr_ctxs, state);
}
+bool MemoryScratchSink::can_write() {
+ return _queue->size() < 10;
+}
+
Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
diff --git a/be/src/vec/sink/vmemory_scratch_sink.h
b/be/src/vec/sink/vmemory_scratch_sink.h
index c6306c481a..848952b1d7 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.h
+++ b/be/src/vec/sink/vmemory_scratch_sink.h
@@ -60,7 +60,7 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
- RuntimeProfile* profile() override { return _profile; }
+ bool can_write() override;
private:
Status _prepare_vexpr(RuntimeState* state);
diff --git a/be/src/vec/sink/vresult_file_sink.cpp
b/be/src/vec/sink/vresult_file_sink.cpp
index e30585419b..66d6414081 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -18,7 +18,6 @@
#include "vec/sink/vresult_file_sink.h"
#include <gen_cpp/DataSinks_types.h>
-#include <gen_cpp/PaloInternalService_types.h>
#include <glog/logging.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <time.h>
@@ -29,14 +28,10 @@
#include "common/config.h"
#include "common/object_pool.h"
#include "runtime/buffer_control_block.h"
-#include "runtime/exec_env.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"
-#include "util/uid_util.h"
#include "vec/exprs/vexpr.h"
-#include "vec/runtime/vfile_result_writer.h"
namespace doris {
class QueryStatistics;
@@ -45,71 +40,45 @@ class TExpr;
namespace doris::vectorized {
-VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
- const RowDescriptor& row_desc, const
TResultFileSink& sink,
- bool send_query_statistics_with_every_batch,
+VResultFileSink::VResultFileSink(const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
- : DataSink(row_desc), _t_output_expr(t_output_expr) {
- CHECK(sink.__isset.file_options);
- _file_opts.reset(new ResultFileOptions(sink.file_options));
- CHECK(sink.__isset.storage_backend_type);
- _storage_type = sink.storage_backend_type;
- _is_top_sink = true;
-
- _name = "VResultFileSink";
- //for impl csv_with_name and csv_with_names_and_types
- _header_type = sink.header_type;
- _header = sink.header;
-}
+ : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc,
t_output_expr) {}
VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int
sender_id,
const RowDescriptor& row_desc, const
TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>&
destinations,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr,
DescriptorTbl& descs)
- : DataSink(row_desc),
- _t_output_expr(t_output_expr),
+ : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc,
t_output_expr),
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false)
{
- CHECK(sink.__isset.file_options);
- _file_opts.reset(new ResultFileOptions(sink.file_options));
- CHECK(sink.__isset.storage_backend_type);
- _storage_type = sink.storage_backend_type;
_is_top_sink = false;
CHECK_EQ(destinations.size(), 1);
_stream_sender.reset(new VDataStreamSender(state, pool, sender_id,
row_desc, sink.dest_node_id,
destinations,
send_query_statistics_with_every_batch));
-
- _name = "VResultFileSink";
- //for impl csv_with_name and csv_with_names_and_types
- _header_type = sink.header_type;
- _header = sink.header;
}
Status VResultFileSink::init(const TDataSink& tsink) {
if (!_is_top_sink) {
RETURN_IF_ERROR(_stream_sender->init(tsink));
}
- return Status::OK();
-}
-Status VResultFileSink::prepare_exprs(RuntimeState* state) {
- // From the thrift expressions create the real exprs.
- RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr,
_output_vexpr_ctxs));
- // Prepare the exprs to run.
- RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
- return Status::OK();
+ auto& sink = tsink.result_file_sink;
+ CHECK(sink.__isset.file_options);
+ _file_opts.reset(new ResultFileOptions(sink.file_options));
+ CHECK(sink.__isset.storage_backend_type);
+ _storage_type = sink.storage_backend_type;
+
+ _name = "VResultFileSink";
+ //for impl csv_with_name and csv_with_names_and_types
+ _header_type = sink.header_type;
+ _header = sink.header;
+
+ return VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs);
}
Status VResultFileSink::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(DataSink::prepare(state));
- std::stringstream title;
- title << "VResultFileSink (fragment_instance_id=" <<
print_id(state->fragment_instance_id())
- << ")";
- // create profile
- _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
- // prepare output_expr
- RETURN_IF_ERROR(prepare_exprs(state));
+ RETURN_IF_ERROR(AsyncWriterSink::prepare(state));
CHECK(_file_opts.get() != nullptr);
if (_is_top_sink) {
@@ -120,7 +89,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
// create writer
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type,
state->fragment_instance_id(), _output_vexpr_ctxs,
- _profile, _sender.get(), nullptr,
state->return_object_data_as_binary(),
+ _sender.get(), nullptr, state->return_object_data_as_binary(),
_output_row_descriptor));
} else {
// init channel
@@ -128,13 +97,12 @@ Status VResultFileSink::prepare(RuntimeState* state) {
Block::create_unique(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1);
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type,
state->fragment_instance_id(), _output_vexpr_ctxs,
- _profile, nullptr, _output_block.get(),
state->return_object_data_as_binary(),
+ nullptr, _output_block.get(),
state->return_object_data_as_binary(),
_output_row_descriptor));
RETURN_IF_ERROR(_stream_sender->prepare(state));
_profile->add_child(_stream_sender->profile(), true, nullptr);
}
_writer->set_header_info(_header_type, _header);
- RETURN_IF_ERROR(_writer->init(state));
return Status::OK();
}
@@ -142,12 +110,7 @@ Status VResultFileSink::open(RuntimeState* state) {
if (!_is_top_sink) {
RETURN_IF_ERROR(_stream_sender->open(state));
}
- return VExpr::open(_output_vexpr_ctxs, state);
-}
-
-Status VResultFileSink::send(RuntimeState* state, Block* block, bool eos) {
- RETURN_IF_ERROR(_writer->append_block(*block));
- return Status::OK();
+ return AsyncWriterSink::open(state);
}
Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
@@ -157,7 +120,7 @@ Status VResultFileSink::close(RuntimeState* state, Status
exec_status) {
Status final_status = exec_status;
// close the writer
- if (_writer) {
+ if (_writer && _writer->need_normal_close()) {
Status st = _writer->close();
if (!st.ok() && exec_status.ok()) {
// close file writer failed, should return this error to client
diff --git a/be/src/vec/sink/vresult_file_sink.h
b/be/src/vec/sink/vresult_file_sink.h
index 848a6c371d..fdf4843a52 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -24,11 +24,12 @@
#include <vector>
#include "common/status.h"
-#include "exec/data_sink.h"
#include "runtime/descriptors.h"
#include "vec/core/block.h"
+#include "vec/sink/async_writer_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vresult_sink.h"
+#include "vec/sink/writer/vfile_result_writer.h"
namespace doris {
class BufferControlBlock;
@@ -44,51 +45,43 @@ class TResultFileSink;
namespace vectorized {
class VExprContext;
-class VResultFileSink : public DataSink {
+inline constexpr char VRESULT_FILE_SINK[] = "VResultFileSink";
+
+class VResultFileSink : public AsyncWriterSink<VFileResultWriter,
VRESULT_FILE_SINK> {
public:
- VResultFileSink(RuntimeState* state, ObjectPool* pool, const
RowDescriptor& row_desc,
- const TResultFileSink& sink, bool
send_query_statistics_with_every_batch,
- const std::vector<TExpr>& t_output_expr);
+ VResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
t_output_expr);
+
VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr, DescriptorTbl&
descs);
- ~VResultFileSink() override = default;
+
Status init(const TDataSink& thrift_sink) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
- // send data in 'batch' to this backend stream mgr
- // Blocks until all rows in batch are placed in the buffer
- Status send(RuntimeState* state, Block* block, bool eos = false) override;
+
// Flush all buffered data and close all existing channels to destination
// hosts. Further send() calls are illegal after calling close().
Status close(RuntimeState* state, Status exec_status) override;
- RuntimeProfile* profile() override { return _profile; }
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override;
private:
- Status prepare_exprs(RuntimeState* state);
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts;
TStorageBackendType::type _storage_type;
// Owned by the RuntimeState.
- const std::vector<TExpr>& _t_output_expr;
- VExprContextSPtrs _output_vexpr_ctxs;
RowDescriptor _output_row_descriptor;
std::unique_ptr<Block> _output_block = nullptr;
std::shared_ptr<BufferControlBlock> _sender;
std::unique_ptr<VDataStreamSender> _stream_sender;
- std::shared_ptr<ResultWriter> _writer;
int _buf_size = 1024; // Allocated from _pool
bool _is_top_sink = true;
std::string _header;
std::string _header_type;
-
- RuntimeProfile* _profile;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 2e82011bd7..5fdaaf1628 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -38,6 +38,7 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/sink/vmysql_result_writer.h"
+#include "vec/sink/writer/vfile_result_writer.h"
namespace doris {
class QueryStatistics;
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index c9374d7cf9..a10e60d467 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -135,7 +135,6 @@ public:
// Flush all buffered data and close all existing channels to destination
// hosts. Further send() calls are illegal after calling close().
virtual Status close(RuntimeState* state, Status exec_status) override;
- virtual RuntimeProfile* profile() override { return _profile; }
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override;
@@ -152,8 +151,7 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
- RuntimeProfile* _profile; // Allocated from _pool
- int _buf_size; // Allocated from _pool
+ int _buf_size; // Allocated from _pool
// for fetch data by rowids
TFetchOption _fetch_option;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index e6d4992b48..dc408d392b 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -408,9 +408,6 @@ public:
size_t get_pending_bytes() const;
- // Returns the runtime profile for the sink.
- RuntimeProfile* profile() override { return _profile; }
-
// the consumer func of sending pending batches in every NodeChannel.
// use polling & NodeChannel::try_send_and_fetch_status() to achieve
nonblocking sending.
// only focus on pending batches and channel status, the internal errors
of NodeChannels will be handled by the producer
@@ -464,8 +461,6 @@ private:
OlapTableLocationParam* _slave_location = nullptr;
DorisNodesInfo* _nodes_info = nullptr;
- RuntimeProfile* _profile = nullptr;
-
std::unique_ptr<OlapTabletFinder> _tablet_finder;
// index_channel
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index 5f50463268..047377f4e2 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -132,9 +132,6 @@ public:
Status close(RuntimeState* state, Status close_status) override;
Status send(RuntimeState* state, vectorized::Block* block, bool eos =
false) override;
- // Returns the runtime profile for the sink.
- RuntimeProfile* profile() override { return _profile; }
-
private:
Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
LoadStreamStub& stub_template);
@@ -181,8 +178,6 @@ private:
OlapTableLocationParam* _location = nullptr;
DorisNodesInfo* _nodes_info = nullptr;
- RuntimeProfile* _profile = nullptr;
-
std::unique_ptr<OlapTabletFinder> _tablet_finder;
std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index b294e110a9..2f70b05266 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -54,7 +54,9 @@ public:
virtual bool in_transaction() { return false; }
- bool need_normal_close() { return _need_normal_close; }
+ virtual Status commit_trans() { return Status::OK(); }
+
+ bool need_normal_close() const { return _need_normal_close; }
Status init(RuntimeState* state) override { return Status::OK(); }
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
similarity index 94%
rename from be/src/vec/runtime/vfile_result_writer.cpp
rename to be/src/vec/sink/writer/vfile_result_writer.cpp
index 9d5fc4e158..e54c426f74 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/runtime/vfile_result_writer.h"
+#include "vfile_result_writer.h"
#include <gen_cpp/Data_types.h>
#include <gen_cpp/Metrics_types.h>
@@ -72,18 +72,20 @@ namespace doris::vectorized {
const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
using doris::operator<<;
+VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs)
+ : AsyncResultWriter(output_exprs) {}
+
VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
const TStorageBackendType::type
storage_type,
const TUniqueId fragment_instance_id,
const VExprContextSPtrs&
output_vexpr_ctxs,
- RuntimeProfile* parent_profile,
BufferControlBlock* sinker,
- Block* output_block, bool
output_object_data,
+ BufferControlBlock* sinker, Block*
output_block,
+ bool output_object_data,
const RowDescriptor&
output_row_descriptor)
- : _file_opts(file_opts),
+ : AsyncResultWriter(output_vexpr_ctxs),
+ _file_opts(file_opts),
_storage_type(storage_type),
_fragment_instance_id(fragment_instance_id),
- _output_vexpr_ctxs(output_vexpr_ctxs),
- _parent_profile(parent_profile),
_sinker(sinker),
_output_block(output_block),
_output_row_descriptor(output_row_descriptor),
@@ -91,9 +93,9 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions*
file_opts,
_output_object_data = output_object_data;
}
-Status VFileResultWriter::init(RuntimeState* state) {
+Status VFileResultWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
- _init_profile();
+ _init_profile(profile);
// Delete existing files
if (_file_opts->delete_existing_files) {
RETURN_IF_ERROR(_delete_dir());
@@ -101,8 +103,8 @@ Status VFileResultWriter::init(RuntimeState* state) {
return _create_next_file_writer();
}
-void VFileResultWriter::_init_profile() {
- RuntimeProfile* profile =
_parent_profile->create_child("VFileResultWriter", true, true);
+void VFileResultWriter::_init_profile(RuntimeProfile* parent_profile) {
+ RuntimeProfile* profile =
parent_profile->create_child("VFileResultWriter", true, true);
_append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
_convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime",
"AppendBatchTime");
_file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime",
"AppendBatchTime");
@@ -160,13 +162,13 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
break;
case TFileFormatType::FORMAT_PARQUET:
_vfile_writer.reset(new VParquetWriterWrapper(
- _file_writer_impl.get(), _output_vexpr_ctxs,
_file_opts->parquet_schemas,
+ _file_writer_impl.get(), _vec_output_expr_ctxs,
_file_opts->parquet_schemas,
_file_opts->parquet_commpression_type,
_file_opts->parquert_disable_dictionary,
_file_opts->parquet_version, _output_object_data));
RETURN_IF_ERROR(_vfile_writer->prepare());
break;
case TFileFormatType::FORMAT_ORC:
- _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(),
_output_vexpr_ctxs,
+ _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(),
_vec_output_expr_ctxs,
_file_opts->orc_schema,
_output_object_data));
RETURN_IF_ERROR(_vfile_writer->prepare());
break;
@@ -237,12 +239,9 @@ Status VFileResultWriter::append_block(Block& block) {
}
RETURN_IF_ERROR(write_csv_header());
SCOPED_TIMER(_append_row_batch_timer);
- Status status = Status::OK();
- // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
- // failed, just return the error status
Block output_block;
-
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
block,
-
&output_block));
+ RETURN_IF_ERROR(_projection_block(block, &output_block));
+
if (_vfile_writer) {
RETURN_IF_ERROR(_write_file(output_block));
} else {
@@ -267,7 +266,7 @@ Status VFileResultWriter::_write_csv_file(const Block&
block) {
if (col.column->is_null_at(i)) {
_plain_text_outstream << NULL_IN_CSV;
} else {
- switch (_output_vexpr_ctxs[col_id]->root()->type().type) {
+ switch (_vec_output_expr_ctxs[col_id]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
_plain_text_outstream << (int)*reinterpret_cast<const
int8_t*>(
@@ -327,7 +326,7 @@ Status VFileResultWriter::_write_csv_file(const Block&
block) {
const DateV2Value<DateTimeV2ValueType>* time_val =
(const
DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i)
.data);
- time_val->to_string(buf,
_output_vexpr_ctxs[col_id]->root()->type().scale);
+ time_val->to_string(buf,
_vec_output_expr_ctxs[col_id]->root()->type().scale);
_plain_text_outstream << buf;
break;
}
@@ -406,9 +405,9 @@ Status VFileResultWriter::_write_csv_file(const Block&
block) {
std::string VFileResultWriter::gen_types() {
std::string types;
- int num_columns = _output_vexpr_ctxs.size();
+ int num_columns = _vec_output_expr_ctxs.size();
for (int i = 0; i < num_columns; ++i) {
- types += type_to_string(_output_vexpr_ctxs[i]->root()->type().type);
+ types += type_to_string(_vec_output_expr_ctxs[i]->root()->type().type);
if (i < num_columns - 1) {
types += _file_opts->column_separator;
}
@@ -419,7 +418,7 @@ std::string VFileResultWriter::gen_types() {
Status VFileResultWriter::write_csv_header() {
if (!_header_sent && _header.size() > 0) {
- std::string tmp_header = _header;
+ std::string tmp_header(_header);
if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
tmp_header += gen_types();
}
@@ -628,9 +627,13 @@ Status VFileResultWriter::close() {
// because `_close_file_writer()` may be called in deconstructor,
// at that time, the RuntimeState may already been deconstructed,
// so does the profile in RuntimeState.
- COUNTER_SET(_written_rows_counter, _written_rows);
- SCOPED_TIMER(_writer_close_timer);
+ if (_written_rows_counter) {
+ COUNTER_SET(_written_rows_counter, _written_rows);
+ SCOPED_TIMER(_writer_close_timer);
+ }
return _close_file_writer(true);
}
+const string VFileResultWriter::NULL_IN_CSV = "\\N";
+
} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
similarity index 85%
rename from be/src/vec/runtime/vfile_result_writer.h
rename to be/src/vec/sink/writer/vfile_result_writer.h
index 4ec425755d..b56e41c377 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -29,10 +29,10 @@
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "runtime/descriptors.h"
-#include "runtime/result_writer.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vfile_writer_wrapper.h"
+#include "vec/sink/writer/async_result_writer.h"
namespace doris {
class BufferControlBlock;
@@ -47,35 +47,45 @@ struct ResultFileOptions;
namespace doris::vectorized {
// write result to file
-class VFileResultWriter final : public ResultWriter {
+class VFileResultWriter final : public AsyncResultWriter {
public:
VFileResultWriter(const ResultFileOptions* file_option,
const TStorageBackendType::type storage_type,
const TUniqueId fragment_instance_id,
- const VExprContextSPtrs& _output_vexpr_ctxs,
RuntimeProfile* parent_profile,
- BufferControlBlock* sinker, Block* output_block, bool
output_object_data,
+ const VExprContextSPtrs& _output_vexpr_ctxs,
BufferControlBlock* sinker,
+ Block* output_block, bool output_object_data,
const RowDescriptor& output_row_descriptor);
- virtual ~VFileResultWriter() = default;
+
+ VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
Status append_block(Block& block) override;
- Status init(RuntimeState* state) override;
Status close() override;
+ Status open(RuntimeState* state, RuntimeProfile* profile) override {
+ return _init(state, profile);
+ }
+
// file result writer always return statistic result in one row
int64_t get_written_rows() const override { return 1; }
std::string gen_types();
Status write_csv_header();
+ void set_header_info(const std::string& header_type, const std::string&
header) {
+ _header_type = header_type;
+ _header = header;
+ }
+
private:
+ Status _init(RuntimeState* state, RuntimeProfile*);
Status _write_file(const Block& block);
Status _write_csv_file(const Block& block);
// if buffer exceed the limit, write the data buffered in
_plain_text_outstream via file_writer
// if eos, write the data even if buffer is not full.
Status _flush_plain_text_outstream(bool eos);
- void _init_profile();
+ void _init_profile(RuntimeProfile*);
Status _create_file_writer(const std::string& file_name);
Status _create_next_file_writer();
@@ -96,11 +106,12 @@ private:
// delete the dir of file_path
Status _delete_dir();
+ static const std::string NULL_IN_CSV;
+
RuntimeState* _state; // not owned, set when init
const ResultFileOptions* _file_opts;
TStorageBackendType::type _storage_type;
TUniqueId _fragment_instance_id;
- const VExprContextSPtrs& _output_vexpr_ctxs;
// If the result file format is plain text, like CSV, this _file_writer is
owned by this FileResultWriter.
// If the result file format is Parquet, this _file_writer is owned by
_parquet_writer.
@@ -119,7 +130,6 @@ private:
// the suffix idx of export file name, start at 0
int _file_idx = 0;
- RuntimeProfile* _parent_profile; // profile from result sink, not owned
// total time cost on append batch operation
RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
// tuple convert timer, child timer of _append_row_batch_timer
@@ -142,5 +152,9 @@ private:
RowDescriptor _output_row_descriptor;
// parquet/orc file writer
std::unique_ptr<VFileWriterWrapper> _vfile_writer;
+
+ std::string_view _header_type;
+ std::string_view _header;
+ std::unique_ptr<VFileResultWriter> _writer;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index 1aa1e59482..205f0835f5 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -50,6 +50,8 @@ public:
bool in_transaction() override { return
TableConnector::_is_in_transaction; }
+ Status commit_trans() override { return JdbcConnector::finish_trans(); }
+
private:
JdbcConnectorParam _param;
};
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index a9dad2ca80..6897a6b3e2 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -115,12 +115,12 @@ Status VMysqlTableWriter::append_block(vectorized::Block&
block) {
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
for (int i = 0; i < num_rows; ++i) {
- RETURN_IF_ERROR(insert_row(output_block, i));
+ RETURN_IF_ERROR(_insert_row(output_block, i));
}
return Status::OK();
}
-Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
+Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) {
_insert_stmt_buffer.clear();
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (",
_conn_info.table_name);
int num_columns = _vec_output_expr_ctxs.size();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h
b/be/src/vec/sink/writer/vmysql_table_writer.h
index bb134f9a58..9028801144 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -53,12 +53,10 @@ public:
Status append_block(vectorized::Block& block) override;
- Status finish_trans() { return Status::OK(); }
-
Status close() override;
private:
- Status insert_row(vectorized::Block& block, size_t row);
+ Status _insert_row(vectorized::Block& block, size_t row);
MysqlConnInfo _conn_info;
fmt::memory_buffer _insert_stmt_buffer;
MYSQL* _mysql_conn;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h
b/be/src/vec/sink/writer/vodbc_table_writer.h
index e07f44c9e4..3df973e4b1 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -1,4 +1,3 @@
-
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -50,6 +49,8 @@ public:
Status close() override { return ODBCConnector::close(); }
bool in_transaction() override { return
TableConnector::_is_in_transaction; }
+
+ Status commit_trans() override { return ODBCConnector::finish_trans(); }
};
} // namespace vectorized
} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]