DRILL-672: Queries against HBase table do not close after the data is returned.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8490d743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8490d743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8490d743

Branch: refs/heads/master
Commit: 8490d7433f9d9171971ae6e1af02cb67215cd8ce
Parents: d929faa
Author: Aditya Kishore <[email protected]>
Authored: Wed May 28 04:57:07 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Jun 2 09:15:29 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  | 215 ++++++++++---
 .../exec/store/hbase/HBaseRecordReader.java     |   4 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |  14 +-
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  17 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   8 +-
 .../hbase/TestHBaseRegionScanAssignments.java   | 299 +++++++++++++++++++
 6 files changed, 505 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 809aa86..f3ff64c 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -18,12 +18,18 @@
 package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -38,12 +44,12 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
@@ -51,16 +57,26 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements 
DrillHBaseConstants {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
 
+  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR 
= new Comparator<List<HBaseSubScanSpec>>() {
+    @Override
+    public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> 
list2) {
+      return list1.size() - list2.size();
+    }
+  };
+
+  private static final Comparator<List<HBaseSubScanSpec>> 
LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
   private HBaseStoragePluginConfig storagePluginConfig;
 
   private List<SchemaPath> columns;
@@ -70,9 +86,11 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
   private HBaseStoragePlugin storagePlugin;
 
   private Stopwatch watch = new Stopwatch();
-  private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings;
-  private List<EndpointAffinity> endpointAffinities;
-  private NavigableMap<HRegionInfo,ServerName> regionsToScan;
+
+  private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
+
+  private NavigableMap<HRegionInfo, ServerName> regionsToScan;
+
   private HTableDescriptor hTableDesc;
 
   private boolean filterPushedDown = false;
@@ -85,9 +103,9 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), 
hbaseScanSpec, columns);
   }
 
