This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit a41da6d5ff7c345b3551ad04356d6c679d5c4dfe
Author: dataroaring <[email protected]>
AuthorDate: Thu Mar 31 12:36:33 2022 +0800

    [fix](data-sink) Sinks call DataSink::close instead of operating _closed 
directly (#8727)
    
    TabletSink::_is_closed is duplicated with DataSink::_closed and
    all sinks should call DataSink::close rather than set _closed
    directly.
    
    Fix for https://github.com/apache/incubator-doris/issues/8726.
---
 be/src/exec/tablet_sink.cpp             | 4 ++--
 be/src/exec/tablet_sink.h               | 2 --
 be/src/runtime/data_stream_sender.cpp   | 2 +-
 be/src/runtime/export_sink.cpp          | 5 ++++-
 be/src/runtime/memory_scratch_sink.cpp  | 3 +--
 be/src/runtime/mysql_table_sink.cpp     | 5 ++++-
 be/src/runtime/odbc_table_sink.cpp      | 5 ++++-
 be/src/runtime/result_sink.cpp          | 3 +--
 be/src/vec/sink/result_sink.cpp         | 3 +--
 be/src/vec/sink/vdata_stream_sender.cpp | 6 ++++--
 10 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 6a5f9c6..7cb1c18 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -970,7 +970,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* 
input_batch) {
 }
 
 Status OlapTableSink::close(RuntimeState* state, Status close_status) {
-    if (_is_closed) {
+    if (_closed) {
         /// The close method may be called twice.
         /// In the open_internal() method of plan_fragment_executor, close is 
called once.
         /// If an error occurs in this call, it will be called again in 
fragment_mgr.
@@ -1083,7 +1083,7 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
     _output_batch.reset();
 
     _close_status = status;
-    _is_closed = true;
+    DataSink::close(state, close_status);
     return status;
 }
 
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 41d6521..242f45a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -478,8 +478,6 @@ protected:
     int64_t _load_channel_timeout_s = 0;
 
     int32_t _send_batch_parallelism = 1;
-    // True if this sink has been closed once bool
-    bool _is_closed = false;
     // Save the status of close() method
     Status _close_status;
 
diff --git a/be/src/runtime/data_stream_sender.cpp 
b/be/src/runtime/data_stream_sender.cpp
index 681f5fc..93b0a12 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -641,7 +641,6 @@ Status DataStreamSender::close(RuntimeState* state, Status 
exec_status) {
     // TODO: only close channels that didn't have any errors
     // make all channels close parallel
     if (_closed) return Status::OK();
-    _closed = true;
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
         Status st = _channels[i]->close(state);
@@ -661,6 +660,7 @@ Status DataStreamSender::close(RuntimeState* state, Status 
exec_status) {
     }
     Expr::close(_partition_expr_ctxs, state);
 
+    DataSink::close(state, exec_status);
     return final_st;
 }
 
diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp
index 9cc9f4c..200bbe0 100644
--- a/be/src/runtime/export_sink.cpp
+++ b/be/src/runtime/export_sink.cpp
@@ -219,12 +219,15 @@ Status ExportSink::gen_row_buffer(TupleRow* row, 
std::stringstream* ss) {
 }
 
 Status ExportSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
     Expr::close(_output_expr_ctxs, state);
     if (_file_writer != nullptr) {
         _file_writer->close();
         _file_writer = nullptr;
     }
-    return Status::OK();
+    return DataSink::close(state, exec_status);
 }
 
 Status ExportSink::open_file_writer() {
diff --git a/be/src/runtime/memory_scratch_sink.cpp 
b/be/src/runtime/memory_scratch_sink.cpp
index ae3c020..03b87ba 100644
--- a/be/src/runtime/memory_scratch_sink.cpp
+++ b/be/src/runtime/memory_scratch_sink.cpp
@@ -96,8 +96,7 @@ Status MemoryScratchSink::close(RuntimeState* state, Status 
exec_status) {
         _queue->blocking_put(nullptr);
     }
     Expr::close(_output_expr_ctxs, state);
-    _closed = true;
-    return Status::OK();
+    return DataSink::close(state, exec_status);
 }
 
 } // namespace doris
diff --git a/be/src/runtime/mysql_table_sink.cpp 
b/be/src/runtime/mysql_table_sink.cpp
index cb7911d..79c43fc 100644
--- a/be/src/runtime/mysql_table_sink.cpp
+++ b/be/src/runtime/mysql_table_sink.cpp
@@ -80,8 +80,11 @@ Status MysqlTableSink::send(RuntimeState* state, RowBatch* 
batch) {
 }
 
 Status MysqlTableSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
     Expr::close(_output_expr_ctxs, state);
-    return Status::OK();
+    return DataSink::close(state, exec_status);
 }
 
 } // namespace doris
diff --git a/be/src/runtime/odbc_table_sink.cpp 
b/be/src/runtime/odbc_table_sink.cpp
index b92b151..8804194 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -93,11 +93,14 @@ Status OdbcTableSink::send(RuntimeState* state, RowBatch* 
batch) {
 }
 
 Status OdbcTableSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
     Expr::close(_output_expr_ctxs, state);
     if (exec_status.ok() && _use_transaction) {
         RETURN_IF_ERROR(_writer->finish_trans());
     }
-    return Status::OK();
+    return DataSink::close(state, exec_status);
 }
 
 }
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index 610f105..8b68b6b 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -130,8 +130,7 @@ Status ResultSink::close(RuntimeState* state, Status 
exec_status) {
 
     Expr::close(_output_expr_ctxs, state);
 
-    _closed = true;
-    return Status::OK();
+    return DataSink::close(state, exec_status);
 }
 
 void ResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
diff --git a/be/src/vec/sink/result_sink.cpp b/be/src/vec/sink/result_sink.cpp
index fda7702..bc71447 100644
--- a/be/src/vec/sink/result_sink.cpp
+++ b/be/src/vec/sink/result_sink.cpp
@@ -125,8 +125,7 @@ Status VResultSink::close(RuntimeState* state, Status 
exec_status) {
             state->fragment_instance_id());
 
     VExpr::close(_output_vexpr_ctxs, state);
-    _closed = true;
-    return Status::OK();
+    return DataSink::close(state, exec_status);
 }
 
 void VResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index cff400c..32d5255 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -122,7 +122,9 @@ Status VDataStreamSender::Channel::send_block(PBlock* 
block, bool eos) {
         _closure->cntl.Reset();
     }
     VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
-             << " dest_node=" << _dest_node_id;
+             << " dest_node=" << _dest_node_id << " to_host=" << 
_brpc_dest_addr.hostname
+            << " _packet_seq=" << _packet_seq
+            << " row_desc=" << _row_desc.debug_string();
     if (_is_transfer_chain && (_send_query_statistics_with_every_batch || 
eos)) {
         auto statistic = _brpc_request.mutable_query_statistics();
         _parent->_query_statistics->to_pb(statistic);
@@ -499,7 +501,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
 
 Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
     if (_closed) return Status::OK();
-    _closed = true;
 
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
@@ -519,6 +520,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status 
exec_status) {
         iter->close(state);
     }
     VExpr::close(_partition_expr_ctxs, state);
+    DataSink::close(state, exec_status);
     return final_st;
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to