This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e3ab2caef8 [improvement](sink) Support local exchange for multi fragment instances (#12017)
e3ab2caef8 is described below
commit e3ab2caef8cdc3872cf42ce709b3e252b9505bd3
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Aug 25 19:28:23 2022 +0800
[improvement](sink) Support local exchange for multi fragment instances (#12017)
---
be/src/vec/sink/vdata_stream_sender.cpp | 21 ++++++++++++++-------
be/src/vec/sink/vdata_stream_sender.h | 3 +++
.../java/org/apache/doris/qe/SessionVariable.java | 10 ++++++++++
gensrc/thrift/PaloInternalService.thrift | 2 ++
4 files changed, 29 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 00fd63c692..b2ef88af49 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -76,6 +76,10 @@ Status VDataStreamSender::Channel::init(RuntimeState* state)
{
return Status::InternalError(msg);
}
+ if (state->query_options().__isset.enable_local_exchange) {
+ _enable_local_exchange = state->query_options().enable_local_exchange;
+ }
+
// In bucket shuffle join will set fragment_instance_id (-1, -1)
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
// so the empty channel not need call function close_internal()
@@ -85,11 +89,11 @@ Status VDataStreamSender::Channel::init(RuntimeState*
state) {
}
Status VDataStreamSender::Channel::send_current_block(bool eos) {
- // TODO: Now, local exchange will cause the performance problem is in a
multi-threaded scenario
- // so this feature is turned off here. We need to re-examine this logic
- // if (is_local()) {
- // return send_local_block(eos);
- // }
+ // FIXME: Now, local exchange will cause the performance problem is in a
multi-threaded scenario
+ // so this feature is turned off here by default. We need to re-examine
this logic
+ if (_enable_local_exchange && is_local()) {
+ return send_local_block(eos);
+ }
auto block = _mutable_block->to_block();
RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
block.clear_column_data();
@@ -103,15 +107,16 @@ Status VDataStreamSender::Channel::send_local_block(bool
eos) {
std::shared_ptr<VDataStreamRecvr> recvr =
_parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
_dest_node_id);
+ Block block = _mutable_block->to_block();
+ _mutable_block->set_muatable_columns(block.clone_empty_columns());
if (recvr != nullptr) {
- Block block = _mutable_block->to_block();
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
+ COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
recvr->add_block(&block, _parent->_sender_id, true);
if (eos) {
recvr->remove_sender(_parent->_sender_id, _be_number);
}
}
- _mutable_block->clear();
return Status::OK();
}
@@ -121,6 +126,7 @@ Status VDataStreamSender::Channel::send_local_block(Block*
block) {
_dest_node_id);
if (recvr != nullptr) {
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block->bytes());
+ COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
recvr->add_block(block, _parent->_sender_id, false);
}
return Status::OK();
@@ -426,6 +432,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
_uncompressed_bytes_counter = ADD_COUNTER(profile(),
"UncompressedRowBatchSize", TUnit::BYTES);
_ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
+ _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
_serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
_compress_timer = ADD_TIMER(profile(), "CompressTime");
_overall_throughput = profile()->add_derived_counter(
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index be9861caf5..c03eb0804c 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -135,6 +135,7 @@ protected:
RuntimeProfile::Counter* _bytes_sent_counter;
RuntimeProfile::Counter* _uncompressed_bytes_counter;
RuntimeProfile::Counter* _ignore_rows;
+ RuntimeProfile::Counter* _local_sent_rows;
std::unique_ptr<MemTracker> _mem_tracker;
@@ -302,6 +303,8 @@ private:
PBlock* _ch_cur_pb_block = nullptr;
PBlock _ch_pb_block1;
PBlock _ch_pb_block2;
+
+ bool _enable_local_exchange = false;
};
template <typename Channels, typename HashVals>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 70be8869d2..026920ccb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -209,6 +209,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC =
"fragment_transmission_compression_codec";
+ public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange";
+
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field,
String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -525,6 +527,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN)
public boolean enableFunctionPushdown;
+ @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE)
+ public boolean enableLocalExchange = false;
+
public String getBlockEncryptionMode() {
return blockEncryptionMode;
}
@@ -918,6 +923,10 @@ public class SessionVariable implements Serializable,
Writable {
return this.enableFunctionPushdown;
}
+ public boolean getEnableLocalExchange() {
+ return enableLocalExchange;
+ }
+
/**
* getInsertVisibleTimeoutMs.
**/
@@ -1113,6 +1122,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
+ tResult.setEnableLocalExchange(enableLocalExchange);
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 92d41abbd7..b9ea800554 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -167,6 +167,8 @@ struct TQueryOptions {
45: optional bool enable_function_pushdown;
46: optional string fragment_transmission_compression_codec;
+
+ 47: optional bool enable_local_exchange;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]