This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 835cf1f [fix](data-sink) Sinks call DataSink::close instead of
operating _closed directly (#8727)
835cf1f is described below
commit 835cf1fe204281e80174dbfd6ca8832849e4916b
Author: dataroaring <[email protected]>
AuthorDate: Thu Mar 31 12:36:33 2022 +0800
[fix](data-sink) Sinks call DataSink::close instead of operating _closed
directly (#8727)
TabletSink::_is_closed is duplicated with DataSink::_closed and
all sinks should call DataSink::close rather than set _closed
directly.
Fix for https://github.com/apache/incubator-doris/issues/8726.
---
be/src/exec/tablet_sink.cpp | 4 ++--
be/src/exec/tablet_sink.h | 2 --
be/src/runtime/data_stream_sender.cpp | 2 +-
be/src/runtime/export_sink.cpp | 5 ++++-
be/src/runtime/memory_scratch_sink.cpp | 3 +--
be/src/runtime/mysql_table_sink.cpp | 5 ++++-
be/src/runtime/odbc_table_sink.cpp | 5 ++++-
be/src/runtime/result_sink.cpp | 3 +--
be/src/vec/sink/result_sink.cpp | 3 +--
be/src/vec/sink/vdata_stream_sender.cpp | 6 ++++--
10 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index adeeb94..bc616b4 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -977,7 +977,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch*
input_batch) {
}
Status OlapTableSink::close(RuntimeState* state, Status close_status) {
- if (_is_closed) {
+ if (_closed) {
/// The close method may be called twice.
/// In the open_internal() method of plan_fragment_executor, close is
called once.
/// If an error occurs in this call, it will be called again in
fragment_mgr.
@@ -1090,7 +1090,7 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
_output_batch.reset();
_close_status = status;
- _is_closed = true;
+ DataSink::close(state, close_status);
return status;
}
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index e69cef0..6749602 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -484,8 +484,6 @@ protected:
int64_t _load_channel_timeout_s = 0;
int32_t _send_batch_parallelism = 1;
- // True if this sink has been closed once bool
- bool _is_closed = false;
// Save the status of close() method
Status _close_status;
diff --git a/be/src/runtime/data_stream_sender.cpp
b/be/src/runtime/data_stream_sender.cpp
index e3bd87b..cdd3742 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -642,7 +642,6 @@ Status DataStreamSender::close(RuntimeState* state, Status
exec_status) {
// TODO: only close channels that didn't have any errors
// make all channels close parallel
if (_closed) return Status::OK();
- _closed = true;
Status final_st = Status::OK();
for (int i = 0; i < _channels.size(); ++i) {
Status st = _channels[i]->close(state);
@@ -662,6 +661,7 @@ Status DataStreamSender::close(RuntimeState* state, Status
exec_status) {
}
Expr::close(_partition_expr_ctxs, state);
+ DataSink::close(state, exec_status);
return final_st;
}
diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp
index 37cb719..c9d529a 100644
--- a/be/src/runtime/export_sink.cpp
+++ b/be/src/runtime/export_sink.cpp
@@ -216,12 +216,15 @@ Status ExportSink::gen_row_buffer(TupleRow* row,
std::stringstream* ss) {
}
Status ExportSink::close(RuntimeState* state, Status exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
Expr::close(_output_expr_ctxs, state);
if (_file_writer != nullptr) {
_file_writer->close();
_file_writer = nullptr;
}
- return Status::OK();
+ return DataSink::close(state, exec_status);
}
Status ExportSink::open_file_writer() {
diff --git a/be/src/runtime/memory_scratch_sink.cpp
b/be/src/runtime/memory_scratch_sink.cpp
index ae3c020..03b87ba 100644
--- a/be/src/runtime/memory_scratch_sink.cpp
+++ b/be/src/runtime/memory_scratch_sink.cpp
@@ -96,8 +96,7 @@ Status MemoryScratchSink::close(RuntimeState* state, Status
exec_status) {
_queue->blocking_put(nullptr);
}
Expr::close(_output_expr_ctxs, state);
- _closed = true;
- return Status::OK();
+ return DataSink::close(state, exec_status);
}
} // namespace doris
diff --git a/be/src/runtime/mysql_table_sink.cpp
b/be/src/runtime/mysql_table_sink.cpp
index 0e5042c..3b17c1c 100644
--- a/be/src/runtime/mysql_table_sink.cpp
+++ b/be/src/runtime/mysql_table_sink.cpp
@@ -80,8 +80,11 @@ Status MysqlTableSink::send(RuntimeState* state, RowBatch*
batch) {
}
Status MysqlTableSink::close(RuntimeState* state, Status exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
Expr::close(_output_expr_ctxs, state);
- return Status::OK();
+ return DataSink::close(state, exec_status);
}
} // namespace doris
diff --git a/be/src/runtime/odbc_table_sink.cpp
b/be/src/runtime/odbc_table_sink.cpp
index c813f38..e95f893 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -89,11 +89,14 @@ Status OdbcTableSink::send(RuntimeState* state, RowBatch*
batch) {
}
Status OdbcTableSink::close(RuntimeState* state, Status exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
Expr::close(_output_expr_ctxs, state);
if (exec_status.ok() && _use_transaction) {
RETURN_IF_ERROR(_writer->finish_trans());
}
- return Status::OK();
+ return DataSink::close(state, exec_status);
}
} // namespace doris
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index b83ae8a..b079037 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -135,8 +135,7 @@ Status ResultSink::close(RuntimeState* state, Status
exec_status) {
Expr::close(_output_expr_ctxs, state);
- _closed = true;
- return Status::OK();
+ return DataSink::close(state, exec_status);
}
void ResultSink::set_query_statistics(std::shared_ptr<QueryStatistics>
statistics) {
diff --git a/be/src/vec/sink/result_sink.cpp b/be/src/vec/sink/result_sink.cpp
index fda7702..bc71447 100644
--- a/be/src/vec/sink/result_sink.cpp
+++ b/be/src/vec/sink/result_sink.cpp
@@ -125,8 +125,7 @@ Status VResultSink::close(RuntimeState* state, Status
exec_status) {
state->fragment_instance_id());
VExpr::close(_output_vexpr_ctxs, state);
- _closed = true;
- return Status::OK();
+ return DataSink::close(state, exec_status);
}
void VResultSink::set_query_statistics(std::shared_ptr<QueryStatistics>
statistics) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index da06fe7..31d8fd6 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -122,7 +122,9 @@ Status VDataStreamSender::Channel::send_block(PBlock*
block, bool eos) {
_closure->cntl.Reset();
}
VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
- << " dest_node=" << _dest_node_id;
+ << " dest_node=" << _dest_node_id << " to_host=" <<
_brpc_dest_addr.hostname
+ << " _packet_seq=" << _packet_seq
+ << " row_desc=" << _row_desc.debug_string();
if (_is_transfer_chain && (_send_query_statistics_with_every_batch ||
eos)) {
auto statistic = _brpc_request.mutable_query_statistics();
_parent->_query_statistics->to_pb(statistic);
@@ -499,7 +501,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
if (_closed) return Status::OK();
- _closed = true;
Status final_st = Status::OK();
for (int i = 0; i < _channels.size(); ++i) {
@@ -519,6 +520,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status
exec_status) {
iter->close(state);
}
VExpr::close(_partition_expr_ctxs, state);
+ DataSink::close(state, exec_status);
return final_st;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]