This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e21f10a4c853f3136ba154c2389c53b71932a48e
Author: shfshihuafeng <shfshihuaf...@163.com>
AuthorDate: Sun Mar 3 08:30:50 2024 +0800

    DRILL-8482: Assigning regions throws an exception when some regions are deployed on 
affinity nodes and some on non-affinity nodes (#2885)
---
 .../drill/exec/store/hbase/HBaseGroupScan.java     |  2 +-
 .../hbase/TestHBaseRegionScanAssignments.java      | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index cef696ed03..34897ec306 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -267,7 +267,7 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     PriorityQueue<List<HBaseSubScanSpec>> minHeap = new 
PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR);
     PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new 
PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV);
     for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
-      if (listOfScan.size() < minPerEndpointSlot) {
+      if (listOfScan.size() < maxPerEndpointSlot) {
         minHeap.offer(listOfScan);
       } else if (listOfScan.size() > minPerEndpointSlot) {
         maxHeap.offer(listOfScan);
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
index af6790994f..d0ed91ca19 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
@@ -188,6 +188,34 @@ public class TestHBaseRegionScanAssignments extends 
BaseHBaseTest {
     testParallelizationWidth(scan, endpoints.size());
   }
 
+  @Test
+  public void testHBaseGroupScanAssignmentSomeAfinedAndSomeWithOrphans() 
throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[0], splits[1]), 
SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[1], splits[2]), 
SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[2], splits[3]), 
SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[3], splits[4]), 
SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[6], splits[7]), 
SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[7], splits[8]), 
SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[8], splits[9]), 
SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[9], splits[10]), 
SERVER_D);
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+    
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+    
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME_STR, splits[0], 
splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    int i = 0;
+    assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'A'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'B'
+    assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'C'
+    testParallelizationWidth(scan, i);
+  }
+
   @Test
   public void testHBaseGroupScanAssignmentOneEach() throws Exception {
     NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();

Reply via email to