DRILL-4199: Add Support for HBase 1.X
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1882d938
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1882d938
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1882d938

Branch: refs/heads/master
Commit: 1882d938f0ed94679f1a2766181eca2a02eb555a
Parents: 5fa9ba3
Author: Aditya <adi...@mapr.com>
Authored: Wed Mar 30 17:55:59 2016 -0700
Committer: Aditya Kishore <a...@apache.org>
Committed: Fri Sep 9 10:08:41 2016 -0700

----------------------------------------------------------------------
 contrib/format-maprdb/pom.xml                   |  2 +-
 .../exec/store/mapr/db/MapRDBFormatPlugin.java  | 20 +++++++++-
 .../exec/store/mapr/db/MapRDBGroupScan.java     |  4 +-
 .../store/mapr/db/MapRDBScanBatchCreator.java   |  6 +--
 .../drill/exec/store/mapr/db/MapRDBSubScan.java | 39 +++++++++++---------
 .../mapr/db/binary/BinaryTableGroupScan.java    | 30 +++++++--------
 .../db/binary/CompareFunctionsProcessor.java    | 12 +++---
 .../drill/maprdb/tests/json/BaseJsonTest.java   |  2 +-
 8 files changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/pom.xml b/contrib/format-maprdb/pom.xml
index d14a2c5..fb01818 100644
--- a/contrib/format-maprdb/pom.xml
+++ b/contrib/format-maprdb/pom.xml
@@ -32,7 +32,7 @@
   <properties>
     <maprdb-storage-plugin.mapr.version>5.1.0.37817-mapr</maprdb-storage-plugin.mapr.version>
-    <maprdb-storage-plugin.hbase.version>0.98.12-mapr-1506</maprdb-storage-plugin.hbase.version>
+    <maprdb-storage-plugin.hbase.version>1.1.1-mapr-1602-m7-5.1.0</maprdb-storage-plugin.hbase.version>
     <maprdb-storage-plugin.hadoop.version>2.7.0-mapr-1602</maprdb-storage-plugin.hadoop.version>
     <maprdb.TestSuite>**/MaprDBTestsSuite.class</maprdb.TestSuite>
   </properties>

