DRILL-672: Queries against an HBase table do not close after the data is returned.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8490d743 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8490d743 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8490d743 Branch: refs/heads/master Commit: 8490d7433f9d9171971ae6e1af02cb67215cd8ce Parents: d929faa Author: Aditya Kishore <[email protected]> Authored: Wed May 28 04:57:07 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 2 09:15:29 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/hbase/HBaseGroupScan.java | 215 ++++++++++--- .../exec/store/hbase/HBaseRecordReader.java | 4 +- .../drill/exec/store/hbase/HBaseSubScan.java | 14 +- .../org/apache/drill/hbase/BaseHBaseTest.java | 17 +- .../org/apache/drill/hbase/HBaseTestsSuite.java | 8 +- .../hbase/TestHBaseRegionScanAssignments.java | 299 +++++++++++++++++++ 6 files changed, 505 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 809aa86..f3ff64c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -18,12 +18,18 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -38,12 +44,12 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.util.Bytes; import parquet.org.codehaus.jackson.annotate.JsonCreator; @@ -51,16 +57,26 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; @JsonTypeName("hbase-scan") public class HBaseGroupScan 
extends AbstractGroupScan implements DrillHBaseConstants { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class); + private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() { + @Override + public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) { + return list1.size() - list2.size(); + } + }; + + private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR); + private HBaseStoragePluginConfig storagePluginConfig; private List<SchemaPath> columns; @@ -70,9 +86,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst private HBaseStoragePlugin storagePlugin; private Stopwatch watch = new Stopwatch(); - private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings; - private List<EndpointAffinity> endpointAffinities; - private NavigableMap<HRegionInfo,ServerName> regionsToScan; + + private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping; + + private NavigableMap<HRegionInfo, ServerName> regionsToScan; + private HTableDescriptor hTableDesc; private boolean filterPushedDown = false; @@ -85,9 +103,9 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns); } - public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec scanSpec, List<SchemaPath> columns) { - this.storagePlugin = storageEngine; - this.storagePluginConfig = storageEngine.getConfig(); + public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) { + this.storagePlugin = storagePlugin; + this.storagePluginConfig = storagePlugin.getConfig(); this.hbaseScanSpec = scanSpec; this.columns = columns; init(); @@ -95,13 +113,12 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst /** * Private constructor, used for cloning. 
- * @param that The + * @param that The HBaseGroupScan to clone */ private HBaseGroupScan(HBaseGroupScan that) { this.columns = that.columns; - this.endpointAffinities = that.endpointAffinities; this.hbaseScanSpec = that.hbaseScanSpec; - this.mappings = that.mappings; + this.endpointFragmentMapping = that.endpointFragmentMapping; this.regionsToScan = that.regionsToScan; this.storagePlugin = that.storagePlugin; this.storagePluginConfig = that.storagePluginConfig; @@ -165,8 +182,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>(); for (ServerName sn : regionsToScan.values()) { - String host = sn.getHostname(); - DrillbitEndpoint ep = endpointMap.get(host); + DrillbitEndpoint ep = endpointMap.get(sn.getHostname()); if (ep != null) { EndpointAffinity affinity = affinityMap.get(ep); if (affinity == null) { @@ -176,9 +192,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst } } } - this.endpointAffinities = Lists.newArrayList(affinityMap.values()); - logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS)); - return this.endpointAffinities; + logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000); + return Lists.newArrayList(affinityMap.values()); } /** @@ -189,42 +204,135 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { watch.reset(); watch.start(); - Preconditions.checkArgument(incomingEndpoints.size() <= regionsToScan.size(), - String.format("Incoming endpoints %d is greater than number of row groups %d", incomingEndpoints.size(), regionsToScan.size())); - mappings = ArrayListMultimap.create(); - ArrayListMultimap<String, Integer> incomingEndpointMap = ArrayListMultimap.create(); - for (int i = 0; i < incomingEndpoints.size(); i++) { - incomingEndpointMap.put(incomingEndpoints.get(i).getAddress(), i); + final int numSlots = incomingEndpoints.size(); + Preconditions.checkArgument(numSlots <= regionsToScan.size(), + String.format("Number of incoming endpoints %d is greater than the number of scan regions %d", numSlots, regionsToScan.size())); + + /* + * Minimum/maximum number of assignments per slot + */ + final int minPerEndpointSlot = (int) Math.floor((double) regionsToScan.size() / numSlots); + final int maxPerEndpointSlot = (int) Math.ceil((double) regionsToScan.size() / numSlots); + + /* + * Initialize the (endpoint index => HBaseSubScanSpec list) map + */ + endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots); + + /* + * Another map, from endpoint hostname to the list of corresponding indexes in the 'incomingEndpoints' list + */ + Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap(); + + /* + * Initialize these two maps + */ + for (int i = 0; i < numSlots; ++i) { + endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot)); + String hostname = incomingEndpoints.get(i).getAddress(); + Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname); + if (hostIndexQueue == null) { + hostIndexQueue = Lists.newLinkedList(); + endpointHostIndexListMap.put(hostname, hostIndexQueue); + } + hostIndexQueue.add(i); } - Map<String, Iterator<Integer>> mapIterator = new HashMap<String, Iterator<Integer>>(); - for (String s : incomingEndpointMap.keySet()) { - Iterator<Integer> ints =
Iterators.cycle(incomingEndpointMap.get(s)); - mapIterator.put(s, ints); + + Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet()); + + /* + * First, we assign regions hosted on region servers that are also running drillbit endpoints + */ + for (Iterator<Entry<HRegionInfo, ServerName>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) { + Entry<HRegionInfo, ServerName> regionEntry = regionsIterator.next(); + /* + * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region + */ + Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname()); + if (endpointIndexlist != null) { + Integer slotIndex = endpointIndexlist.poll(); + List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex); + endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey())); + // add to the tail of the slot list, to receive more later in round-robin fashion + endpointIndexlist.offer(slotIndex); + // this region has been assigned + regionsIterator.remove(); + } } - Iterator<Integer> nullIterator = Iterators.cycle(incomingEndpointMap.values()); - for (HRegionInfo regionInfo : regionsToScan.keySet()) { - logger.debug("creating read entry. start key: {} end key: {}", Bytes.toStringBinary(regionInfo.getStartKey()), Bytes.toStringBinary(regionInfo.getEndKey())); - HBaseSubScan.HBaseSubScanSpec p = new HBaseSubScan.HBaseSubScanSpec() - .setTableName(hbaseScanSpec.getTableName()) - .setStartRow((hbaseScanSpec.getStartRow() != null && regionInfo.containsRow(hbaseScanSpec.getStartRow())) ? hbaseScanSpec.getStartRow() : regionInfo.getStartKey()) - .setStopRow((hbaseScanSpec.getStopRow() != null && regionInfo.containsRow(hbaseScanSpec.getStopRow())) ? hbaseScanSpec.getStopRow() : regionInfo.getEndKey()) - .setSerializedFilter(hbaseScanSpec.getSerializedFilter()); - String host = regionsToScan.get(regionInfo).getHostname(); - Iterator<Integer> indexIterator = mapIterator.get(host); - if (indexIterator == null) { - indexIterator = nullIterator; + + /* + * Build two priority queues of slots: one with slots that have fewer tasks than 'minPerEndpointSlot' and another with slots that have more. + */ + PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR); + PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV); + for (List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) { + if (listOfScan.size() < minPerEndpointSlot) { + minHeap.offer(listOfScan); + } else if (listOfScan.size() > minPerEndpointSlot) { + maxHeap.offer(listOfScan); } - mappings.put(indexIterator.next(), p); } + + /* + * Now process any regions which remain unassigned, handing each to the slot with the minimum number of assignments. + */ + if (regionsToAssignSet.size() > 0) { + for (Entry<HRegionInfo, ServerName> regionEntry : regionsToAssignSet) { + List<HBaseSubScanSpec> smallestList = minHeap.poll(); + smallestList.add(regionInfoToSubScanSpec(regionEntry.getKey())); + if (smallestList.size() < minPerEndpointSlot) { + minHeap.offer(smallestList); + } + } + } + + /* + * While there are slots with fewer than 'minPerEndpointSlot' units of work, rebalance from those with more.
+ */ + while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) { + List<HBaseSubScanSpec> smallestList = minHeap.poll(); + List<HBaseSubScanSpec> largestList = maxHeap.poll(); + smallestList.add(largestList.remove(largestList.size() - 1)); + if (largestList.size() > minPerEndpointSlot) { + maxHeap.offer(largestList); + } + if (smallestList.size() < minPerEndpointSlot) { + minHeap.offer(smallestList); + } + } + + /* no slot should be empty at this point */ + assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format( + "Unable to assign tasks to some endpoints.\nEndpoints: %s.\nAssignment Map: %s.", + incomingEndpoints, endpointFragmentMapping.toString()); + + logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}", + watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString()); + } + + private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) { + HBaseScanSpec spec = hbaseScanSpec; + return new HBaseSubScanSpec() + .setTableName(spec.getTableName()) + .setRegionServer(regionsToScan.get(ri).getHostname()) + .setStartRow((!isNullOrEmpty(spec.getStartRow()) && ri.containsRow(spec.getStartRow())) ? spec.getStartRow() : ri.getStartKey()) + .setStopRow((!isNullOrEmpty(spec.getStopRow()) && ri.containsRow(spec.getStopRow())) ? spec.getStopRow() : ri.getEndKey()) + .setSerializedFilter(spec.getSerializedFilter()); + } + + private boolean isNullOrEmpty(byte[] key) { + return key == null || key.length == 0; } @Override public HBaseSubScan getSpecificScan(int minorFragmentId) { - return new HBaseSubScan(storagePlugin, storagePluginConfig, mappings.get(minorFragmentId), columns); + assert minorFragmentId < endpointFragmentMapping.size() : String.format( + "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), + minorFragmentId); + return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns); } - @Override public int getMaxParallelizationWidth() { return regionsToScan.size(); @@ -313,4 +421,27 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst return filterPushedDown; } + /** + * Empty constructor, do not use, only for testing. + */ + @VisibleForTesting + public HBaseGroupScan() { } + + /** + * Do not use, only for testing.
+ */ + @JsonIgnore + @VisibleForTesting + public void setRegionsToScan(NavigableMap<HRegionInfo, ServerName> regionsToScan) { + this.regionsToScan = regionsToScan; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index caee8ed..204d486 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -30,8 +30,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.PathSegment.NameSegment; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -58,7 +56,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class); private static final int TARGET_RECORD_COUNT = 4000; - + private LinkedHashSet<SchemaPath> columns; private OutputMutator outputMutator; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index d9f2b7c..fafe9c9 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -125,12 +125,14 @@ public class HBaseSubScan extends AbstractBase implements SubScan { public static class HBaseSubScanSpec { protected String tableName; + protected String regionServer; protected byte[] startRow; protected byte[] stopRow; protected byte[] serializedFilter; @parquet.org.codehaus.jackson.annotate.JsonCreator public HBaseSubScanSpec(@JsonProperty("tableName") String tableName, + @JsonProperty("regionServer") String regionServer, @JsonProperty("startRow") byte[] startRow, @JsonProperty("stopRow") byte[] stopRow, @JsonProperty("serializedFilter") byte[] serializedFilter, @@ -139,6 +141,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan { throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time."); } this.tableName = tableName; + this.regionServer = regionServer; this.startRow = startRow; this.stopRow = stopRow; if (serializedFilter != null) { @@ -170,6 +173,15 @@ public class HBaseSubScan extends AbstractBase implements SubScan { return this; } + public String getRegionServer() { + return regionServer; + } + + public HBaseSubScanSpec setRegionServer(String regionServer) { + this.regionServer = 
regionServer; + return this; + } + public byte[] getStartRow() { return startRow; } @@ -204,7 +216,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan { + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow)) + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow)) + ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString()) - + "]"; + + ", regionServer=" + regionServer + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index 96f0c4a..dbeced3 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.store.hbase.HBaseStoragePlugin; +import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig; import org.apache.drill.exec.util.VectorUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -43,6 +44,10 @@ public class BaseHBaseTest extends BaseTestQuery { protected static Configuration conf = HBaseConfiguration.create(); + protected static HBaseStoragePlugin storagePlugin; + + protected static HBaseStoragePluginConfig storagePluginConfig; + @Rule public TestName TEST_NAME = new TestName(); private int[] columnWidths = new int[] { 8 }; @@ -58,11 +63,13 @@ public class BaseHBaseTest extends BaseTestQuery { * Change the following to HBaseTestsSuite.configure(false, true) * if you want to test against an externally running HBase cluster. 
*/ - HBaseTestsSuite.configure(false, true); - + HBaseTestsSuite.configure(true, true); HBaseTestsSuite.initCluster(); - HBaseStoragePlugin plugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase"); - plugin.getConfig().setZookeeperPort(HBaseTestsSuite.getZookeeperPort()); + + storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase"); + storagePluginConfig = storagePlugin.getConfig(); + + storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort()); } @AfterClass @@ -77,7 +84,7 @@ public class BaseHBaseTest extends BaseTestQuery { protected void setColumnWidths(int[] columnWidths) { this.columnWidths = columnWidths; } - + protected String getPlanText(String planFile, String tableName) throws IOException { return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8) .replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort()) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index e30f79e..67e6f87 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -36,7 +36,9 @@ import org.junit.runners.Suite.SuiteClasses; @SuiteClasses({ HBaseRecordReaderTest.class, TestHBaseFilterPushDown.class, - TestHBaseProjectPushDown.class}) + TestHBaseProjectPushDown.class, + TestHBaseRegionScanAssignments.class +}) public class HBaseTestsSuite { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class); @@ -147,4 +149,8 @@ public class HBaseTestsSuite { HBaseTestsSuite.createTables = createTables; } + public static HBaseAdmin getAdmin() { + return admin; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java new file mode 100644 index 0000000..71cb604 --- /dev/null +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.LinkedList; +import java.util.List; +import java.util.NavigableMap; + +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.hbase.HBaseGroupScan; +import org.apache.drill.exec.store.hbase.HBaseScanSpec; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class TestHBaseRegionScanAssignments extends BaseHBaseTest { + static final String HOST_A = "A"; + static final String HOST_B = "B"; + static final String HOST_C = "C"; + static final String HOST_D = "D"; + static final String HOST_E = "E"; + static final String HOST_F = "F"; + static final String HOST_G = "G"; + static final String HOST_H = "H"; + static final String HOST_I = "I"; + static final String HOST_J = "J"; + static final String HOST_K = "K"; + static final String HOST_L = "L"; + static final String HOST_M = "M"; + + static final String HOST_X = "X"; + + static final String PORT_AND_STARTTIME = ",60020,1400265190186"; + + static final ServerName SERVER_A = new ServerName(HOST_A + PORT_AND_STARTTIME); + static final ServerName SERVER_B = new ServerName(HOST_B + PORT_AND_STARTTIME); + static final ServerName SERVER_C = new ServerName(HOST_C + PORT_AND_STARTTIME); + static final ServerName SERVER_D = new ServerName(HOST_D + PORT_AND_STARTTIME); + static final ServerName SERVER_E = new ServerName(HOST_E + PORT_AND_STARTTIME); + static final ServerName SERVER_F = new ServerName(HOST_F + PORT_AND_STARTTIME); + static final ServerName SERVER_G = new ServerName(HOST_G + PORT_AND_STARTTIME); + static final ServerName SERVER_H = new ServerName(HOST_H + PORT_AND_STARTTIME); + static final ServerName SERVER_I = new ServerName(HOST_I + PORT_AND_STARTTIME); + + static final ServerName SERVER_X = new ServerName(HOST_X + PORT_AND_STARTTIME); + + static final byte[][] splits = {{}, + "10".getBytes(), "15".getBytes(), "20".getBytes(), "25".getBytes(), "30".getBytes(), "35".getBytes(), + "40".getBytes(), "45".getBytes(), "50".getBytes(), "55".getBytes(), "60".getBytes(), "65".getBytes(), + "70".getBytes(), "75".getBytes(), "80".getBytes(), "85".getBytes(), "90".getBytes(), "95".getBytes()}; + + static final String TABLE_NAME = "TestTable"; + static final byte[] TABLE_NAME_BYTES = TABLE_NAME.getBytes(); + + /** + * Has the same name as the {@link BeforeClass} method of the parent class so that + * we do not start the MiniHBase cluster, as it is not required for these tests.
+ * @throws Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // do nothing + } + + @Test + public void testHBaseGroupScanAssignmentMix() throws Exception { + NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_C); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_D); + + final List<DrillbitEndpoint> endpoints = Lists.newArrayList(); + final DrillbitEndpoint DB_A = DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(); + endpoints.add(DB_A); + endpoints.add(DB_A); + final DrillbitEndpoint DB_B = DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build(); + endpoints.add(DB_B); + final DrillbitEndpoint DB_D = DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build(); + endpoints.add(DB_D); + final DrillbitEndpoint DB_X = DrillbitEndpoint.newBuilder().setAddress(HOST_X).setControlPort(1234).build(); + endpoints.add(DB_X); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + int i = 0; + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'X' + testParallelizationWidth(scan, i); + } + + @Test + public void testHBaseGroupScanAssignmentSomeAfinedWithOrphans() throws Exception { + NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_C); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_C); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[8]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[8], splits[9]), SERVER_E); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[9], splits[10]), SERVER_E); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[10], splits[11]), SERVER_F); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[11], splits[12]), SERVER_F); + regionsToScan.put(new 
HRegionInfo(TABLE_NAME_BYTES, splits[12], splits[13]), SERVER_G); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[13], splits[14]), SERVER_G); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[14], splits[15]), SERVER_H); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[15], splits[16]), SERVER_H); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[16], splits[17]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[17], splits[0]), SERVER_A); + + final List<DrillbitEndpoint> endpoints = Lists.newArrayList(); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_I).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_J).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_K).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_L).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_M).setControlPort(1234).build()); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + LinkedList<Integer> sizes = Lists.newLinkedList(); + sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); + sizes.add(2); sizes.add(2); sizes.add(2); sizes.add(2); sizes.add(2); + for (int i = 0; i < endpoints.size(); i++) { + assertTrue(sizes.remove((Integer)scan.getSpecificScan(i).getRegionScanSpecList().size())); + } + assertEquals(0, sizes.size()); + testParallelizationWidth(scan, endpoints.size()); + } + + @Test + public void testHBaseGroupScanAssignmentOneEach() throws Exception { + NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[8]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[8], splits[0]), SERVER_A); + + final List<DrillbitEndpoint> endpoints = Lists.newArrayList(); + 
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build()); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + int i = 0; + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'E' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'F' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'G' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'H' + testParallelizationWidth(scan, i); + } + + @Test + public void testHBaseGroupScanAssignmentNoAfinity() throws Exception { + NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_X); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_X); + + final List<DrillbitEndpoint> endpoints = Lists.newArrayList(); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build()); + 
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build()); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + int i = 0; + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'E' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'F' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'G' + assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'H' + testParallelizationWidth(scan, i); + } + + @Test + public void testHBaseGroupScanAssignmentAllPreferred() throws Exception { + NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_C); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_C); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_D); + + final List<DrillbitEndpoint> endpoints = Lists.newArrayList(); + final DrillbitEndpoint DB_A = DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build(); + endpoints.add(DB_A); + final DrillbitEndpoint DB_B = DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build(); + endpoints.add(DB_B); + final DrillbitEndpoint DB_C = DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build(); + endpoints.add(DB_C); + final DrillbitEndpoint DB_D = DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build(); + endpoints.add(DB_D); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + int i = 0; + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D' + testParallelizationWidth(scan, i); + } + + private void testParallelizationWidth(HBaseGroupScan scan, int i) { + try { + scan.getSpecificScan(i); + fail("Should not have a " + i + "th assignment, or Java assertions are not enabled."); + } catch (AssertionError e) { } + } +}
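----------------------------------------------------------------------
For reference, the two-phase assignment that HBaseGroupScan.applyAssignments() implements above is easier to follow in isolation: phase one hands each region to a slot running on the region's own host, cycling round robin among co-located slots; phase two spreads the leftover regions over the least-loaded slots and then moves work from the fullest slots until no slot holds fewer than floor(regions/slots) regions. The following is a minimal, self-contained Java sketch of that scheme, not code from this commit: the names (AssignmentSketch, WorkUnit, assign) are illustrative, Guava is replaced with plain JDK collections, and the orphan loop here re-offers a slot unconditionally so its heap cannot run dry, a small guard that differs from the patch's version of the loop.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;

public class AssignmentSketch {

  /** One unit of work (an HBase region, in the patch): the host serving it plus an opaque id. */
  static final class WorkUnit {
    final String host;
    final String id;
    WorkUnit(String host, String id) { this.host = host; this.id = id; }
    @Override public String toString() { return id + "@" + host; }
  }

  /** Orders slot lists by how much work they already hold, like LIST_SIZE_COMPARATOR above. */
  static final Comparator<List<WorkUnit>> BY_SIZE = new Comparator<List<WorkUnit>>() {
    @Override public int compare(List<WorkUnit> a, List<WorkUnit> b) {
      return a.size() - b.size();
    }
  };

  /** Assumes 1 <= slotHosts.size() <= units.size(). */
  static Map<Integer, List<WorkUnit>> assign(List<String> slotHosts, List<WorkUnit> units) {
    final int numSlots = slotHosts.size();
    final int minPerSlot = units.size() / numSlots;                  // floor
    final int maxPerSlot = (units.size() + numSlots - 1) / numSlots; // ceiling

    // slot index => assigned work; hostname => indexes of the slots on that host
    Map<Integer, List<WorkUnit>> mapping = new HashMap<Integer, List<WorkUnit>>();
    Map<String, Queue<Integer>> hostToSlots = new HashMap<String, Queue<Integer>>();
    for (int i = 0; i < numSlots; i++) {
      mapping.put(i, new ArrayList<WorkUnit>(maxPerSlot));
      Queue<Integer> q = hostToSlots.get(slotHosts.get(i));
      if (q == null) {
        q = new LinkedList<Integer>();
        hostToSlots.put(slotHosts.get(i), q);
      }
      q.add(i);
    }

    // Phase 1: place each unit on a slot running on the unit's own host, if any,
    // cycling round robin among co-located slots.
    List<WorkUnit> orphans = new ArrayList<WorkUnit>();
    for (WorkUnit unit : units) {
      Queue<Integer> local = hostToSlots.get(unit.host);
      if (local == null) {
        orphans.add(unit);
        continue;
      }
      Integer slot = local.poll();
      mapping.get(slot).add(unit);
      local.offer(slot); // back of the queue => round robin
    }

    // Phase 2a: hand each orphan to the currently smallest slot. The slot is
    // re-offered unconditionally so this heap can never run dry.
    PriorityQueue<List<WorkUnit>> bySize = new PriorityQueue<List<WorkUnit>>(numSlots, BY_SIZE);
    bySize.addAll(mapping.values());
    for (WorkUnit orphan : orphans) {
      List<WorkUnit> smallest = bySize.poll();
      smallest.add(orphan);
      bySize.offer(smallest);
    }

    // Phase 2b: move units from overfull slots to underfull ones until every
    // slot holds at least minPerSlot units.
    PriorityQueue<List<WorkUnit>> minHeap = new PriorityQueue<List<WorkUnit>>(numSlots, BY_SIZE);
    PriorityQueue<List<WorkUnit>> maxHeap =
        new PriorityQueue<List<WorkUnit>>(numSlots, Collections.reverseOrder(BY_SIZE));
    for (List<WorkUnit> list : mapping.values()) {
      if (list.size() < minPerSlot) {
        minHeap.offer(list);
      } else if (list.size() > minPerSlot) {
        maxHeap.offer(list);
      }
    }
    while (minHeap.peek() != null && minHeap.peek().size() < minPerSlot) {
      List<WorkUnit> smallest = minHeap.poll();
      List<WorkUnit> largest = maxHeap.poll(); // non-null: some slot must sit above the floor
      smallest.add(largest.remove(largest.size() - 1));
      if (largest.size() > minPerSlot) {
        maxHeap.offer(largest);
      }
      if (smallest.size() < minPerSlot) {
        minHeap.offer(smallest);
      }
    }
    return mapping;
  }

  public static void main(String[] args) {
    // Loosely mirrors the tests above: two slots on host A, one on B, one on a
    // host (X) serving no region; two regions live on a host (C) running no slot.
    List<String> slotHosts = new ArrayList<String>();
    Collections.addAll(slotHosts, "A", "A", "B", "X");
    List<WorkUnit> units = new ArrayList<WorkUnit>();
    Collections.addAll(units,
        new WorkUnit("A", "r1"), new WorkUnit("A", "r2"), new WorkUnit("A", "r3"),
        new WorkUnit("B", "r4"), new WorkUnit("C", "r5"), new WorkUnit("C", "r6"));
    for (Map.Entry<Integer, List<WorkUnit>> e : assign(slotHosts, units).entrySet()) {
      System.out.println("slot " + e.getKey() + " (" + slotHosts.get(e.getKey()) + ") -> " + e.getValue());
    }
  }
}

Running main() yields slot sizes of 2, 2, 1 and 1: the host-A and host-B regions stay local, and the two host-C orphans fill the least-loaded slots. One operational note grounded in the tests above: testParallelizationWidth() passes only when the JVM runs with assertions enabled (-ea), since it expects getSpecificScan() to raise an AssertionError for an out-of-range fragment id, which is what its fail() message alludes to.
----------------------------------------------------------------------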
