This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3ab2caef8 [improvement](sink) Support local exchange for multi 
fragment instances (#12017)
e3ab2caef8 is described below

commit e3ab2caef8cdc3872cf42ce709b3e252b9505bd3
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Aug 25 19:28:23 2022 +0800

    [improvement](sink) Support local exchange for multi fragment instances 
(#12017)
---
 be/src/vec/sink/vdata_stream_sender.cpp             | 21 ++++++++++++++-------
 be/src/vec/sink/vdata_stream_sender.h               |  3 +++
 .../java/org/apache/doris/qe/SessionVariable.java   | 10 ++++++++++
 gensrc/thrift/PaloInternalService.thrift            |  2 ++
 4 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 00fd63c692..b2ef88af49 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -76,6 +76,10 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) 
{
         return Status::InternalError(msg);
     }
 
+    if (state->query_options().__isset.enable_local_exchange) {
+        _enable_local_exchange = state->query_options().enable_local_exchange;
+    }
+
     // In bucket shuffle join will set fragment_instance_id (-1, -1)
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
@@ -85,11 +89,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* 
state) {
 }
 
 Status VDataStreamSender::Channel::send_current_block(bool eos) {
-    // TODO: Now, local exchange will cause the performance problem is in a 
multi-threaded scenario
-    //  so this feature is turned off here. We need to re-examine this logic
-    //    if (is_local()) {
-    //        return send_local_block(eos);
-    //    }
+    // FIXME: local exchange currently causes a performance problem in multi-threaded scenarios,
+    // so this feature is turned off here by default. We need to re-examine this logic.
+    if (_enable_local_exchange && is_local()) {
+        return send_local_block(eos);
+    }
     auto block = _mutable_block->to_block();
     RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
     block.clear_column_data();
@@ -103,15 +107,16 @@ Status VDataStreamSender::Channel::send_local_block(bool 
eos) {
     std::shared_ptr<VDataStreamRecvr> recvr =
             
_parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
                                                                     
_dest_node_id);
+    Block block = _mutable_block->to_block();
+    _mutable_block->set_muatable_columns(block.clone_empty_columns());
     if (recvr != nullptr) {
-        Block block = _mutable_block->to_block();
         COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
+        COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
         recvr->add_block(&block, _parent->_sender_id, true);
         if (eos) {
             recvr->remove_sender(_parent->_sender_id, _be_number);
         }
     }
-    _mutable_block->clear();
     return Status::OK();
 }
 
@@ -121,6 +126,7 @@ Status VDataStreamSender::Channel::send_local_block(Block* 
block) {
                                                                     
_dest_node_id);
     if (recvr != nullptr) {
         COUNTER_UPDATE(_parent->_local_bytes_send_counter, block->bytes());
+        COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
         recvr->add_block(block, _parent->_sender_id, false);
     }
     return Status::OK();
@@ -426,6 +432,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
     _uncompressed_bytes_counter = ADD_COUNTER(profile(), 
"UncompressedRowBatchSize", TUnit::BYTES);
     _ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
+    _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
     _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
     _compress_timer = ADD_TIMER(profile(), "CompressTime");
     _overall_throughput = profile()->add_derived_counter(
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index be9861caf5..c03eb0804c 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -135,6 +135,7 @@ protected:
     RuntimeProfile::Counter* _bytes_sent_counter;
     RuntimeProfile::Counter* _uncompressed_bytes_counter;
     RuntimeProfile::Counter* _ignore_rows;
+    RuntimeProfile::Counter* _local_sent_rows;
 
     std::unique_ptr<MemTracker> _mem_tracker;
 
@@ -302,6 +303,8 @@ private:
     PBlock* _ch_cur_pb_block = nullptr;
     PBlock _ch_pb_block1;
     PBlock _ch_pb_block2;
+
+    bool _enable_local_exchange = false;
 };
 
 template <typename Channels, typename HashVals>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 70be8869d2..026920ccb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = 
"fragment_transmission_compression_codec";
 
+    public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -525,6 +527,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN)
     public boolean enableFunctionPushdown;
 
+    @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE)
+    public boolean enableLocalExchange = false;
+
     public String getBlockEncryptionMode() {
         return blockEncryptionMode;
     }
@@ -918,6 +923,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return this.enableFunctionPushdown;
     }
 
+    public boolean getEnableLocalExchange() {
+        return enableLocalExchange;
+    }
+
     /**
      * getInsertVisibleTimeoutMs.
      **/
@@ -1113,6 +1122,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setEnableFunctionPushdown(enableFunctionPushdown);
         
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
+        tResult.setEnableLocalExchange(enableLocalExchange);
 
         return tResult;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 92d41abbd7..b9ea800554 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -167,6 +167,8 @@ struct TQueryOptions {
   45: optional bool enable_function_pushdown;
 
   46: optional string fragment_transmission_compression_codec;
+
+  47: optional bool enable_local_exchange;
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to