This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c58d18147b0 [refactor](resultwriter) rename append_block to write method is more meaningful (#29635)
c58d18147b0 is described below

commit c58d18147b0ba361f008d95391b4697e226b5cea
Author: yiguolei <[email protected]>
AuthorDate: Mon Jan 8 00:00:33 2024 +0800

    [refactor](resultwriter) rename append_block to write method is more meaningful (#29635)
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/pipeline/exec/result_sink_operator.cpp               | 2 +-
 be/src/runtime/result_writer.h                              | 3 ++-
 be/src/service/point_query_executor.cpp                     | 2 +-
 be/src/vec/sink/async_writer_sink.h                         | 2 +-
 be/src/vec/sink/multi_cast_data_stream_sink.h               | 3 +--
 be/src/vec/sink/varrow_flight_result_writer.cpp             | 2 +-
 be/src/vec/sink/varrow_flight_result_writer.h               | 2 +-
 be/src/vec/sink/vmysql_result_writer.cpp                    | 2 +-
 be/src/vec/sink/vmysql_result_writer.h                      | 2 +-
 be/src/vec/sink/vresult_sink.cpp                            | 2 +-
 be/src/vec/sink/writer/async_result_writer.cpp              | 2 +-
 be/src/vec/sink/writer/async_result_writer.h                | 9 +++------
 be/src/vec/sink/writer/vfile_result_writer.cpp              | 2 +-
 be/src/vec/sink/writer/vfile_result_writer.h                | 2 +-
 be/src/vec/sink/writer/vjdbc_table_writer.cpp               | 2 +-
 be/src/vec/sink/writer/vjdbc_table_writer.h                 | 2 +-
 be/src/vec/sink/writer/vmysql_table_writer.cpp              | 2 +-
 be/src/vec/sink/writer/vmysql_table_writer.h                | 2 +-
 be/src/vec/sink/writer/vodbc_table_writer.cpp               | 2 +-
 be/src/vec/sink/writer/vodbc_table_writer.h                 | 2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp                   | 6 +++---
 be/src/vec/sink/writer/vtablet_writer.h                     | 2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp                | 6 +++---
 be/src/vec/sink/writer/vtablet_writer_v2.h                  | 2 +-
 be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp | 2 +-
 25 files changed, 32 insertions(+), 35 deletions(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index dcf0d996c62..8dc6eed2998 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -136,7 +136,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block,
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
     }
-    RETURN_IF_ERROR(local_state._writer->append_block(*block));
+    RETURN_IF_ERROR(local_state._writer->write(*block));
     if (_fetch_option.use_two_phase_fetch) {
         // Block structure may be changed by calling 
_second_phase_fetch_data().
         // So we should clear block in case of unmatched columns
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index f199d2f6b9d..f65f06399b8 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -44,7 +44,8 @@ public:
 
     [[nodiscard]] bool output_object_data() const { return 
_output_object_data; }
 
-    virtual Status append_block(vectorized::Block& block) = 0;
+    // Write is sync, it will do real IO work.
+    virtual Status write(vectorized::Block& block) = 0;
 
     virtual bool can_sink() { return true; }
 
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index d122a9d3cb7..a86d5ed90b8 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -334,7 +334,7 @@ template <typename MysqlWriter>
 Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
                         PTabletKeyLookupResponse* response) {
     block.clear_names();
-    RETURN_IF_ERROR(mysql_writer.append_block(block));
+    RETURN_IF_ERROR(mysql_writer.write(block));
     assert(mysql_writer.results().size() == 1);
     uint8_t* buf = nullptr;
     uint32_t len = 0;
diff --git a/be/src/vec/sink/async_writer_sink.h 
b/be/src/vec/sink/async_writer_sink.h
index 8963f9a4ec0..600eb609281 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -82,7 +82,7 @@ public:
         SCOPED_TIMER(_exec_timer);
         COUNTER_UPDATE(_blocks_sent_counter, 1);
         COUNTER_UPDATE(_output_rows_counter, block->rows());
-        return _writer->append_block(*block);
+        return _writer->write(*block);
     }
 
     Status sink(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override {
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h 
b/be/src/vec/sink/multi_cast_data_stream_sink.h
index b2229142837..7cc057013aa 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -36,8 +36,7 @@ public:
         SCOPED_TIMER(_exec_timer);
         COUNTER_UPDATE(_blocks_sent_counter, 1);
         COUNTER_UPDATE(_output_rows_counter, block->rows());
-        static_cast<void>(_multi_cast_data_streamer->push(state, block, eos));
-        return Status::OK();
+        return _multi_cast_data_streamer->push(state, block, eos);
     };
 
     Status open(doris::RuntimeState* state) override { return Status::OK(); };
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp 
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index 771040bfb8b..49885b3420e 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -52,7 +52,7 @@ void VArrowFlightResultWriter::_init_profile() {
     _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", 
TUnit::BYTES);
 }
 
-Status VArrowFlightResultWriter::append_block(Block& input_block) {
+Status VArrowFlightResultWriter::write(Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
     Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h 
b/be/src/vec/sink/varrow_flight_result_writer.h
index b9b44d1dfdf..774d938bc9d 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -43,7 +43,7 @@ public:
 
     Status init(RuntimeState* state) override;
 
-    Status append_block(Block& block) override;
+    Status write(Block& block) override;
 
     bool can_sink() override;
 
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp 
b/be/src/vec/sink/vmysql_result_writer.cpp
index 6a7e16a4841..41f0682b1c3 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -105,7 +105,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
 }
 
 template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
+Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
     Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
diff --git a/be/src/vec/sink/vmysql_result_writer.h 
b/be/src/vec/sink/vmysql_result_writer.h
index 10a0b7e9e04..8227d09dcc1 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -47,7 +47,7 @@ public:
 
     Status init(RuntimeState* state) override;
 
-    Status append_block(Block& block) override;
+    Status write(Block& block) override;
 
     bool can_sink() override;
 
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index c6e7e7b87d3..3fa2e035976 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -141,7 +141,7 @@ Status VResultSink::send(RuntimeState* state, Block* block, 
bool eos) {
         DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL);
         RETURN_IF_ERROR(second_phase_fetch_data(state, block));
     }
-    RETURN_IF_ERROR(_writer->append_block(*block));
+    RETURN_IF_ERROR(_writer->write(*block));
     if (_fetch_option.use_two_phase_fetch) {
         // Block structure may be changed by calling 
_second_phase_fetch_data().
         // So we should clear block in case of unmatched columns
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 422dc2efef4..93b86ca843e 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -111,7 +111,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, 
RuntimeProfile* profi
             }
 
             auto block = _get_block_from_queue();
-            auto status = write(block);
+            auto status = write(*block);
             if (!status.ok()) [[unlikely]] {
                 std::unique_lock l(_m);
                 _writer_status = status;
diff --git a/be/src/vec/sink/writer/async_result_writer.h 
b/be/src/vec/sink/writer/async_result_writer.h
index e91ff1a0701..d48e41daa62 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -51,7 +51,7 @@ class Block;
  *
  *  The Sub class of AsyncResultWriter need to impl two virtual function
  *     * Status open() the first time IO work like: create file/ connect 
networking
- *     * Status append_block() do the real IO work for block 
+ *     * Status write() do the real IO work for block 
  */
 class AsyncResultWriter : public ResultWriter {
 public:
@@ -71,8 +71,6 @@ public:
 
     virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
 
-    Status write(std::unique_ptr<Block>& block) { return append_block(*block); 
}
-
     bool can_write() {
         std::lock_guard l(_m);
         return _data_queue_is_available() || _is_finished();
@@ -80,9 +78,7 @@ public:
 
     [[nodiscard]] bool is_pending_finish() const { return 
!_writer_thread_closed; }
 
-    void process_block(RuntimeState* state, RuntimeProfile* profile);
-
-    // sink the block date to date queue
+    // sink the block date to date queue, it is async
     Status sink(Block* block, bool eos);
 
     // Add the IO thread task process block() to thread pool to dispose the IO
@@ -99,6 +95,7 @@ protected:
     void _return_free_block(std::unique_ptr<Block>);
 
 private:
+    void process_block(RuntimeState* state, RuntimeProfile* profile);
     [[nodiscard]] bool _data_queue_is_available() const { return 
_data_queue.size() < QUEUE_SIZE; }
     [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || 
_eos; }
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 67c386c24d7..f7015e17c96 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -221,7 +221,7 @@ std::string VFileResultWriter::_file_format_to_name() {
     }
 }
 
-Status VFileResultWriter::append_block(Block& block) {
+Status VFileResultWriter::write(Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
     }
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index 0e7e0ccefc0..864d0966a77 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -58,7 +58,7 @@ public:
 
     VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
 
-    Status append_block(Block& block) override;
+    Status write(Block& block) override;
 
     Status close(Status s = Status::OK()) override;
 
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp 
b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index d4cb90d0a5c..a4805921eff 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -54,7 +54,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs)
         : AsyncResultWriter(output_expr_ctxs), 
JdbcConnector(create_connect_param(t_sink)) {}
 
-Status VJdbcTableWriter::append_block(vectorized::Block& block) {
+Status VJdbcTableWriter::write(vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h 
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index b1e4d5ad284..735c023fce5 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -44,7 +44,7 @@ public:
         return init_to_write(profile);
     }
 
-    Status append_block(vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status close(Status s) override { return JdbcConnector::close(s); }
 
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp 
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index 4a3513c80f4..d9ca6d96f99 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, 
RuntimeProfile* profile) {
     return Status::OK();
 }
 
-Status VMysqlTableWriter::append_block(vectorized::Block& block) {
+Status VMysqlTableWriter::write(vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h 
b/be/src/vec/sink/writer/vmysql_table_writer.h
index a88c5730cbd..856d0a21ec5 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -51,7 +51,7 @@ public:
     // connect to mysql server
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status append_block(vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp 
b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index caa5c2c3abd..da068c3d677 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& 
t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs)
         : AsyncResultWriter(output_expr_ctxs), 
ODBCConnector(create_connect_param(t_sink)) {}
 
-Status VOdbcTableWriter::append_block(vectorized::Block& block) {
+Status VOdbcTableWriter::write(vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h 
b/be/src/vec/sink/writer/vodbc_table_writer.h
index 4c0e6a19f6a..a28947355e7 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -44,7 +44,7 @@ public:
         return init_to_write(profile);
     }
 
-    Status append_block(vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status close(Status s) override { return ODBCConnector::close(s); }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 3741e80fab2..a216a5034c3 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1337,9 +1337,9 @@ Status VTabletWriter::_send_new_partition_batch() {
         // these order is only.
         //  1. clear batching stats(and flag goes true) so that we won't make 
a new batching process in dealing batched block.
         //  2. deal batched block
-        //  3. now reuse the column of lval block. cuz append_block doesn't 
real adjust it. it generate a new block from that.
+        //  3. now reuse the column of lval block. cuz write doesn't real 
adjust it. it generate a new block from that.
         _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(this->append_block(tmp_block));
+        RETURN_IF_ERROR(this->write(tmp_block));
         _row_distribution._batching_block->set_mutable_columns(
                 tmp_block.mutate_columns()); // Recovery back
         _row_distribution._batching_block->clear_column_data();
@@ -1606,7 +1606,7 @@ void VTabletWriter::_generate_index_channels_payloads(
     }
 }
 
-Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
+Status VTabletWriter::write(doris::vectorized::Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 05a9c455ca2..a93c2bffc78 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -518,7 +518,7 @@ public:
 
     Status init_properties(ObjectPool* pool);
 
-    Status append_block(Block& block) override;
+    Status write(Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index e02fba7c21c..bf4033eebed 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -365,7 +365,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, 
int64_t partition_id,
     return Status::OK();
 }
 
-Status VTabletWriterV2::append_block(Block& input_block) {
+Status VTabletWriterV2::write(Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
@@ -470,9 +470,9 @@ Status VTabletWriterV2::_send_new_partition_batch() {
         // these order is only.
         //  1. clear batching stats(and flag goes true) so that we won't make 
a new batching process in dealing batched block.
         //  2. deal batched block
-        //  3. now reuse the column of lval block. cuz append_block doesn't 
real adjust it. it generate a new block from that.
+        //  3. now reuse the column of lval block. cuz write doesn't real 
adjust it. it generate a new block from that.
         _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(this->append_block(tmp_block));
+        RETURN_IF_ERROR(this->write(tmp_block));
         _row_distribution._batching_block->set_mutable_columns(
                 tmp_block.mutate_columns()); // Recovery back
         _row_distribution._batching_block->clear_column_data();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index c829b254c77..e4cdcdca09e 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -108,7 +108,7 @@ public:
 
     Status init_properties(ObjectPool* pool);
 
-    Status append_block(Block& block) override;
+    Status write(Block& block) override;
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index 5fff0d75ccf..e781960c5e9 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -319,7 +319,7 @@ void serialize_and_deserialize_mysql_test() {
     // mysql_writer init
     vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, 
_output_vexpr_ctxs, nullptr);
 
-    Status st = mysql_writer.append_block(block);
+    Status st = mysql_writer.write(block);
     EXPECT_TRUE(st.ok());
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to