-  public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec 
scanSpec, List<SchemaPath> columns) {
-    this.storagePlugin = storageEngine;
-    this.storagePluginConfig = storageEngine.getConfig();
+  public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec 
scanSpec, List<SchemaPath> columns) {
+    this.storagePlugin = storagePlugin;
+    this.storagePluginConfig = storagePlugin.getConfig();
     this.hbaseScanSpec = scanSpec;
     this.columns = columns;
     init();
@@ -95,13 +113,12 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
 
   /**
    * Private constructor, used for cloning.
-   * @param that The
+   * @param that The HBaseGroupScan to clone
    */
   private HBaseGroupScan(HBaseGroupScan that) {
     this.columns = that.columns;
-    this.endpointAffinities = that.endpointAffinities;
     this.hbaseScanSpec = that.hbaseScanSpec;
-    this.mappings = that.mappings;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
     this.regionsToScan = that.regionsToScan;
     this.storagePlugin = that.storagePlugin;
     this.storagePluginConfig = that.storagePluginConfig;
@@ -165,8 +182,7 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
 
     Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new 
HashMap<DrillbitEndpoint, EndpointAffinity>();
     for (ServerName sn : regionsToScan.values()) {
-      String host = sn.getHostname();
-      DrillbitEndpoint ep = endpointMap.get(host);
+      DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
       if (ep != null) {
         EndpointAffinity affinity = affinityMap.get(ep);
         if (affinity == null) {
@@ -176,9 +192,8 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
         }
       }
     }
-    this.endpointAffinities = Lists.newArrayList(affinityMap.values());
-    logger.debug("Took {} ms to get operator affinity", 
watch.elapsed(TimeUnit.MILLISECONDS));
-    return this.endpointAffinities;
+    logger.debug("Took {} µs to get operator affinity", 
watch.elapsed(TimeUnit.NANOSECONDS)/1000);
+    return Lists.newArrayList(affinityMap.values());
   }
 
   /**
@@ -189,42 +204,135 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     watch.reset();
     watch.start();
-    Preconditions.checkArgument(incomingEndpoints.size() <= 
regionsToScan.size(),
-        String.format("Incoming endpoints %d is greater than number of row 
groups %d", incomingEndpoints.size(), regionsToScan.size()));
 
-    mappings = ArrayListMultimap.create();
-    ArrayListMultimap<String, Integer> incomingEndpointMap = 
ArrayListMultimap.create();
-    for (int i = 0; i < incomingEndpoints.size(); i++) {
-      incomingEndpointMap.put(incomingEndpoints.get(i).getAddress(), i);
+    final int numSlots = incomingEndpoints.size();
+    Preconditions.checkArgument(numSlots <= regionsToScan.size(),
+        String.format("Incoming endpoints %d is greater than number of scan 
regions %d", numSlots, regionsToScan.size()));
+
+    /*
+     * Minimum/maximum number of assignments per slot
+     */
+    final int minPerEndpointSlot = (int) 
Math.floor((double)regionsToScan.size() / numSlots);
+    final int maxPerEndpointSlot = (int) 
Math.ceil((double)regionsToScan.size() / numSlots);
+
+    /*
+     * initialize (endpoint index => HBaseSubScanSpec list) map
+     */
+    endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+
+    /*
+     * a second map from each endpoint hostname to the queue of its slot indexes in the 
'incomingEndpoints' list
+     */
+    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+    /*
+     * Initialize these two maps
+     */
+    for (int i = 0; i < numSlots; ++i) {
+      endpointFragmentMapping.put(i, new 
ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+      String hostname = incomingEndpoints.get(i).getAddress();
+      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+      if (hostIndexQueue == null) {
+        hostIndexQueue = Lists.newLinkedList();
+        endpointHostIndexListMap.put(hostname, hostIndexQueue);
+      }
+      hostIndexQueue.add(i);
     }
-    Map<String, Iterator<Integer>> mapIterator = new HashMap<String, 
Iterator<Integer>>();
-    for (String s : incomingEndpointMap.keySet()) {
-      Iterator<Integer> ints = Iterators.cycle(incomingEndpointMap.get(s));
-      mapIterator.put(s, ints);
+
+    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = 
Sets.newHashSet(regionsToScan.entrySet());
+
+    /*
+     * First, we assign regions which are hosted on region servers running on 
drillbit endpoints
+     */
+    for (Iterator<Entry<HRegionInfo, ServerName>> regionsIterator = 
regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
+      Entry<HRegionInfo, ServerName> regionEntry = regionsIterator.next();
+      /*
+       * Test if there is a drillbit endpoint which is also an HBase 
RegionServer that hosts the current HBase region
+       */
+      Queue<Integer> endpointIndexlist = 
endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+      if (endpointIndexlist != null) {
+        Integer slotIndex = endpointIndexlist.poll();
+        List<HBaseSubScanSpec> endpointSlotScanList = 
endpointFragmentMapping.get(slotIndex);
+        
endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+        // add to the tail of the slot list, to add more later in round robin 
fashion
+        endpointIndexlist.offer(slotIndex);
+        // this region has been assigned
+        regionsIterator.remove();
+      }
     }
-    Iterator<Integer> nullIterator = 
Iterators.cycle(incomingEndpointMap.values());
-    for (HRegionInfo regionInfo : regionsToScan.keySet()) {
-      logger.debug("creating read entry. start key: {} end key: {}", 
Bytes.toStringBinary(regionInfo.getStartKey()), 
Bytes.toStringBinary(regionInfo.getEndKey()));
-      HBaseSubScan.HBaseSubScanSpec p = new HBaseSubScan.HBaseSubScanSpec()
-          .setTableName(hbaseScanSpec.getTableName())
-          .setStartRow((hbaseScanSpec.getStartRow() != null && 
regionInfo.containsRow(hbaseScanSpec.getStartRow())) ? 
hbaseScanSpec.getStartRow() : regionInfo.getStartKey())
-          .setStopRow((hbaseScanSpec.getStopRow() != null && 
regionInfo.containsRow(hbaseScanSpec.getStopRow())) ? 
hbaseScanSpec.getStopRow() : regionInfo.getEndKey())
-          .setSerializedFilter(hbaseScanSpec.getSerializedFilter());
-      String host = regionsToScan.get(regionInfo).getHostname();
-      Iterator<Integer> indexIterator = mapIterator.get(host);
-      if (indexIterator == null) {
-        indexIterator = nullIterator;
+
+    /*
+     * Build two priority queues of slots: one for slots with fewer tasks than 
'minPerEndpointSlot' and another for slots with more.
+     */
+    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new 
PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new 
PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+    for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+      if (listOfScan.size() < minPerEndpointSlot) {
+        minHeap.offer(listOfScan);
+      } else if (listOfScan.size() > minPerEndpointSlot){
+        maxHeap.offer(listOfScan);
       }
-      mappings.put(indexIterator.next(), p);
     }
+
+    /*
+     * Now, let's process any regions which remain unassigned and assign them 
to slots with minimum number of assignments.
+     */
+    if (regionsToAssignSet.size() > 0) {
+      for (Entry<HRegionInfo, ServerName> regionEntry : regionsToAssignSet) {
+        List<HBaseSubScanSpec> smallestList = minHeap.poll();
+        smallestList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+        if (smallestList.size() < minPerEndpointSlot) {
+          minHeap.offer(smallestList);
+        }
+      }
+    }
+
+    /*
+     * While there are slots with fewer than 'minPerEndpointSlot' units of work, 
rebalance by moving work from those with more.
+     */
+    while(minHeap.peek() != null && minHeap.peek().size() < 
minPerEndpointSlot) {
+      List<HBaseSubScanSpec> smallestList = minHeap.poll();
+      List<HBaseSubScanSpec> largestList = maxHeap.poll();
+      smallestList.add(largestList.remove(largestList.size()-1));
+      if (largestList.size() > minPerEndpointSlot) {
+        maxHeap.offer(largestList);
+      }
+      if (smallestList.size() < minPerEndpointSlot) {
+        minHeap.offer(smallestList);
+      }
+    }
+
+    /* no slot should be empty at this point */
+    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : 
String.format(
+        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment 
Map: {}.",
+        incomingEndpoints, endpointFragmentMapping.toString());
+
+    logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment 
Map: {}",
+        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, 
endpointFragmentMapping.toString());
+  }
+
+  private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
+    HBaseScanSpec spec = hbaseScanSpec;
+    return new HBaseSubScanSpec()
+        .setTableName(spec.getTableName())
+        .setRegionServer(regionsToScan.get(ri).getHostname())
+        .setStartRow((!isNullOrEmpty(spec.getStartRow()) && 
ri.containsRow(spec.getStartRow())) ? spec.getStartRow() : ri.getStartKey())
+        .setStopRow((!isNullOrEmpty(spec.getStopRow()) && 
ri.containsRow(spec.getStopRow())) ? spec.getStopRow() : ri.getEndKey())
+        .setSerializedFilter(spec.getSerializedFilter());
+  }
+
+  private boolean isNullOrEmpty(byte[] key) {
+    return key == null || key.length == 0;
   }
 
   @Override
   public HBaseSubScan getSpecificScan(int minorFragmentId) {
-    return new HBaseSubScan(storagePlugin, storagePluginConfig, 
mappings.get(minorFragmentId), columns);
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] 
but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    return new HBaseSubScan(storagePlugin, storagePluginConfig, 
endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
-
   @Override
   public int getMaxParallelizationWidth() {
     return regionsToScan.size();
@@ -313,4 +421,27 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     return filterPushedDown;
   }
 
+  /**
+   * Empty constructor, do not use, only for testing.
+   */
+  @VisibleForTesting
+  public HBaseGroupScan() { }
+
+  /**
+   * Do not use, only for testing.
+   */
+  @VisibleForTesting
+  public void setHBaseScanSpec(HBaseScanSpec hbaseScanSpec) {
+    this.hbaseScanSpec = hbaseScanSpec;
+  }
+
+  /**
+   * Do not use, only for testing.
+   */
+  @JsonIgnore
+  @VisibleForTesting
+  public void setRegionsToScan(NavigableMap<HRegionInfo, ServerName> 
regionsToScan) {
+    this.regionsToScan = regionsToScan;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index caee8ed..204d486 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -30,8 +30,6 @@ import 
org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -58,7 +56,7 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
-  
+
   private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index d9f2b7c..fafe9c9 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -125,12 +125,14 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
   public static class HBaseSubScanSpec {
 
     protected String tableName;
+    protected String regionServer;
     protected byte[] startRow;
     protected byte[] stopRow;
     protected byte[] serializedFilter;
 
     @parquet.org.codehaus.jackson.annotate.JsonCreator
     public HBaseSubScanSpec(@JsonProperty("tableName") String tableName,
+                            @JsonProperty("regionServer") String regionServer,
                             @JsonProperty("startRow") byte[] startRow,
                             @JsonProperty("stopRow") byte[] stopRow,
                             @JsonProperty("serializedFilter") byte[] 
serializedFilter,
@@ -139,6 +141,7 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
         throw new IllegalArgumentException("The parameters 'serializedFilter' 
or 'filterString' cannot be specified at the same time.");
       }
       this.tableName = tableName;
+      this.regionServer = regionServer;
       this.startRow = startRow;
       this.stopRow = stopRow;
       if (serializedFilter != null) {
@@ -170,6 +173,15 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
       return this;
     }
 
+    public String getRegionServer() {
+      return regionServer;
+    }
+
+    public HBaseSubScanSpec setRegionServer(String regionServer) {
+      this.regionServer = regionServer;
+      return this;
+    }
+
     public byte[] getStartRow() {
       return startRow;
     }
@@ -204,7 +216,7 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
           + ", startRow=" + (startRow == null ? null : 
Bytes.toStringBinary(startRow))
           + ", stopRow=" + (stopRow == null ? null : 
Bytes.toStringBinary(stopRow))
           + ", filter=" + (getScanFilter() == null ? null : 
getScanFilter().toString())
-          + "]";
+          + ", regionServer=" + regionServer + "]";
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 96f0c4a..dbeced3 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.store.hbase.HBaseStoragePlugin;
+import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig;
 import org.apache.drill.exec.util.VectorUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,6 +44,10 @@ public class BaseHBaseTest extends BaseTestQuery {
 
   protected static Configuration conf = HBaseConfiguration.create();
 
+  protected static HBaseStoragePlugin storagePlugin;
+
+  protected static HBaseStoragePluginConfig storagePluginConfig;
+
   @Rule public TestName TEST_NAME = new TestName();
 
   private int[] columnWidths = new int[] { 8 };
@@ -58,11 +63,13 @@ public class BaseHBaseTest extends BaseTestQuery {
      * Change the following to HBaseTestsSuite.configure(false, true)
      * if you want to test against an externally running HBase cluster.
      */
-    HBaseTestsSuite.configure(false, true);
-
+    HBaseTestsSuite.configure(true, true);
     HBaseTestsSuite.initCluster();
-    HBaseStoragePlugin plugin = (HBaseStoragePlugin) 
bit.getContext().getStorage().getPlugin("hbase");
-    plugin.getConfig().setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
+
+    storagePlugin = (HBaseStoragePlugin) 
bit.getContext().getStorage().getPlugin("hbase");
+    storagePluginConfig = storagePlugin.getConfig();
+
+    storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
   }
 
   @AfterClass
@@ -77,7 +84,7 @@ public class BaseHBaseTest extends BaseTestQuery {
   protected void setColumnWidths(int[] columnWidths) {
     this.columnWidths = columnWidths;
   }
-  
+
   protected String getPlanText(String planFile, String tableName) throws 
IOException {
     return Files.toString(FileUtils.getResourceAsFile(planFile), 
Charsets.UTF_8)
         
.replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", 
"\"hbase.zookeeper.property.clientPort\" : " + 
HBaseTestsSuite.getZookeeperPort())

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index e30f79e..67e6f87 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -36,7 +36,9 @@ import org.junit.runners.Suite.SuiteClasses;
 @SuiteClasses({
   HBaseRecordReaderTest.class,
   TestHBaseFilterPushDown.class,
-  TestHBaseProjectPushDown.class})
+  TestHBaseProjectPushDown.class,
+  TestHBaseRegionScanAssignments.class
+})
 public class HBaseTestsSuite {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
 
@@ -147,4 +149,8 @@ public class HBaseTestsSuite {
     HBaseTestsSuite.createTables = createTables;
   }
 
+  public static HBaseAdmin getAdmin() {
+    return admin;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
new file mode 100644
index 0000000..71cb604
--- /dev/null
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.hbase.HBaseGroupScan;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestHBaseRegionScanAssignments extends BaseHBaseTest {
+  static final String HOST_A = "A";
+  static final String HOST_B = "B";
+  static final String HOST_C = "C";
+  static final String HOST_D = "D";
+  static final String HOST_E = "E";
+  static final String HOST_F = "F";
+  static final String HOST_G = "G";
+  static final String HOST_H = "H";
+  static final String HOST_I = "I";
+  static final String HOST_J = "J";
+  static final String HOST_K = "K";
+  static final String HOST_L = "L";
+  static final String HOST_M = "M";
+
+  static final String HOST_X = "X";
+
+  static final String PORT_AND_STARTTIME = ",60020,1400265190186";
+
+  static final ServerName SERVER_A = new ServerName(HOST_A + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_B = new ServerName(HOST_B + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_C = new ServerName(HOST_C + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_D = new ServerName(HOST_D + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_E = new ServerName(HOST_E + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_F = new ServerName(HOST_F + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_G = new ServerName(HOST_G + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_H = new ServerName(HOST_H + 
PORT_AND_STARTTIME);
+  static final ServerName SERVER_I = new ServerName(HOST_I + 
PORT_AND_STARTTIME);
+
+  static final ServerName SERVER_X = new ServerName(HOST_X + 
PORT_AND_STARTTIME);
+
+  static final byte[][] splits = {{},
+    "10".getBytes(), "15".getBytes(), "20".getBytes(), "25".getBytes(), 
"30".getBytes(), "35".getBytes(),
+    "40".getBytes(), "45".getBytes(), "50".getBytes(), "55".getBytes(), 
"60".getBytes(), "65".getBytes(),
+    "70".getBytes(), "75".getBytes(), "80".getBytes(), "85".getBytes(), 
"90".getBytes(), "95".getBytes()};
+
+  static final String TABLE_NAME = "TestTable";
+  static final byte[] TABLE_NAME_BYTES = TABLE_NAME.getBytes();
+
+  /**
+   * Has the same name as the {@link BeforeClass} method of the parent so that
+   * we do not start MiniHBase cluster as it is not required for these tests.
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // do nothing
+  }
+
+  @Test
+  public void testHBaseGroupScanAssignmentMix() throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), 
SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), 
SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), 
SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), 
SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), 
SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), 
SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), 
SERVER_C);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), 
SERVER_D);
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    final DrillbitEndpoint DB_A = 
DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build();
+    endpoints.add(DB_A);
+    endpoints.add(DB_A);
+    final DrillbitEndpoint DB_B = 
DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build();
+    endpoints.add(DB_B);
+    final DrillbitEndpoint DB_D = 
DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build();
+    endpoints.add(DB_D);
+    final DrillbitEndpoint DB_X = 
DrillbitEndpoint.newBuilder().setAddress(HOST_X).setControlPort(1234).build();
+    endpoints.add(DB_X);
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], 
null));
+    scan.applyAssignments(endpoints);
+
+    int i = 0;
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'A'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'A'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'B'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'D'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); 
// 'X'
+    testParallelizationWidth(scan, i);
+  }
+
+  /**
+   * Eighteen regions (two per server for B..H, four for A) are assigned across thirteen
+   * endpoints on HOST_A..HOST_M. The test expects eight fragments to receive one region and
+   * five fragments to receive two, without asserting which fragment gets which count.
+   */
+  @Test
+  public void testHBaseGroupScanAssignmentSomeAfinedWithOrphans() throws Exception {
+    // owners[k] hosts the region spanning splits[k] .. splits[k + 1]; the last region
+    // wraps around and ends at splits[0].
+    final ServerName[] owners = {
+        SERVER_A, SERVER_A, SERVER_B, SERVER_B, SERVER_C, SERVER_C,
+        SERVER_D, SERVER_D, SERVER_E, SERVER_E, SERVER_F, SERVER_F,
+        SERVER_G, SERVER_G, SERVER_H, SERVER_H, SERVER_A, SERVER_A
+    };
+    NavigableMap<HRegionInfo,ServerName> regions = Maps.newTreeMap();
+    for (int k = 0; k < owners.length; k++) {
+      regions.put(
+          new HRegionInfo(TABLE_NAME_BYTES, splits[k], splits[(k + 1) % owners.length]),
+          owners[k]);
+    }
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList(
+        DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_I).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_J).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_K).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_L).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_M).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regions);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    // Expected per-fragment region counts, treated as a multiset: each observed count
+    // must consume exactly one matching entry.
+    LinkedList<Integer> expectedSizes = Lists.newLinkedList();
+    for (int n = 0; n < 8; n++) {
+      expectedSizes.add(1);
+    }
+    for (int n = 0; n < 5; n++) {
+      expectedSizes.add(2);
+    }
+
+    final int width = endpoints.size();
+    for (int fragment = 0; fragment < width; fragment++) {
+      int actual = scan.getSpecificScan(fragment).getRegionScanSpecList().size();
+      // Integer.valueOf selects remove(Object), not remove(int index).
+      assertTrue(expectedSizes.remove(Integer.valueOf(actual)));
+    }
+    assertEquals(0, expectedSizes.size());
+    testParallelizationWidth(scan, width);
+  }
+
+  /**
+   * Nine regions, all hosted by SERVER_A, are assigned across nine endpoints (two on HOST_A
+   * and one each on HOST_B..HOST_H). Every fragment is expected to receive exactly one region.
+   */
+  @Test
+  public void testHBaseGroupScanAssignmentOneEach() throws Exception {
+    final int regionCount = 9;
+    NavigableMap<HRegionInfo,ServerName> regions = Maps.newTreeMap();
+    for (int k = 0; k < regionCount; k++) {
+      // Region k spans splits[k] .. splits[k + 1]; the last one wraps around to splits[0].
+      regions.put(
+          new HRegionInfo(TABLE_NAME_BYTES, splits[k], splits[(k + 1) % regionCount]),
+          SERVER_A);
+    }
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList(
+        DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regions);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    // Fragments follow endpoint order: A, A, B, C, D, E, F, G, H.
+    int fragment = 0;
+    while (fragment < regionCount) {
+      assertEquals(1, scan.getSpecificScan(fragment++).getRegionScanSpecList().size());
+    }
+    testParallelizationWidth(scan, fragment);
+  }
+
+  /**
+   * Eight regions, all hosted by SERVER_X, are assigned across eight endpoints on
+   * HOST_A..HOST_H (no endpoint is on HOST_X). Every fragment is expected to receive
+   * exactly one region.
+   */
+  @Test
+  public void testHBaseGroupScanAssignmentNoAfinity() throws Exception {
+    final int regionCount = 8;
+    NavigableMap<HRegionInfo,ServerName> regions = Maps.newTreeMap();
+    for (int k = 0; k < regionCount; k++) {
+      // Region k spans splits[k] .. splits[k + 1]; the last one wraps around to splits[0].
+      regions.put(
+          new HRegionInfo(TABLE_NAME_BYTES, splits[k], splits[(k + 1) % regionCount]),
+          SERVER_X);
+    }
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList(
+        DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regions);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    // Fragments follow endpoint order: A, B, C, D, E, F, G, H.
+    int fragment = 0;
+    while (fragment < regionCount) {
+      assertEquals(1, scan.getSpecificScan(fragment++).getRegionScanSpecList().size());
+    }
+    testParallelizationWidth(scan, fragment);
+  }
+
+  /**
+   * Eight regions, two on each of SERVER_A..SERVER_D, are assigned across four endpoints
+   * on HOST_A..HOST_D. Every fragment is expected to receive exactly two regions.
+   */
+  @Test
+  public void testHBaseGroupScanAssignmentAllPreferred() throws Exception {
+    // owners[k] hosts the region spanning splits[k] .. splits[k + 1]; the last region
+    // wraps around and ends at splits[0].
+    final ServerName[] owners = {
+        SERVER_A, SERVER_A, SERVER_B, SERVER_B,
+        SERVER_C, SERVER_C, SERVER_D, SERVER_D
+    };
+    NavigableMap<HRegionInfo,ServerName> regions = Maps.newTreeMap();
+    for (int k = 0; k < owners.length; k++) {
+      regions.put(
+          new HRegionInfo(TABLE_NAME_BYTES, splits[k], splits[(k + 1) % owners.length]),
+          owners[k]);
+    }
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList(
+        DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build(),
+        DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regions);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    // Fragments follow endpoint order: A, B, C, D.
+    int fragment = 0;
+    while (fragment < 4) {
+      assertEquals(2, scan.getSpecificScan(fragment++).getRegionScanSpecList().size());
+    }
+    testParallelizationWidth(scan, fragment);
+  }
+
+  /**
+   * Checks that {@code scan} exposes no assignment at index {@code i}, i.e. that
+   * {@code scan.getSpecificScan(i)} raises an {@link AssertionError}.
+   *
+   * NOTE(review): the failure message implies the rejection comes from a Java
+   * {@code assert} inside {@code getSpecificScan}, so the JVM must run with {@code -ea}.
+   *
+   * @param scan the group scan whose assignments have already been applied
+   * @param i    the first index that is expected to be out of range
+   */
+  private void testParallelizationWidth(HBaseGroupScan scan, int i) {
+    boolean rejected = false;
+    try {
+      scan.getSpecificScan(i);
+    } catch (AssertionError e) {
+      // Expected path: the out-of-range index was rejected.
+      rejected = true;
+    }
+    // fail() reports by throwing AssertionError itself, so it has to be raised outside
+    // the try block; otherwise the catch above would swallow the failure.
+    if (!rejected) {
+      fail("Should not have " + i + "th assignment or you have not enabled Java assertion.");
+    }
+  }
+}

Reply via email to