This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new bde84e4 [Bug][SQL] Fix bug that query failed when SQL contains Union
and Colocate join (#4842)
bde84e4 is described below
commit bde84e4ae5d0597343e9dfb52a27e00c2d520137
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Nov 5 20:57:11 2020 +0800
[Bug][SQL] Fix bug that query failed when SQL contains Union and Colocate
join (#4842)
For SQL like:
`select a join b union select c join d`;
if a and b form a colocate join, and c and d also form a colocate join, the
query may fail with an error like:
`failed to get tablet. tablet_id=26846, with schema_hash=398972982,
reason=tablet does not exist`
---
.../main/java/org/apache/doris/qe/Coordinator.java | 65 ++++++++++++----
.../java/org/apache/doris/qe/CoordinatorTest.java | 86 +++++++++++++++-------
2 files changed, 110 insertions(+), 41 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef2bcd1..4d4f851 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -486,6 +486,7 @@ public class Coordinator {
fragment.getFragmentId().asInt(), jobId);
}
}
+
futures.add(Pair.create(execState,
execState.execRemoteFragmentAsync()));
backendId++;
@@ -1140,6 +1141,7 @@ public class Coordinator {
private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
int parallelExecInstanceNum, FragmentExecParams params) {
Map<Integer, TNetworkAddress> bucketSeqToAddress =
fragmentIdToSeqToAddressMap.get(fragmentId);
+ Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
// 1. count each node in one fragment should scan how many tablet,
gather them in one list
Map<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>>
addressToScanRanges = Maps.newHashMap();
@@ -1147,10 +1149,18 @@ public class Coordinator {
TNetworkAddress address =
bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges =
scanRanges.getValue();
+ // We only care about the node scan ranges of scan nodes which
belong to this fragment
+ Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges =
Maps.newHashMap();
+ for (Integer scanNodeId : nodeScanRanges.keySet()) {
+ if (scanNodeIds.contains(scanNodeId)) {
+ filteredNodeScanRanges.put(scanNodeId,
nodeScanRanges.get(scanNodeId));
+ }
+ }
+
if (!addressToScanRanges.containsKey(address)) {
addressToScanRanges.put(address, Lists.newArrayList());
}
- addressToScanRanges.get(address).add(nodeScanRanges);
+ addressToScanRanges.get(address).add(filteredNodeScanRanges);
}
for (Map.Entry<TNetworkAddress, List<Map<Integer,
List<TScanRangeParams>>>> addressScanRange : addressToScanRanges.entrySet()) {
@@ -1195,8 +1205,14 @@ public class Coordinator {
continue;
}
- FragmentScanRangeAssignment assignment =
-
fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
+ Set<Integer> scanNodeIds =
fragmentIdToScanNodeIds.get(scanNode.getFragmentId());
+ if (scanNodeIds == null) {
+ scanNodeIds = Sets.newHashSet();
+ fragmentIdToScanNodeIds.put(scanNode.getFragmentId(),
scanNodeIds);
+ }
+ scanNodeIds.add(scanNode.getId().asInt());
+
+ FragmentScanRangeAssignment assignment =
fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode,
assignment);
} else if
(bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(),
scanNode.getFragment().getPlanRoot())) {
@@ -1427,6 +1443,7 @@ public class Coordinator {
extends HashMap<TNetworkAddress, Map<Integer,
List<TScanRangeParams>>> {
}
+ // Bucket sequence -> (scan node id -> list of TScanRangeParams)
class BucketSeqToScanRange extends HashMap<Integer, Map<Integer,
List<TScanRangeParams>>> {
}
@@ -1444,6 +1461,13 @@ public class Coordinator {
// cache the bucketShuffleFragmentIds
private Set<Integer> bucketShuffleFragmentIds = new HashSet<>();
+ private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds;
+
+ // TODO(cmy): Should refactor this Controller to unify bucket shuffle
join and colocate join
+ public BucketShuffleJoinController(Map<PlanFragmentId, Set<Integer>>
fragmentIdToScanNodeIds) {
+ this.fragmentIdToScanNodeIds = fragmentIdToScanNodeIds;
+ }
+
// check whether the node fragment is bucket shuffle join fragment
private boolean isBucketShuffleJoin(int fragmentId, PlanNode node) {
if (ConnectContext.get() != null) {
@@ -1553,21 +1577,31 @@ public class Coordinator {
private void computeInstanceParam(PlanFragmentId fragmentId, int
parallelExecInstanceNum, FragmentExecParams params) {
Map<Integer, TNetworkAddress> bucketSeqToAddress =
fragmentIdToSeqToAddressMap.get(fragmentId);
BucketSeqToScanRange bucketSeqToScanRange =
fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
+ Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
// 1. count each node in one fragment should scan how many tablet,
gather them in one list
- Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer,
List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
+ Map<TNetworkAddress, List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>
scanRanges : bucketSeqToScanRange.entrySet()) {
TNetworkAddress address =
bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges =
scanRanges.getValue();
+ // We only care about the node scan ranges of scan nodes which
belong to this fragment
+ Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges =
Maps.newHashMap();
+ for (Integer scanNodeId : nodeScanRanges.keySet()) {
+ if (scanNodeIds.contains(scanNodeId)) {
+ filteredNodeScanRanges.put(scanNodeId,
nodeScanRanges.get(scanNodeId));
+ }
+ }
+ Pair<Integer, Map<Integer, List<TScanRangeParams>>>
filteredScanRanges = Pair.create(scanRanges.getKey(), filteredNodeScanRanges);
+
if (!addressToScanRanges.containsKey(address)) {
addressToScanRanges.put(address, Lists.newArrayList());
}
- addressToScanRanges.get(address).add(scanRanges);
+ addressToScanRanges.get(address).add(filteredScanRanges);
}
- for (Map.Entry<TNetworkAddress, List<Map.Entry<Integer,
Map<Integer, List<TScanRangeParams>>>>> addressScanRange :
addressToScanRanges.entrySet()) {
- List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>
scanRange = addressScanRange.getValue();
+ for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
+ List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>
scanRange = addressScanRange.getValue();
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets
num
@@ -1575,16 +1609,16 @@ public class Coordinator {
}
// 2. split how many scanRange one instance should scan
- List<List<Map.Entry<Integer, Map<Integer,
List<TScanRangeParams>>>>> perInstanceScanRanges =
ListUtil.splitBySize(scanRange,
+ List<List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> perInstanceScanRanges =
ListUtil.splitBySize(scanRange,
expectedInstanceNum);
- // 3.constuct instanceExecParam add the scanRange should be
scan by instance
- for (List<Map.Entry<Integer, Map<Integer,
List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
+ // 3.construct instanceExecParam add the scanRange should be
scan by instance
+ for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>
perInstanceScanRange : perInstanceScanRanges) {
FInstanceExecParam instanceParam = new
FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
- for (Map.Entry<Integer, Map<Integer,
List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
- instanceParam.addBucketSeq(nodeScanRangeMap.getKey());
- for (Map.Entry<Integer, List<TScanRangeParams>>
nodeScanRange : nodeScanRangeMap.getValue().entrySet()) {
+ for (Pair<Integer, Map<Integer, List<TScanRangeParams>>>
nodeScanRangeMap : perInstanceScanRange) {
+ instanceParam.addBucketSeq(nodeScanRangeMap.first);
+ for (Map.Entry<Integer, List<TScanRangeParams>>
nodeScanRange : nodeScanRangeMap.second.entrySet()) {
if
(!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(),
nodeScanRange.getValue());
} else {
@@ -1598,11 +1632,13 @@ public class Coordinator {
}
}
- private BucketShuffleJoinController bucketShuffleJoinController = new
BucketShuffleJoinController();
private BucketSeqToScanRange bucketSeqToScanRange = new
BucketSeqToScanRange();
private Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
fragmentIdToSeqToAddressMap = Maps.newHashMap();
+ // cache the fragment id to its scan node ids. Used for colocate join.
+ private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds =
Maps.newHashMap();
private Set<Integer> colocateFragmentIds = new HashSet<>();
+ private BucketShuffleJoinController bucketShuffleJoinController = new
BucketShuffleJoinController(fragmentIdToScanNodeIds);
// record backend execute state
// TODO(zhaochun): add profile information and others
@@ -1951,3 +1987,4 @@ public class Coordinator {
}
}
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 11dc8d1..eb1bad3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -17,10 +17,8 @@
package org.apache.doris.qe;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import mockit.Mocked;
+
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
@@ -34,33 +32,33 @@ import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.EmptySetNode;
-import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.HashJoinNode;
-import org.apache.doris.planner.MysqlScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.Planner;
-import org.apache.doris.planner.ScanNode;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPartitionType;
-import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TUniqueId;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class CoordinatorTest extends Coordinator {
static Planner planner = new Planner();
@@ -87,13 +85,19 @@ public class CoordinatorTest extends Coordinator {
public void testComputeColocateJoinInstanceParam() {
Coordinator coordinator = new Coordinator(context, analyzer, planner);
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ int scanNodeId = 1;
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+ fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+ Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds",
fragmentIdToScanNodeIds);
+
// 1. set fragmentToBucketSeqToAddress in coordinator
Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
TNetworkAddress address = new TNetworkAddress();
for (int i = 0; i < 3; i++) {
bucketSeqToAddress.put(i, address);
}
- PlanFragmentId planFragmentId = new PlanFragmentId(1);
Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
fragmentToBucketSeqToAddress = new HashMap<>();
fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap",
fragmentToBucketSeqToAddress);
@@ -101,7 +105,7 @@ public class CoordinatorTest extends Coordinator {
// 2. set bucketSeqToScanRange in coordinator
BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
- ScanRangeMap.put(1, new ArrayList<>());
+ ScanRangeMap.put(scanNodeId, new ArrayList<>());
for (int i = 0; i < 3; i++) {
bucketSeqToScanRange.put(i, ScanRangeMap);
}
@@ -126,9 +130,18 @@ public class CoordinatorTest extends Coordinator {
@Test
public void testIsBucketShuffleJoin() {
- Coordinator.BucketShuffleJoinController bucketShuffleJoinController =
new Coordinator.BucketShuffleJoinController();
-
- PlanNodeId testPaloNodeId = new PlanNodeId(-1);
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ int scanNodeId = 1;
+ // The "fragmentIdToScanNodeIds" we created here is useless in this
test.
+ // It is only for creating the BucketShuffleJoinController.
+ // So the fragment id and scan node id in it is meaningless.
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+ fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+ = new
Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
+
+ PlanNodeId testPlanNodeId = new PlanNodeId(-1);
TupleId testTupleId = new TupleId(-1);
ArrayList<TupleId> tupleIdArrayList = new ArrayList<>();
tupleIdArrayList.add(testTupleId);
@@ -137,8 +150,8 @@ public class CoordinatorTest extends Coordinator {
BinaryPredicate binaryPredicate = new BinaryPredicate();
testJoinexprs.add(binaryPredicate);
- HashJoinNode hashJoinNode = new HashJoinNode(testPaloNodeId, new
EmptySetNode(testPaloNodeId, tupleIdArrayList),
- new EmptySetNode(testPaloNodeId, tupleIdArrayList) , new
TableRef(), testJoinexprs, new ArrayList<>());
+ HashJoinNode hashJoinNode = new HashJoinNode(testPlanNodeId, new
EmptySetNode(testPlanNodeId, tupleIdArrayList),
+ new EmptySetNode(testPlanNodeId, tupleIdArrayList), new
TableRef(), testJoinexprs, new ArrayList<>());
hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1),
hashJoinNode,
new
DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
@@ -146,7 +159,7 @@ public class CoordinatorTest extends Coordinator {
Assert.assertEquals(false,
Deencapsulation.invoke(bucketShuffleJoinController,
"isBucketShuffleJoin", -1, hashJoinNode));
- // the fragment id is differernt from hash join node
+ // the fragment id is different from hash join node
hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2),
hashJoinNode,
new
DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
@@ -158,7 +171,7 @@ public class CoordinatorTest extends Coordinator {
Assert.assertEquals(true,
Deencapsulation.invoke(bucketShuffleJoinController,
"isBucketShuffleJoin", -1, hashJoinNode));
- // the framgent id is in cache, so not do check node again
+ // the fragment id is in cache, so not do check node again
Assert.assertEquals(true,
Deencapsulation.invoke(bucketShuffleJoinController,
"isBucketShuffleJoin", -1));
@@ -166,7 +179,13 @@ public class CoordinatorTest extends Coordinator {
@Test
public void testComputeScanRangeAssignmentByBucketq() {
- Coordinator.BucketShuffleJoinController bucketShuffleJoinController =
new Coordinator.BucketShuffleJoinController();
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ int scanNodeId = 1;
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+ fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+ = new
Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
// init olap scan node of bucket shuffle join
TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
@@ -175,7 +194,7 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(olapTable, "defaultDistributionInfo",
hashDistributionInfo);
tupleDescriptor.setTable(olapTable);
- OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1),
tupleDescriptor, "test");
+ OlapScanNode olapScanNode = new OlapScanNode(new
PlanNodeId(scanNodeId), tupleDescriptor, "test");
ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion =
ArrayListMultimap.create();
// each olaptable bucket have the same TScanRangeLocations, be id is
{0, 1, 2}
@@ -196,10 +215,9 @@ public class CoordinatorTest extends Coordinator {
}
Deencapsulation.setField(olapScanNode, "bucketSeq2locations",
bucketseq2localtion);
- olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1),
olapScanNode,
+ olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED)));
-
// init all backend
Backend backend0 = new Backend();
backend0.setAlive(true);
@@ -240,7 +258,13 @@ public class CoordinatorTest extends Coordinator {
@Test
public void testComputeScanRangeAssignmentByBucket() {
- Coordinator.BucketShuffleJoinController bucketShuffleJoinController =
new Coordinator.BucketShuffleJoinController();
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ int scanNodeId = 1;
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+ fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+ = new
Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
// init olap scan node of bucket shuffle join
TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
@@ -249,7 +273,7 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(olapTable, "defaultDistributionInfo",
hashDistributionInfo);
tupleDescriptor.setTable(olapTable);
- OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1),
tupleDescriptor, "test");
+ OlapScanNode olapScanNode = new OlapScanNode(new
PlanNodeId(scanNodeId), tupleDescriptor, "test");
ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion =
ArrayListMultimap.create();
// each olaptable bucket have the same TScanRangeLocations, be id is
{0, 1, 2}
@@ -270,7 +294,7 @@ public class CoordinatorTest extends Coordinator {
}
Deencapsulation.setField(olapScanNode, "bucketSeq2locations",
bucketseq2localtion);
- olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1),
olapScanNode,
+ olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED)));
@@ -315,7 +339,15 @@ public class CoordinatorTest extends Coordinator {
@Test
public void testComputeBucketShuffleJoinInstanceParam() {
- Coordinator.BucketShuffleJoinController bucketShuffleJoinController =
new Coordinator.BucketShuffleJoinController();
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ int scanNodeId = 1;
+
+ // set fragment id to scan node ids map
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+ fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController
+ = new
Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
// 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController
Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
@@ -323,7 +355,6 @@ public class CoordinatorTest extends Coordinator {
for (int i = 0; i < 3; i++) {
bucketSeqToAddress.put(i, address);
}
- PlanFragmentId planFragmentId = new PlanFragmentId(1);
Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
fragmentToBucketSeqToAddress = new HashMap<>();
fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
Deencapsulation.setField(bucketShuffleJoinController,
"fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
@@ -332,13 +363,14 @@ public class CoordinatorTest extends Coordinator {
Map<PlanFragmentId, BucketSeqToScanRange>
fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
- ScanRangeMap.put(1, new ArrayList<>());
+ ScanRangeMap.put(scanNodeId, new ArrayList<>());
for (int i = 0; i < 3; i++) {
bucketSeqToScanRange.put(i, ScanRangeMap);
}
fragmentIdBucketSeqToScanRangeMap.put(planFragmentId,
bucketSeqToScanRange);
Deencapsulation.setField(bucketShuffleJoinController,
"fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
+
FragmentExecParams params = new FragmentExecParams(null);
Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params);
Assert.assertEquals(1, params.instanceExecParams.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]