http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java index 9fe16e4..755ae4f 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -35,6 +35,9 @@ import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec; import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.ImmutableSet; @@ -44,11 +47,16 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class); private final MapRDBFormatMatcher matcher; + private final Configuration hbaseConf; + private final Connection connection; public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf, - StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) { + StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) throws IOException { super(name, context, fsConf, storageConfig, formatConfig); matcher = new MapRDBFormatMatcher(this); + hbaseConf = HBaseConfiguration.create(fsConf); + hbaseConf.set(ConnectionFactory.DEFAULT_DB, ConnectionFactory.MAPR_ENGINE2); + connection = 
ConnectionFactory.createConnection(hbaseConf); } @Override @@ -79,4 +87,14 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { } } + @JsonIgnore + public Configuration getHBaseConf() { + return hbaseConf; + } + + @JsonIgnore + public Connection getConnection() { + return connection; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java index 8563b78..e6c71e0 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java @@ -49,9 +49,9 @@ import com.google.common.collect.Sets; public abstract class MapRDBGroupScan extends AbstractGroupScan { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class); - private FileSystemPlugin storagePlugin; + protected FileSystemPlugin storagePlugin; - private MapRDBFormatPlugin formatPlugin; + protected MapRDBFormatPlugin formatPlugin; protected MapRDBFormatPluginConfig formatPluginConfig; http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java index 1d51223..c989bb0 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java +++ 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java @@ -29,8 +29,6 @@ import org.apache.drill.exec.store.hbase.HBaseRecordReader; import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan; import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -42,11 +40,11 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{ public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); - Configuration conf = HBaseConfiguration.create(); for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){ try { if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) { - readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context)); + readers.add(new HBaseRecordReader(subScan.getFormatPlugin().getConnection(), + getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context)); } else { readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context)); } http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java index dea6867..794141c 100644 --- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java @@ -44,37 +44,37 @@ public class MapRDBSubScan extends AbstractBase implements SubScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class); @JsonProperty - public final StoragePluginConfig storage; + public final StoragePluginConfig storageConfig; @JsonIgnore - private final MapRDBFormatPluginConfig fsFormatPluginConfig; - private final FileSystemPlugin fsStoragePlugin; + private final MapRDBFormatPluginConfig formatPluginConfig; + private final FileSystemPlugin storagePlugin; private final List<MapRDBSubScanSpec> regionScanSpecList; private final List<SchemaPath> columns; private final String tableType; + private final MapRDBFormatPlugin formatPlugin; + @JsonCreator public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig, - @JsonProperty("storage") StoragePluginConfig storage, + @JsonProperty("storageConfig") StoragePluginConfig storage, @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList, @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("tableType") String tableType) throws ExecutionSetupException { - super(userName); - this.fsFormatPluginConfig = formatPluginConfig; - this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage); - this.regionScanSpecList = regionScanSpecList; - this.storage = storage; - this.columns = columns; - this.tableType = tableType; + this(userName, formatPluginConfig, + (FileSystemPlugin) registry.getPlugin(storage), + storage, regionScanSpecList, columns, tableType); } - public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config, + public 
MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig storageConfig, List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) { super(userName); - fsFormatPluginConfig = formatPluginConfig; - fsStoragePlugin = storagePlugin; - storage = config; + this.storageConfig = storageConfig; + this.storagePlugin = storagePlugin; + this.formatPluginConfig = formatPluginConfig; + this.formatPlugin = (MapRDBFormatPlugin) storagePlugin.getFormatPlugin(formatPluginConfig); + this.regionScanSpecList = maprSubScanSpecs; this.columns = columns; this.tableType = tableType; @@ -101,7 +101,7 @@ public class MapRDBSubScan extends AbstractBase implements SubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType); + return new MapRDBSubScan(getUserName(), formatPluginConfig, storagePlugin, storageConfig, regionScanSpecList, columns, tableType); } @Override @@ -119,7 +119,12 @@ public class MapRDBSubScan extends AbstractBase implements SubScan { } public MapRDBFormatPluginConfig getFormatPluginConfig() { - return fsFormatPluginConfig; + return formatPluginConfig; + } + + @JsonIgnore + public MapRDBFormatPlugin getFormatPlugin() { + return formatPlugin; } } http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java index a597995..4eaeee7 100644 --- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java @@ -21,8 +21,6 @@ import static org.apache.drill.exec.store.mapr.db.util.CommonFns.isNullOrEmpty; import java.io.IOException; import java.util.List; -import java.util.Map.Entry; -import java.util.NavigableMap; import java.util.TreeMap; import org.apache.drill.common.exceptions.DrillRuntimeException; @@ -45,11 +43,12 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; import org.apache.drill.exec.store.mapr.db.MapRDBTableStats; import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.RegionLocator; import org.codehaus.jackson.annotate.JsonCreator; import com.fasterxml.jackson.annotation.JacksonInject; @@ -112,23 +111,22 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC private void init() { logger.debug("Getting region locations"); - try { - Configuration conf = HBaseConfiguration.create(); - HTable table = new HTable(conf, hbaseScanSpec.getTableName()); - tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName()); - this.hTableDesc = table.getTableDescriptor(); - NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); - table.close(); + TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName()); + try (Admin admin = formatPlugin.getConnection().getAdmin(); + RegionLocator locator = 
formatPlugin.getConnection().getRegionLocator(tableName)) { + hTableDesc = admin.getTableDescriptor(tableName); + tableStats = new MapRDBTableStats(getHBaseConf(), hbaseScanSpec.getTableName()); boolean foundStartRegion = false; regionsToScan = new TreeMap<TabletFragmentInfo, String>(); - for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) { - HRegionInfo regionInfo = mapEntry.getKey(); + List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); + for (HRegionLocation regionLocation : regionLocations) { + HRegionInfo regionInfo = regionLocation.getRegionInfo(); if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) { continue; } foundStartRegion = true; - regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname()); + regionsToScan.put(new TabletFragmentInfo(regionInfo), regionLocation.getHostname()); if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) { break; } @@ -191,7 +189,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC @JsonIgnore public Configuration getHBaseConf() { - return HBaseConfiguration.create(); + return getFormatPlugin().getHBaseConf(); } @JsonIgnore http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java index 0c901d7..a83abf3 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java +++ 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java @@ -40,7 +40,7 @@ import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.hadoop.hbase.util.Order; import org.apache.hadoop.hbase.util.PositionedByteRange; -import org.apache.hadoop.hbase.util.SimplePositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange; import org.apache.drill.exec.store.hbase.DrillHBaseConstants; import org.apache.hadoop.hbase.HConstants; @@ -256,7 +256,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "DOUBLE_OBD": if (valueArg instanceof DoubleExpression) { bb = newByteBuf(9, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING); @@ -271,7 +271,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "FLOAT_OBD": if (valueArg instanceof FloatExpression) { bb = newByteBuf(5, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, ((FloatExpression)valueArg).getFloat(), Order.DESCENDING); @@ -286,7 +286,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "BIGINT_OBD": if (valueArg instanceof LongExpression) { bb = newByteBuf(9, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9); if 
(encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, ((LongExpression)valueArg).getLong(), Order.DESCENDING); @@ -301,7 +301,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "INT_OBD": if (valueArg instanceof IntExpression) { bb = newByteBuf(5, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, ((IntExpression)valueArg).getInt(), Order.DESCENDING); @@ -317,7 +317,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr if (valueArg instanceof QuotedString) { int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length; bb = newByteBuf(stringLen + 2, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, stringLen + 2); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br, ((QuotedString)valueArg).value, Order.DESCENDING); http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java index 6aafed3..b503b00 100644 --- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.drill.BaseTestQuery; import org.apache.drill.exec.exception.SchemaChangeException; 
import org.apache.drill.exec.rpc.user.QueryDataBatch; -import org.apache.drill.hbase.GuavaPatcher; +import org.apache.drill.exec.util.GuavaPatcher; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass;