This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit a41da6d5ff7c345b3551ad04356d6c679d5c4dfe Author: dataroaring <[email protected]> AuthorDate: Thu Mar 31 12:36:33 2022 +0800 [fix](data-sink) Sinks call DataSink::close instead of operating _closed directly (#8727) TabletSink::_is_closed is duplicated with DataSink::_closed and all sinks should call DataSink::close rather than set _closed directly. Fix for https://github.com/apache/incubator-doris/issues/8726. --- be/src/exec/tablet_sink.cpp | 4 ++-- be/src/exec/tablet_sink.h | 2 -- be/src/runtime/data_stream_sender.cpp | 2 +- be/src/runtime/export_sink.cpp | 5 ++++- be/src/runtime/memory_scratch_sink.cpp | 3 +-- be/src/runtime/mysql_table_sink.cpp | 5 ++++- be/src/runtime/odbc_table_sink.cpp | 5 ++++- be/src/runtime/result_sink.cpp | 3 +-- be/src/vec/sink/result_sink.cpp | 3 +-- be/src/vec/sink/vdata_stream_sender.cpp | 6 ++++-- 10 files changed, 22 insertions(+), 16 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 6a5f9c6..7cb1c18 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -970,7 +970,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { } Status OlapTableSink::close(RuntimeState* state, Status close_status) { - if (_is_closed) { + if (_closed) { /// The close method may be called twice. /// In the open_internal() method of plan_fragment_executor, close is called once. /// If an error occurs in this call, it will be called again in fragment_mgr. @@ -1083,7 +1083,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { _output_batch.reset(); _close_status = status; - _is_closed = true; + DataSink::close(state, close_status); return status; } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 41d6521..242f45a 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -478,8 +478,6 @@ protected: int64_t _load_channel_timeout_s = 0; int32_t _send_batch_parallelism = 1; - // True if this sink has been closed once bool - bool _is_closed = false; // Save the status of close() method Status _close_status; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 681f5fc..93b0a12 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -641,7 +641,6 @@ Status DataStreamSender::close(RuntimeState* state, Status exec_status) { // TODO: only close channels that didn't have any errors // make all channels close parallel if (_closed) return Status::OK(); - _closed = true; Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { Status st = _channels[i]->close(state); @@ -661,6 +660,7 @@ Status DataStreamSender::close(RuntimeState* state, Status exec_status) { } Expr::close(_partition_expr_ctxs, state); + DataSink::close(state, exec_status); return final_st; } diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp index 9cc9f4c..200bbe0 100644 --- a/be/src/runtime/export_sink.cpp +++ b/be/src/runtime/export_sink.cpp @@ -219,12 +219,15 @@ Status ExportSink::gen_row_buffer(TupleRow* row, std::stringstream* ss) { } Status ExportSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } Expr::close(_output_expr_ctxs, state); if (_file_writer != nullptr) { _file_writer->close(); _file_writer = nullptr; } - return Status::OK(); + return DataSink::close(state, exec_status); } Status ExportSink::open_file_writer() { diff --git a/be/src/runtime/memory_scratch_sink.cpp b/be/src/runtime/memory_scratch_sink.cpp index ae3c020..03b87ba 100644 --- a/be/src/runtime/memory_scratch_sink.cpp +++ b/be/src/runtime/memory_scratch_sink.cpp @@ -96,8 +96,7 @@ Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) { _queue->blocking_put(nullptr); } Expr::close(_output_expr_ctxs, state); - _closed = true; - return Status::OK(); + return DataSink::close(state, exec_status); } } // namespace doris diff --git a/be/src/runtime/mysql_table_sink.cpp b/be/src/runtime/mysql_table_sink.cpp index cb7911d..79c43fc 100644 --- a/be/src/runtime/mysql_table_sink.cpp +++ b/be/src/runtime/mysql_table_sink.cpp @@ -80,8 +80,11 @@ Status MysqlTableSink::send(RuntimeState* state, RowBatch* batch) { } Status MysqlTableSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } Expr::close(_output_expr_ctxs, state); - return Status::OK(); + return DataSink::close(state, exec_status); } } // namespace doris diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp index b92b151..8804194 100644 --- a/be/src/runtime/odbc_table_sink.cpp +++ b/be/src/runtime/odbc_table_sink.cpp @@ -93,11 +93,14 @@ Status OdbcTableSink::send(RuntimeState* state, RowBatch* batch) { } Status OdbcTableSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } Expr::close(_output_expr_ctxs, state); if (exec_status.ok() && _use_transaction) { RETURN_IF_ERROR(_writer->finish_trans()); } - return Status::OK(); + return DataSink::close(state, exec_status); } } diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index 610f105..8b68b6b 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -130,8 +130,7 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) { Expr::close(_output_expr_ctxs, state); - _closed = true; - return Status::OK(); + return DataSink::close(state, exec_status); } void ResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) { diff --git a/be/src/vec/sink/result_sink.cpp b/be/src/vec/sink/result_sink.cpp index fda7702..bc71447 100644 --- a/be/src/vec/sink/result_sink.cpp +++ b/be/src/vec/sink/result_sink.cpp @@ -125,8 +125,7 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) { state->fragment_instance_id()); VExpr::close(_output_vexpr_ctxs, state); - _closed = true; - return Status::OK(); + return DataSink::close(state, exec_status); } void VResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index cff400c..32d5255 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -122,7 +122,9 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) { _closure->cntl.Reset(); } VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id - << " dest_node=" << _dest_node_id; + << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname + << " _packet_seq=" << _packet_seq + << " row_desc=" << _row_desc.debug_string(); if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) { auto statistic = _brpc_request.mutable_query_statistics(); _parent->_query_statistics->to_pb(statistic); @@ -499,7 +501,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { if (_closed) return Status::OK(); - _closed = true; Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { @@ -519,6 +520,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { iter->close(state); } VExpr::close(_partition_expr_ctxs, state); + DataSink::close(state, exec_status); return final_st; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
