This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 92e0221bf77 [branch-3.0](pick) pick #44421 #44459 #44545 (#44591)
92e0221bf77 is described below

commit 92e0221bf77c54e72f27d4f91dda33e3db1bd162
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 26 17:48:03 2024 +0800

    [branch-3.0](pick) pick #44421 #44459 #44545 (#44591)
    
    pick #44421 #44459 #44545
---
 be/src/exprs/runtime_filter.cpp                          |  7 ++++---
 be/src/pipeline/local_exchange/local_exchanger.h         | 16 +++++++---------
 be/src/pipeline/pipeline_fragment_context.cpp            |  2 +-
 be/src/runtime/fragment_mgr.cpp                          |  8 ++++----
 .../src/main/java/org/apache/doris/qe/Coordinator.java   | 14 ++++++--------
 .../java/org/apache/doris/qe/NereidsCoordinator.java     |  2 --
 gensrc/thrift/PaloInternalService.thrift                 |  4 ++--
 7 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 78d13aac279..7c5de4891f5 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1259,7 +1259,8 @@ void IRuntimeFilter::update_state() {
     // In pipelineX, runtime filters will be ready or timeout before open 
phase.
     if (expected == RuntimeFilterState::NOT_READY) {
         DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
-        COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+        COUNTER_SET(_wait_timer,
+                    int64_t((MonotonicMillis() - registration_time_) * 
NANOS_PER_MILLIS));
         _rf_state_atomic = RuntimeFilterState::TIME_OUT;
     }
 }
@@ -1278,7 +1279,7 @@ PrimitiveType IRuntimeFilter::column_type() const {
 
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
-    COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+    COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) 
* NANOS_PER_MILLIS));
     _rf_state_atomic.store(RuntimeFilterState::READY);
     if (!_filter_timer.empty()) {
         for (auto& timer : _filter_timer) {
@@ -1524,7 +1525,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* 
parent_profile) {
 void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t 
local_merge_time) {
     _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type()));
     _profile->add_info_string("LocalMergeTime",
-                              std::to_string(local_merge_time / 1000000000.0) 
+ " s");
+                              std::to_string((double)local_merge_time / 
NANOS_PER_SEC) + " s");
 }
 
 std::string IRuntimeFilter::debug_string() const {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index e8aa35c2f7c..af95e5348c8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -52,7 +52,7 @@ public:
     ExchangerBase(int running_sink_operators, int num_sources, int 
num_partitions,
                   int free_block_limit)
             : _running_sink_operators(running_sink_operators),
-              _running_source_operators(num_partitions),
+              _running_source_operators(num_sources),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_sources),
@@ -201,10 +201,13 @@ struct BlockWrapper {
 class ShuffleExchanger : public Exchanger<PartitionedBlock> {
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
-    ShuffleExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<PartitionedBlock>(running_sink_operators, 
num_partitions,
+    ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
+                     int free_block_limit)
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
                                           free_block_limit) {
-        _data_queue.resize(num_partitions);
+        DCHECK_GT(num_partitions, 0);
+        DCHECK_GT(num_sources, 0);
+        _data_queue.resize(num_sources);
     }
     ~ShuffleExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -216,10 +219,6 @@ public:
     ExchangeType get_type() const override { return 
ExchangeType::HASH_SHUFFLE; }
 
 protected:
-    ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
-                     int free_block_limit)
-            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
-                                          free_block_limit) {}
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
                        vectorized::Block* block, LocalExchangeSinkLocalState& 
local_state);
 };
@@ -231,7 +230,6 @@ class BucketShuffleExchanger final : public 
ShuffleExchanger {
             : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions,
                                free_block_limit) {
         DCHECK_GT(num_partitions, 0);
-        _data_queue.resize(std::max(num_partitions, num_sources));
     }
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 553e059d1a5..01c14f1ddb3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -747,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
-                std::max(cur_pipe->num_tasks(), _num_instances),
+                std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances,
                 use_global_hash_shuffle ? _total_instances : _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fbbc079c30a..81f2416d3b3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1261,8 +1261,8 @@ Status FragmentMgr::sync_filter_size(const 
PSyncFilterSizeRequest* request) {
         if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
-            return Status::InvalidArgument(
-                    "Sync filter size failed: Query context (query-id: {}) not 
found",
+            return Status::EndOfFile(
+                    "Sync filter size failed: Query context (query-id: {}) 
already finished",
                     queryid.to_string());
         }
     }
@@ -1282,8 +1282,8 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
         if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
-            return Status::InvalidArgument(
-                    "Merge filter size failed: Query context (query-id: {}) 
not found",
+            return Status::EndOfFile(
+                    "Merge filter size failed: Query context (query-id: {}) 
already finished",
                     queryid.to_string());
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8be50772094..ca8a397bd5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1938,7 +1938,6 @@ public class Coordinator implements CoordInterface {
 
                             FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, key, 0, params);
                             instanceParam.perNodeScanRanges.put(planNodeId, 
scanRangeParams);
-                            instanceParam.perNodeSharedScans.put(planNodeId, 
sharedScan);
                             params.instanceExecParams.add(instanceParam);
                         }
                         params.ignoreDataDistribution = sharedScan;
