This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bde84e4  [Bug][SQL] Fix bug that query failed when SQL contains Union and Colocate join (#4842)
bde84e4 is described below

commit bde84e4ae5d0597343e9dfb52a27e00c2d520137
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Nov 5 20:57:11 2020 +0800

    [Bug][SQL] Fix bug that query failed when SQL contains Union and Colocate join (#4842)
    
    SQL like:
    `select a join b union select c join d`
    
    If a and b are colocate joined, and c and d are also colocate joined, the query
    may fail with an error like:
    
    `failed to get tablet. tablet_id=26846, with schema_hash=398972982, reason=tablet does not exist`
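    
    A minimal reproduction sketch (the table names t1..t4 and the column k1 are
    hypothetical; assume t1/t2 and t3/t4 belong to two different colocate groups):
    
    `SELECT t1.k1 FROM t1 JOIN t2 ON t1.k1 = t2.k1
     UNION ALL
     SELECT t3.k1 FROM t3 JOIN t4 ON t3.k1 = t4.k1;`
    
    The root cause is that the bucket-seq-to-scan-range mapping used for colocate
    (and bucket shuffle) joins holds entries for the scan nodes of all fragments,
    so the instances of one fragment could also be assigned the scan ranges of
    scan nodes that belong to another fragment, and a backend was then asked for
    tablets it does not host. This patch records the scan node ids of each
    fragment and, when computing instance parameters, keeps only the scan ranges
    of the scan nodes that belong to the fragment being processed.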
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 65 ++++++++++++----
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 86 +++++++++++++++-------
 2 files changed, 110 insertions(+), 41 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef2bcd1..4d4f851 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -486,6 +486,7 @@ public class Coordinator {
                                     fragment.getFragmentId().asInt(), jobId);
                         }
                     }
+
                     futures.add(Pair.create(execState, 
execState.execRemoteFragmentAsync()));
 
                     backendId++;
@@ -1140,6 +1141,7 @@ public class Coordinator {
 
     private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) {
         Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
+        Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);

         // 1. count each node in one fragment should scan how many tablet, gather them in one list
         Map<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressToScanRanges = Maps.newHashMap();
@@ -1147,10 +1149,18 @@ public class Coordinator {
             TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
             Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();

+            // We only care about the node scan ranges of scan nodes which belong to this fragment
+            Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
+            for (Integer scanNodeId : nodeScanRanges.keySet()) {
+                if (scanNodeIds.contains(scanNodeId)) {
+                    filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
+                }
+            }
+
             if (!addressToScanRanges.containsKey(address)) {
                 addressToScanRanges.put(address, Lists.newArrayList());
             }
-            addressToScanRanges.get(address).add(nodeScanRanges);
+            addressToScanRanges.get(address).add(filteredNodeScanRanges);
         }
 
         for (Map.Entry<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressScanRange : addressToScanRanges.entrySet()) {
@@ -1195,8 +1205,14 @@ public class Coordinator {
                 continue;
             }
 
-            FragmentScanRangeAssignment assignment =
-                    fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
+            Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(scanNode.getFragmentId());
+            if (scanNodeIds == null) {
+                scanNodeIds = Sets.newHashSet();
+                fragmentIdToScanNodeIds.put(scanNode.getFragmentId(), scanNodeIds);
+            }
+            scanNodeIds.add(scanNode.getId().asInt());
+
+            FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
             if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
                 computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignment);
             } else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
@@ -1427,6 +1443,7 @@ public class Coordinator {
             extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> {
     }
 
+    // Bucket sequence -> (scan node id -> list of TScanRangeParams)
     class BucketSeqToScanRange extends HashMap<Integer, Map<Integer, List<TScanRangeParams>>> {
 
     }
@@ -1444,6 +1461,13 @@ public class Coordinator {
         // cache the bucketShuffleFragmentIds
         private Set<Integer> bucketShuffleFragmentIds = new HashSet<>();
 
+        private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds;
+
+        // TODO(cmy): Should refactor this Controller to unify bucket shuffle join and colocate join
+        public BucketShuffleJoinController(Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
+            this.fragmentIdToScanNodeIds = fragmentIdToScanNodeIds;
+        }
+
         // check whether the node fragment is bucket shuffle join fragment
         private boolean isBucketShuffleJoin(int fragmentId, PlanNode node) {
             if (ConnectContext.get() != null) {
@@ -1553,21 +1577,31 @@ public class Coordinator {
         private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) {
             Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
             BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
+            Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
 
             // 1. count each node in one fragment should scan how many tablet, gather them in one list
-            Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
+            Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
             for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
                 TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
                 Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
 
+                // We only care about the node scan ranges of scan nodes which belong to this fragment
+                Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
+                for (Integer scanNodeId : nodeScanRanges.keySet()) {
+                    if (scanNodeIds.contains(scanNodeId)) {
+                        filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
+                    }
+                }
+                Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges = Pair.create(scanRanges.getKey(), filteredNodeScanRanges);
+
                 if (!addressToScanRanges.containsKey(address)) {
                     addressToScanRanges.put(address, Lists.newArrayList());
                 }
-                addressToScanRanges.get(address).add(scanRanges);
+                addressToScanRanges.get(address).add(filteredScanRanges);
             }
 
-            for (Map.Entry<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
-                List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
+            for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
+                List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
                 int expectedInstanceNum = 1;
                 if (parallelExecInstanceNum > 1) {
                     //the scan instance num should not larger than the tablets num
@@ -1575,16 +1609,16 @@ public class Coordinator {
                 }
 
                 // 2. split how many scanRange one instance should scan
-                List<List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange,
+                List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange,
                         expectedInstanceNum);
 
-                // 3.constuct instanceExecParam add the scanRange should be scan by instance
-                for (List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
+                // 3.construct instanceExecParam add the scanRange should be scan by instance
+                for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
                     FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
 
-                    for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
-                        instanceParam.addBucketSeq(nodeScanRangeMap.getKey());
-                        for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.getValue().entrySet()) {
+                    for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
+                        instanceParam.addBucketSeq(nodeScanRangeMap.first);
+                        for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
                             if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
                                 instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
                             } else {
@@ -1598,11 +1632,13 @@ public class Coordinator {
         }
     }
 
-    private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController();

     private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
     private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+    // cache the fragment id to its scan node ids. Used for colocate join.
+    private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = Maps.newHashMap();
     private Set<Integer> colocateFragmentIds = new HashSet<>();
+    private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
 
     // record backend execute state
     // TODO(zhaochun): add profile information and others
@@ -1951,3 +1987,4 @@ public class Coordinator {
     }
 }
 
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 11dc8d1..eb1bad3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.doris.qe;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import mockit.Mocked;
+
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.Expr;
@@ -34,33 +32,33 @@ import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.EmptySetNode;
-import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.HashJoinNode;
-import org.apache.doris.planner.MysqlScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.Planner;
-import org.apache.doris.planner.ScanNode;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPartitionType;
-import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class CoordinatorTest extends Coordinator {
     static Planner planner = new Planner();
@@ -87,13 +85,19 @@ public class CoordinatorTest extends Coordinator {
     public void testComputeColocateJoinInstanceParam()  {
         Coordinator coordinator = new Coordinator(context, analyzer, planner);
 
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        int scanNodeId = 1;
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+        fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+        Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", fragmentIdToScanNodeIds);
+
         // 1. set fragmentToBucketSeqToAddress in coordinator
         Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
         TNetworkAddress address = new TNetworkAddress();
         for (int i = 0; i < 3; i++) {
             bucketSeqToAddress.put(i, address);
         }
-        PlanFragmentId planFragmentId = new PlanFragmentId(1);
         Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentToBucketSeqToAddress = new HashMap<>();
         fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
         Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
@@ -101,7 +105,7 @@ public class CoordinatorTest extends Coordinator {
         // 2. set bucketSeqToScanRange in coordinator
         BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
         Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
-        ScanRangeMap.put(1, new ArrayList<>());
+        ScanRangeMap.put(scanNodeId, new ArrayList<>());
         for (int i = 0; i < 3; i++) {
             bucketSeqToScanRange.put(i, ScanRangeMap);
         }
@@ -126,9 +130,18 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testIsBucketShuffleJoin()  {
-        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
-
-        PlanNodeId testPaloNodeId = new PlanNodeId(-1);
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        int scanNodeId = 1;
+        // The "fragmentIdToScanNodeIds" we created here is useless in this test.
+        // It is only for creating the BucketShuffleJoinController.
+        // So the fragment id and scan node id in it are meaningless.
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+        fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+                = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
+
+        PlanNodeId testPlanNodeId = new PlanNodeId(-1);
         TupleId testTupleId = new TupleId(-1);
         ArrayList<TupleId> tupleIdArrayList = new ArrayList<>();
         tupleIdArrayList.add(testTupleId);
@@ -137,8 +150,8 @@ public class CoordinatorTest extends Coordinator {
         BinaryPredicate binaryPredicate = new BinaryPredicate();
         testJoinexprs.add(binaryPredicate);
 
-        HashJoinNode hashJoinNode = new HashJoinNode(testPaloNodeId, new EmptySetNode(testPaloNodeId, tupleIdArrayList),
-                new EmptySetNode(testPaloNodeId, tupleIdArrayList) , new TableRef(), testJoinexprs, new ArrayList<>());
+        HashJoinNode hashJoinNode = new HashJoinNode(testPlanNodeId, new EmptySetNode(testPlanNodeId, tupleIdArrayList),
+                new EmptySetNode(testPlanNodeId, tupleIdArrayList), new TableRef(), testJoinexprs, new ArrayList<>());
         hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
                 new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
 
@@ -146,7 +159,7 @@ public class CoordinatorTest extends Coordinator {
         Assert.assertEquals(false,
                 Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
 
-        // the fragment id is differernt from hash join node
+        // the fragment id is different from hash join node
         hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode,
                 new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
         hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
@@ -158,7 +171,7 @@ public class CoordinatorTest extends Coordinator {
         Assert.assertEquals(true,
                 Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
 
-        // the framgent id is in cache, so not do check node again
+        // the fragment id is in cache, so not do check node again
         Assert.assertEquals(true,
                 Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1));
 
@@ -166,7 +179,13 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeScanRangeAssignmentByBucketq()  {
-        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        int scanNodeId = 1;
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+        fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+                = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
 
         // init olap scan node of bucket shuffle join
         TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
@@ -175,7 +194,7 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
         tupleDescriptor.setTable(olapTable);
 
-        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test");
+        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test");
         ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion = ArrayListMultimap.create();
 
         // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2}
@@ -196,10 +215,9 @@ public class CoordinatorTest extends Coordinator {
         }
 
         Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion);
-        olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode,
+        olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
                 new DataPartition(TPartitionType.UNPARTITIONED)));
 
-
         // init all backend
         Backend backend0 = new Backend();
         backend0.setAlive(true);
@@ -240,7 +258,13 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeScanRangeAssignmentByBucket()  {
-        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        int scanNodeId = 1;
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+        fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+                = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
 
         // init olap scan node of bucket shuffle join
         TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
@@ -249,7 +273,7 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
         tupleDescriptor.setTable(olapTable);
 
-        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test");
+        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test");
         ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion = ArrayListMultimap.create();
 
         // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2}
@@ -270,7 +294,7 @@ public class CoordinatorTest extends Coordinator {
         }
 
         Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion);
-        olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode,
+        olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
                 new DataPartition(TPartitionType.UNPARTITIONED)));
 
 
@@ -315,7 +339,15 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeBucketShuffleJoinInstanceParam()  {
-        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        int scanNodeId = 1;
+
+        // set fragment id to scan node ids map
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+        fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+                = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
 
         // 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController
         Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
@@ -323,7 +355,6 @@ public class CoordinatorTest extends Coordinator {
         for (int i = 0; i < 3; i++) {
             bucketSeqToAddress.put(i, address);
         }
-        PlanFragmentId planFragmentId = new PlanFragmentId(1);
         Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentToBucketSeqToAddress = new HashMap<>();
         fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
         Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
@@ -332,13 +363,14 @@ public class CoordinatorTest extends Coordinator {
         Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
         BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
         Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
-        ScanRangeMap.put(1, new ArrayList<>());
+        ScanRangeMap.put(scanNodeId, new ArrayList<>());
         for (int i = 0; i < 3; i++) {
             bucketSeqToScanRange.put(i, ScanRangeMap);
         }
         fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
         Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
 
+
         FragmentExecParams params = new FragmentExecParams(null);
         Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
         Assert.assertEquals(1, params.instanceExecParams.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
