This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa90a3bce5 [pipelineX](local shuffle) Fix bucket hash shuffle (#28202)
5aa90a3bce5 is described below
commit 5aa90a3bce5ac55ef480c940a01667774064d10c
Author: Gabriel <[email protected]>
AuthorDate: Sun Dec 10 00:35:00 2023 +0800
[pipelineX](local shuffle) Fix bucket hash shuffle (#28202)
---
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a2877e7538c..cd498ccf9bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3596,6 +3596,7 @@ public class Coordinator implements CoordInterface {
}
Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
+ Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam =
instanceExecParams.get(i);
if (!res.containsKey(instanceExecParam.host)) {
@@ -3625,10 +3626,16 @@ public class Coordinator implements CoordInterface {
params.setNumBuckets(fragment.getBucketNum());
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer,
Integer>());
+ instanceIdx.put(instanceExecParam.host, 0);
}
+ // Set each bucket belongs to which instance on this BE.
+ // This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
+ int instanceId = instanceIdx.get(instanceExecParam.host);
for (int bucket : instanceExecParam.bucketSeqSet) {
-
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i);
+
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket,
instanceId);
+
}
+ instanceIdx.replace(instanceExecParam.host, ++instanceId);
TPipelineFragmentParams params =
res.get(instanceExecParam.host);
TPipelineInstanceParams localParams = new
TPipelineInstanceParams();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]