@@ -2760,13 +2759,11 @@ public class Coordinator implements CoordInterface {
                         null, addressScanRange.getKey(), 0, params);
 
                 for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) {
-                    instanceParam.addBucketSeq(nodeScanRangeMap.first);
                     for (Map.Entry<Integer, List<TScanRangeParams>> 
nodeScanRange
                             : nodeScanRangeMap.second.entrySet()) {
                         if 
(!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
                             range.put(nodeScanRange.getKey(), 
Lists.newArrayList());
                             
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), 
Lists.newArrayList());
-                            
instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
                         }
                         
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
                         
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
@@ -2778,6 +2775,12 @@ public class Coordinator implements CoordInterface {
                     params.instanceExecParams.add(new FInstanceExecParam(
                             null, addressScanRange.getKey(), 0, params));
                 }
+                int index = 0;
+                for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) {
+                    params.instanceExecParams.get(index % 
params.instanceExecParams.size())
+                            .addBucketSeq(nodeScanRangeMap.first);
+                    index++;
+                }
             } else {
                 int expectedInstanceNum = 1;
                 if (parallelExecInstanceNum > 1) {
@@ -3111,10 +3114,8 @@ public class Coordinator implements CoordInterface {
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = 
instanceExecParams.get(i);
                 Map<Integer, List<TScanRangeParams>> scanRanges = 
instanceExecParam.perNodeScanRanges;
-                Map<Integer, Boolean> perNodeSharedScans = 
instanceExecParam.perNodeSharedScans;
                 if (scanRanges == null) {
                     scanRanges = Maps.newHashMap();
-                    perNodeSharedScans = Maps.newHashMap();
                 }
                 if (!res.containsKey(instanceExecParam.host)) {
                     TPipelineFragmentParams params = new 
TPipelineFragmentParams();
@@ -3142,7 +3143,6 @@ public class Coordinator implements CoordInterface {
 
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
-                    params.setPerNodeSharedScans(perNodeSharedScans);
                     params.setTotalInstances(instanceExecParams.size());
                     if (ignoreDataDistribution) {
                         params.setParallelInstances(parallelTasksNum);
@@ -3167,7 +3167,6 @@ public class Coordinator implements CoordInterface {
 
                 
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
                 localParams.setPerNodeScanRanges(scanRanges);
-                localParams.setPerNodeSharedScans(perNodeSharedScans);
                 localParams.setSenderId(i);
                 localParams.setBackendNum(backendNum++);
                 localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
@@ -3315,7 +3314,6 @@ public class Coordinator implements CoordInterface {
         TUniqueId instanceId;
         TNetworkAddress host;
         Map<Integer, List<TScanRangeParams>> perNodeScanRanges = 
Maps.newHashMap();
-        Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap();
 
         int perFragmentInstanceIdx;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index 4f5af3762c5..6cb42a116e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -140,7 +140,6 @@ public class NereidsCoordinator extends Coordinator {
             ScanNode scanNode = scanNodeIdToReplicaIds.getKey();
             ScanRanges scanReplicas = scanNodeIdToReplicaIds.getValue();
             instanceExecParam.perNodeScanRanges.put(scanNode.getId().asInt(), 
scanReplicas.params);
-            instanceExecParam.perNodeSharedScans.put(scanNode.getId().asInt(), 
isShareScan);
         }
     }
 
@@ -157,7 +156,6 @@ public class NereidsCoordinator extends Coordinator {
                 List<TScanRangeParams> scanBucketTablets = 
instanceExecParam.perNodeScanRanges.computeIfAbsent(
                         scanNode.getId().asInt(), id -> Lists.newArrayList());
                 scanBucketTablets.addAll(scanRanges.params);
-                
instanceExecParam.perNodeSharedScans.put(scanNode.getId().asInt(), isShareScan);
 
                 if (scanNode instanceof OlapScanNode) {
                     OlapScanNode olapScanNode = (OlapScanNode) scanNode;
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 11924f7ba8b..f3f9e7668d1 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -772,7 +772,7 @@ struct TPipelineInstanceParams {
   4: optional i32 sender_id
   5: optional TRuntimeFilterParams runtime_filter_params
   6: optional i32 backend_num
-  7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+  7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
   8: optional list<i32> topn_filter_source_node_ids // deprecated after we set 
topn_filter_descs
   9: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
 }
@@ -816,7 +816,7 @@ struct TPipelineFragmentParams {
   33: optional i32 num_local_sink
   34: optional i32 num_buckets
   35: optional map<i32, i32> bucket_seq_to_instance_idx
-  36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+  36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
   37: optional i32 parallel_instances
   38: optional i32 total_instances
   39: optional map<i32, i32> shuffle_idx_to_instance_idx


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to