This is an automated email from the ASF dual-hosted git repository. dzamo pushed a commit to branch 1.21 in repository https://gitbox.apache.org/repos/asf/drill.git
commit e21f10a4c853f3136ba154c2389c53b71932a48e Author: shfshihuafeng <shfshihuaf...@163.com> AuthorDate: Sun Mar 3 08:30:50 2024 +0800 DRILL-8482:Assign region throw exception when some region is deployed on affinity node and some on non-affinity node (#2885) --- .../drill/exec/store/hbase/HBaseGroupScan.java | 2 +- .../hbase/TestHBaseRegionScanAssignments.java | 28 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index cef696ed03..34897ec306 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -267,7 +267,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR); PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV); for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) { - if (listOfScan.size() < minPerEndpointSlot) { + if (listOfScan.size() < maxPerEndpointSlot) { minHeap.offer(listOfScan); } else if (listOfScan.size() > minPerEndpointSlot) { maxHeap.offer(listOfScan); diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java index af6790994f..d0ed91ca19 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java @@ -188,6 +188,34 @@ public class TestHBaseRegionScanAssignments extends BaseHBaseTest { testParallelizationWidth(scan, endpoints.size()); } + @Test + public void testHBaseGroupScanAssignmentSomeAfinedAndSomeWithOrphans() throws Exception { + NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[0], splits[1]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[1], splits[2]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[2], splits[3]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[3], splits[4]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[6], splits[7]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[7], splits[8]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[8], splits[9]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[9], splits[10]), SERVER_D); + final List<DrillbitEndpoint> endpoints = Lists.newArrayList(); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build()); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME_STR, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + int i = 0; + assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B' + assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C' + testParallelizationWidth(scan, i); + } + @Test public void testHBaseGroupScanAssignmentOneEach() throws Exception { NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();