This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c58d18147b0 [refactor](resultwriter) rename append_block to write method is more meaningful (#29635)
c58d18147b0 is described below
commit c58d18147b0ba361f008d95391b4697e226b5cea
Author: yiguolei <[email protected]>
AuthorDate: Mon Jan 8 00:00:33 2024 +0800
[refactor](resultwriter) rename append_block to write method is more meaningful (#29635)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/exec/result_sink_operator.cpp | 2 +-
be/src/runtime/result_writer.h | 3 ++-
be/src/service/point_query_executor.cpp | 2 +-
be/src/vec/sink/async_writer_sink.h | 2 +-
be/src/vec/sink/multi_cast_data_stream_sink.h | 3 +--
be/src/vec/sink/varrow_flight_result_writer.cpp | 2 +-
be/src/vec/sink/varrow_flight_result_writer.h | 2 +-
be/src/vec/sink/vmysql_result_writer.cpp | 2 +-
be/src/vec/sink/vmysql_result_writer.h | 2 +-
be/src/vec/sink/vresult_sink.cpp | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 2 +-
be/src/vec/sink/writer/async_result_writer.h | 9 +++------
be/src/vec/sink/writer/vfile_result_writer.cpp | 2 +-
be/src/vec/sink/writer/vfile_result_writer.h | 2 +-
be/src/vec/sink/writer/vjdbc_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vjdbc_table_writer.h | 2 +-
be/src/vec/sink/writer/vmysql_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vmysql_table_writer.h | 2 +-
be/src/vec/sink/writer/vodbc_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vodbc_table_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 6 +++---
be/src/vec/sink/writer/vtablet_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 +++---
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +-
be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp | 2 +-
25 files changed, 32 insertions(+), 35 deletions(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index dcf0d996c62..8dc6eed2998 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -136,7 +136,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block,
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
- RETURN_IF_ERROR(local_state._writer->append_block(*block));
+ RETURN_IF_ERROR(local_state._writer->write(*block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling _second_phase_fetch_data().
// So we should clear block in case of unmatched columns
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index f199d2f6b9d..f65f06399b8 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -44,7 +44,8 @@ public:
[[nodiscard]] bool output_object_data() const { return
_output_object_data; }
- virtual Status append_block(vectorized::Block& block) = 0;
+ // Write is sync, it will do real IO work.
+ virtual Status write(vectorized::Block& block) = 0;
virtual bool can_sink() { return true; }
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index d122a9d3cb7..a86d5ed90b8 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -334,7 +334,7 @@ template <typename MysqlWriter>
Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
PTabletKeyLookupResponse* response) {
block.clear_names();
- RETURN_IF_ERROR(mysql_writer.append_block(block));
+ RETURN_IF_ERROR(mysql_writer.write(block));
assert(mysql_writer.results().size() == 1);
uint8_t* buf = nullptr;
uint32_t len = 0;
diff --git a/be/src/vec/sink/async_writer_sink.h
b/be/src/vec/sink/async_writer_sink.h
index 8963f9a4ec0..600eb609281 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -82,7 +82,7 @@ public:
SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_blocks_sent_counter, 1);
COUNTER_UPDATE(_output_rows_counter, block->rows());
- return _writer->append_block(*block);
+ return _writer->write(*block);
}
Status sink(RuntimeState* state, vectorized::Block* block, bool eos =
false) override {
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h
b/be/src/vec/sink/multi_cast_data_stream_sink.h
index b2229142837..7cc057013aa 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -36,8 +36,7 @@ public:
SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_blocks_sent_counter, 1);
COUNTER_UPDATE(_output_rows_counter, block->rows());
- static_cast<void>(_multi_cast_data_streamer->push(state, block, eos));
- return Status::OK();
+ return _multi_cast_data_streamer->push(state, block, eos);
};
Status open(doris::RuntimeState* state) override { return Status::OK(); };
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index 771040bfb8b..49885b3420e 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -52,7 +52,7 @@ void VArrowFlightResultWriter::_init_profile() {
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent",
TUnit::BYTES);
}
-Status VArrowFlightResultWriter::append_block(Block& input_block) {
+Status VArrowFlightResultWriter::write(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h
b/be/src/vec/sink/varrow_flight_result_writer.h
index b9b44d1dfdf..774d938bc9d 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -43,7 +43,7 @@ public:
Status init(RuntimeState* state) override;
- Status append_block(Block& block) override;
+ Status write(Block& block) override;
bool can_sink() override;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index 6a7e16a4841..41f0682b1c3 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -105,7 +105,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
}
template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
+Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
diff --git a/be/src/vec/sink/vmysql_result_writer.h
b/be/src/vec/sink/vmysql_result_writer.h
index 10a0b7e9e04..8227d09dcc1 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -47,7 +47,7 @@ public:
Status init(RuntimeState* state) override;
- Status append_block(Block& block) override;
+ Status write(Block& block) override;
bool can_sink() override;
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index c6e7e7b87d3..3fa2e035976 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -141,7 +141,7 @@ Status VResultSink::send(RuntimeState* state, Block* block,
bool eos) {
DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL);
RETURN_IF_ERROR(second_phase_fetch_data(state, block));
}
- RETURN_IF_ERROR(_writer->append_block(*block));
+ RETURN_IF_ERROR(_writer->write(*block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling _second_phase_fetch_data().
// So we should clear block in case of unmatched columns
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 422dc2efef4..93b86ca843e 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -111,7 +111,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
}
auto block = _get_block_from_queue();
- auto status = write(block);
+ auto status = write(*block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status = status;
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index e91ff1a0701..d48e41daa62 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -51,7 +51,7 @@ class Block;
*
* The Sub class of AsyncResultWriter need to impl two virtual function
* * Status open() the first time IO work like: create file/ connect networking
- * * Status append_block() do the real IO work for block
+ * * Status write() do the real IO work for block
*/
class AsyncResultWriter : public ResultWriter {
public:
@@ -71,8 +71,6 @@ public:
virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
- Status write(std::unique_ptr<Block>& block) { return append_block(*block);
}
-
bool can_write() {
std::lock_guard l(_m);
return _data_queue_is_available() || _is_finished();
@@ -80,9 +78,7 @@ public:
[[nodiscard]] bool is_pending_finish() const { return
!_writer_thread_closed; }
- void process_block(RuntimeState* state, RuntimeProfile* profile);
-
- // sink the block date to date queue
+ // sink the block date to date queue, it is async
Status sink(Block* block, bool eos);
// Add the IO thread task process block() to thread pool to dispose the IO
@@ -99,6 +95,7 @@ protected:
void _return_free_block(std::unique_ptr<Block>);
private:
+ void process_block(RuntimeState* state, RuntimeProfile* profile);
[[nodiscard]] bool _data_queue_is_available() const { return
_data_queue.size() < QUEUE_SIZE; }
[[nodiscard]] bool _is_finished() const { return !_writer_status.ok() ||
_eos; }
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 67c386c24d7..f7015e17c96 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -221,7 +221,7 @@ std::string VFileResultWriter::_file_format_to_name() {
}
}
-Status VFileResultWriter::append_block(Block& block) {
+Status VFileResultWriter::write(Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 0e7e0ccefc0..864d0966a77 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -58,7 +58,7 @@ public:
VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
- Status append_block(Block& block) override;
+ Status write(Block& block) override;
Status close(Status s = Status::OK()) override;
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index d4cb90d0a5c..a4805921eff 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -54,7 +54,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs),
JdbcConnector(create_connect_param(t_sink)) {}
-Status VJdbcTableWriter::append_block(vectorized::Block& block) {
+Status VJdbcTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index b1e4d5ad284..735c023fce5 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
- Status append_block(vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status close(Status s) override { return JdbcConnector::close(s); }
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index 4a3513c80f4..d9ca6d96f99 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
return Status::OK();
}
-Status VMysqlTableWriter::append_block(vectorized::Block& block) {
+Status VMysqlTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h
b/be/src/vec/sink/writer/vmysql_table_writer.h
index a88c5730cbd..856d0a21ec5 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -51,7 +51,7 @@ public:
// connect to mysql server
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status append_block(vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp
b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index caa5c2c3abd..da068c3d677 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink&
t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs),
ODBCConnector(create_connect_param(t_sink)) {}
-Status VOdbcTableWriter::append_block(vectorized::Block& block) {
+Status VOdbcTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h
b/be/src/vec/sink/writer/vodbc_table_writer.h
index 4c0e6a19f6a..a28947355e7 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
- Status append_block(vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status close(Status s) override { return ODBCConnector::close(s); }
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 3741e80fab2..a216a5034c3 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1337,9 +1337,9 @@ Status VTabletWriter::_send_new_partition_batch() {
// these order is only.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
- // 3. now reuse the column of lval block. cuz append_block doesn't real adjust it. it generate a new block from that.
+ // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(this->append_block(tmp_block));
+ RETURN_IF_ERROR(this->write(tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
@@ -1606,7 +1606,7 @@ void VTabletWriter::_generate_index_channels_payloads(
}
}
-Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
+Status VTabletWriter::write(doris::vectorized::Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 05a9c455ca2..a93c2bffc78 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -518,7 +518,7 @@ public:
Status init_properties(ObjectPool* pool);
- Status append_block(Block& block) override;
+ Status write(Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index e02fba7c21c..bf4033eebed 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -365,7 +365,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id,
int64_t partition_id,
return Status::OK();
}
-Status VTabletWriterV2::append_block(Block& input_block) {
+Status VTabletWriterV2::write(Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@@ -470,9 +470,9 @@ Status VTabletWriterV2::_send_new_partition_batch() {
// these order is only.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
- // 3. now reuse the column of lval block. cuz append_block doesn't real adjust it. it generate a new block from that.
+ // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(this->append_block(tmp_block));
+ RETURN_IF_ERROR(this->write(tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index c829b254c77..e4cdcdca09e 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -108,7 +108,7 @@ public:
Status init_properties(ObjectPool* pool);
- Status append_block(Block& block) override;
+ Status write(Block& block) override;
Status open(RuntimeState* state, RuntimeProfile* profile) override;
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index 5fff0d75ccf..e781960c5e9 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -319,7 +319,7 @@ void serialize_and_deserialize_mysql_test() {
// mysql_writer init
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr,
_output_vexpr_ctxs, nullptr);
- Status st = mysql_writer.append_block(block);
+ Status st = mysql_writer.write(block);
EXPECT_TRUE(st.ok());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]