Author: khorgath Date: Mon Aug 4 19:17:53 2014 New Revision: 1615730 URL: http://svn.apache.org/r1615730 Log: HIVE-6584 : Add HiveHBaseTableSnapshotInputFormat (Nick Dimiduk, reviewed by Navis Ryu, Sushanth Sowmyan)
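For orientation, a minimal HiveQL usage sketch, mirroring the bundled hbase_handler_snapshot.q test below: the feature is driven by the two settings added to HiveConf (hive.hbase.snapshot.name and hive.hbase.snapshot.restoredir), and when the snapshot name is set, HBaseStorageHandler returns HiveHBaseTableSnapshotInputFormat so the query scans the restored snapshot rather than the online table. This assumes an HBase-backed Hive table src_hbase whose snapshot src_hbase_snapshot was taken beforehand (for example with snapshot 'src_hbase', 'src_hbase_snapshot' in the HBase shell):

    -- names follow the bundled qfile test; restoredir is where the snapshot is restored and defaults to /tmp
    SET hive.hbase.snapshot.name=src_hbase_snapshot;
    SET hive.hbase.snapshot.restoredir=/tmp;
    SELECT * FROM src_hbase LIMIT 5;

This requires HBase 0.98.3 or later; on older HBase, HBaseTableSnapshotInputFormatUtil.assertSupportsTableSnapshots() rejects the query with a RuntimeException.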
Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java hive/trunk/pom.xml hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 4 19:17:53 2014 @@ -1256,6 +1256,9 @@ public class HiveConf extends Configurat "Disabling this improves HBase write performance at the risk of lost writes in case of a crash."), HIVE_HBASE_GENERATE_HFILES("hive.hbase.generatehfiles", false, "True when HBaseStorageHandler should generate hfiles instead of operate against the online table."), + HIVE_HBASE_SNAPSHOT_NAME("hive.hbase.snapshot.name", null, "The HBase table snapshot name to use."), + HIVE_HBASE_SNAPSHOT_RESTORE_DIR("hive.hbase.snapshot.restoredir", "/tmp", "The directory in which to " + + "restore the HBase table snapshot."), // For har files HIVEARCHIVEENABLED("hive.archive.enabled", false, "Whether archiving operations are permitted"), Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Mon Aug 4 19:17:53 2014 @@ -31,46 +31,86 @@ import org.apache.hadoop.mapred.InputSpl * HBaseSplit augments FileSplit with HBase column mapping. 
*/ public class HBaseSplit extends FileSplit implements InputSplit { - private final TableSplit split; + private final TableSplit tableSplit; + private final InputSplit snapshotSplit; + private boolean isTableSplit; // should be final, but Writable requires it be set in readFields + + /** + * For Writable + */ public HBaseSplit() { super((Path) null, 0, 0, (String[]) null); - split = new TableSplit(); + tableSplit = new TableSplit(); + snapshotSplit = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit(); } - public HBaseSplit(TableSplit split, Path dummyPath) { + public HBaseSplit(TableSplit tableSplit, Path dummyPath) { super(dummyPath, 0, 0, (String[]) null); - this.split = split; + this.tableSplit = tableSplit; + this.snapshotSplit = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit(); + this.isTableSplit = true; } - public TableSplit getSplit() { - return this.split; + /** + * TODO: use TableSnapshotRegionSplit once HBASE-11555 is fixed. + */ + public HBaseSplit(InputSplit snapshotSplit, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.tableSplit = new TableSplit(); + this.snapshotSplit = snapshotSplit; + this.isTableSplit = false; } - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - split.readFields(in); + public TableSplit getTableSplit() { + assert isTableSplit; + return this.tableSplit; + } + + public InputSplit getSnapshotSplit() { + assert !isTableSplit; + return this.snapshotSplit; } @Override public String toString() { - return "TableSplit " + split; + return "" + (isTableSplit ? tableSplit : snapshotSplit); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.isTableSplit = in.readBoolean(); + if (this.isTableSplit) { + tableSplit.readFields(in); + } else { + snapshotSplit.readFields(in); + } } @Override public void write(DataOutput out) throws IOException { super.write(out); - split.write(out); + out.writeBoolean(isTableSplit); + if (isTableSplit) { + tableSplit.write(out); + } else { + snapshotSplit.write(out); + } } @Override public long getLength() { - return split.getLength(); + try { + // the wrapped snapshot split declares IOException; report an unknown length (0) instead of failing + return isTableSplit ? tableSplit.getLength() : snapshotSplit.getLength(); + } catch (IOException e) { + return 0; + } } @Override public String[] getLocations() throws IOException { - return split.getLocations(); + return isTableSplit ?
tableSplit.getLocations() : snapshotSplit.getLocations(); } } Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Mon Aug 4 19:17:53 2014 @@ -29,7 +29,10 @@ import java.util.Properties; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -69,6 +72,19 @@ import org.apache.hadoop.util.StringUtil public class HBaseStorageHandler extends DefaultStorageHandler implements HiveMetaHook, HiveStoragePredicateHandler { + private static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class); + + /** HBase-internal config by which input format receives snapshot name. */ + private static final String HBASE_SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; + /** HBase-internal config by which input format received restore dir before HBASE-11335. */ + private static final String HBASE_SNAPSHOT_TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir"; + /** HBase-internal config by which input format received restore dir after HBASE-11335. */ + private static final String HBASE_SNAPSHOT_RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; + /** HBase config by which a SlabCache is sized. */ + private static final String HBASE_OFFHEAP_PCT_KEY = "hbase.offheapcache.percentage"; + /** HBase config by which a BucketCache is sized. */ + private static final String HBASE_BUCKETCACHE_SIZE_KEY = "hbase.bucketcache.size"; + final static public String DEFAULT_PREFIX = "default."; //Check if the configure job properties is called from input @@ -258,6 +274,11 @@ public class HBaseStorageHandler extends @Override public Class<? 
extends InputFormat> getInputFormatClass() { + if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME) != null) { + LOG.debug("Using TableSnapshotInputFormat"); + return HiveHBaseTableSnapshotInputFormat.class; + } + LOG.debug("Using HiveHBaseTableInputFormat"); return HiveHBaseTableInputFormat.class; } @@ -342,6 +363,37 @@ public class HBaseStorageHandler extends // do this for reconciling HBaseStorageHandler for use in HCatalog // check to see if this an input job or an outputjob if (this.configureInputJobProps) { + String snapshotName = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME); + if (snapshotName != null) { + HBaseTableSnapshotInputFormatUtil.assertSupportsTableSnapshots(); + + try { + String restoreDir = + HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_RESTORE_DIR); + if (restoreDir == null) { + throw new IllegalArgumentException( + "Cannot process HBase snapshot without specifying " + HiveConf.ConfVars + .HIVE_HBASE_SNAPSHOT_RESTORE_DIR); + } + + HBaseTableSnapshotInputFormatUtil.configureJob(hbaseConf, snapshotName, new Path(restoreDir)); + // copy over configs touched by above method + jobProperties.put(HBASE_SNAPSHOT_NAME_KEY, hbaseConf.get(HBASE_SNAPSHOT_NAME_KEY)); + if (hbaseConf.get(HBASE_SNAPSHOT_TABLE_DIR_KEY, null) != null) { + jobProperties.put(HBASE_SNAPSHOT_TABLE_DIR_KEY, hbaseConf.get(HBASE_SNAPSHOT_TABLE_DIR_KEY)); + } else { + jobProperties.put(HBASE_SNAPSHOT_RESTORE_DIR_KEY, hbaseConf.get(HBASE_SNAPSHOT_RESTORE_DIR_KEY)); + } + + TableMapReduceUtil.resetCacheConfig(hbaseConf); + // copy over configs touched by above method + jobProperties.put(HBASE_OFFHEAP_PCT_KEY, hbaseConf.get(HBASE_OFFHEAP_PCT_KEY)); + jobProperties.put(HBASE_BUCKETCACHE_SIZE_KEY, hbaseConf.get(HBASE_BUCKETCACHE_SIZE_KEY)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + for (String k : jobProperties.keySet()) { jobConf.set(k, jobProperties.get(k)); } @@ -415,7 +467,8 @@ public class HBaseStorageHandler extends * only need TableMapReduceUtil.addDependencyJars(jobConf) here. */ TableMapReduceUtil.addDependencyJars( - jobConf, HBaseStorageHandler.class, TableInputFormatBase.class); + jobConf, HBaseStorageHandler.class, TableInputFormatBase.class, + org.cliffc.high_scale_lib.Counter.class); // this will be removed for HBase 1.0 Set<String> merged = new LinkedHashSet<String>(jobConf.getStringCollection("tmpjars")); Job copy = new Job(jobConf); Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java?rev=1615730&view=auto ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java (added) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java Mon Aug 4 19:17:53 2014 @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * A helper class to isolate newer HBase features from users running against older versions of + * HBase that don't provide those features. + * + * TODO: remove this class when it's okay to drop support for earlier versions of HBase. + */ +public class HBaseTableSnapshotInputFormatUtil { + + private static final Log LOG = LogFactory.getLog(HBaseTableSnapshotInputFormatUtil.class); + + /** The class we look for to determine if HBase snapshots are supported. */ + private static final String TABLESNAPSHOTINPUTFORMAT_CLASS + = "org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl"; + + private static final String TABLESNAPSHOTREGIONSPLIT_CLASS + = "org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat$TableSnapshotRegionSplit"; + + /** True when {@link #TABLESNAPSHOTINPUTFORMAT_CLASS} is present. */ + private static final boolean SUPPORTS_TABLE_SNAPSHOTS; + + static { + boolean support = false; + try { + Class<?> clazz = Class.forName(TABLESNAPSHOTINPUTFORMAT_CLASS); + support = clazz != null; + } catch (ClassNotFoundException e) { + // pass + } + SUPPORTS_TABLE_SNAPSHOTS = support; + } + + /** Throws a RuntimeException unless the HBase runtime supports {@link HiveHBaseTableSnapshotInputFormat}. */ + public static void assertSupportsTableSnapshots() { + if (!SUPPORTS_TABLE_SNAPSHOTS) { + throw new RuntimeException("This version of HBase does not support Hive over table " + + "snapshots. Please upgrade to HBase 0.98.3 or later. See HIVE-6584 for details."); + } + } + + /** + * Configures {@code conf} for the snapshot job. Call only after + * {@link #assertSupportsTableSnapshots()} succeeds. + */ + public static void configureJob(Configuration conf, String snapshotName, Path restoreDir) + throws IOException { + TableSnapshotInputFormatImpl.setInput(conf, snapshotName, restoreDir); + } + + /** + * Create a bare TableSnapshotRegionSplit. Needed because Writables require a + * default-constructed instance to hydrate from the DataInput. + * + * TODO: remove once HBASE-11555 is fixed. + */ + public static InputSplit createTableSnapshotRegionSplit() { + try { + assertSupportsTableSnapshots(); + } catch (RuntimeException e) { + LOG.debug("This HBase version probably doesn't support table snapshots. Returning null instance.", e); + return null; + } + + try { + Class<? extends InputSplit> resultType = + (Class<? extends InputSplit>) Class.forName(TABLESNAPSHOTREGIONSPLIT_CLASS); + Constructor<?
extends InputSplit> cxtor = resultType.getDeclaredConstructor(new Class[]{}); + cxtor.setAccessible(true); + return cxtor.newInstance(new Object[]{}); + } catch (ClassNotFoundException e) { + throw new UnsupportedOperationException( + "Unable to find " + TABLESNAPSHOTREGIONSPLIT_CLASS, e); + } catch (IllegalAccessException e) { + throw new UnsupportedOperationException( + "Unable to access specified class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e); + } catch (InstantiationException e) { + throw new UnsupportedOperationException( + "Unable to instantiate specified class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e); + } catch (InvocationTargetException e) { + throw new UnsupportedOperationException( + "Constructor threw an exception for " + TABLESNAPSHOTREGIONSPLIT_CLASS, e); + } catch (NoSuchMethodException e) { + throw new UnsupportedOperationException( + "Unable to find suitable constructor for class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e); + } + } +} Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java?rev=1615730&view=auto ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java (added) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java Mon Aug 4 19:17:53 2014 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat. + */ +class HiveHBaseInputFormatUtil { + + /** + * Parse {@code jobConf} to create the target {@link HTable} instance. + */ + public static HTable getTable(JobConf jobConf) throws IOException { + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); + return new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)); + } + + /** + * Parse {@code jobConf} to create a {@link Scan} instance. 
+ */ + public static Scan getScan(JobConf jobConf) throws IOException { + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); + ColumnMappings columnMappings; + + try { + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); + } catch (SerDeException e) { + throw new IOException(e); + } + + if (columnMappings.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } + + boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf); + Scan scan = new Scan(); + boolean empty = true; + + // The list of families that have been added to the scan + List<String> addedFamilies = new ArrayList<String>(); + + if (!readAllColumns) { + ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); + for (int i : readColIDs) { + ColumnMapping colMap = columnsMapping[i]; + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + addedFamilies.add(colMap.familyName); + } else { + if (!addedFamilies.contains(colMap.familyName)) { + // add only if the corresponding family has not already been added + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); + } + } + + empty = false; + } + } + + // The HBase table's row key maps to a Hive table column. In the corner case when only the + // row key column is selected in Hive, the HBase Scan will be empty, i.e., no column family / + // column qualifier will have been added to the scan. We arbitrarily add at least one column + // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive + // table's column projection.
+ if (empty) { + for (ColumnMapping colMap: columnMappings) { + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + } else { + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); + } + + if (!readAllColumns) { + break; + } + } + } + + String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); + if (scanCache != null) { + scan.setCaching(Integer.valueOf(scanCache)); + } + String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); + if (scanCacheBlocks != null) { + scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); + } + String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); + if (scanBatch != null) { + scan.setBatch(Integer.valueOf(scanBatch)); + } + return scan; + } + + public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ + + String[] mapInfo = spec.split("#"); + boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat); + + switch (mapInfo.length) { + case 1: + return tblLevelDefault; + + case 2: + String storageType = mapInfo[1]; + if(storageType.equals("-")) { + return tblLevelDefault; + } else if ("string".startsWith(storageType)){ + return false; + } else if ("binary".startsWith(storageType)){ + return true; + } + + default: + throw new IOException("Malformed string: " + spec); + } + } +} Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Mon Aug 4 19:17:53 2014 @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -88,90 +87,11 @@ public class HiveHBaseTableInputFormat e final Reporter reporter) throws IOException { HBaseSplit hbaseSplit = (HBaseSplit) split; - TableSplit tableSplit = hbaseSplit.getSplit(); - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); - String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); - List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); - ColumnMappings columnMappings; - - try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); - } catch (SerDeException e) { - throw new IOException(e); - } + TableSplit tableSplit = hbaseSplit.getTableSplit(); - if (columnMappings.size() < readColIDs.size()) { - throw new IOException("Cannot read more columns than the given table contains."); - } + setHTable(HiveHBaseInputFormatUtil.getTable(jobConf)); + setScan(HiveHBaseInputFormatUtil.getScan(jobConf)); - boolean readAllColumns = 
ColumnProjectionUtils.isReadAllColumns(jobConf); - Scan scan = new Scan(); - boolean empty = true; - - // The list of families that have been added to the scan - List<String> addedFamilies = new ArrayList<String>(); - - if (!readAllColumns) { - ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); - for (int i : readColIDs) { - ColumnMapping colMap = columnsMapping[i]; - if (colMap.hbaseRowKey) { - continue; - } - - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); - addedFamilies.add(colMap.familyName); - } else { - if(!addedFamilies.contains(colMap.familyName)){ - // add only if the corresponding family has not already been added - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } - } - - empty = false; - } - } - - // The HBase table's row key maps to a Hive table column. In the corner case when only the - // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/ - // column qualifier will have been added to the scan. We arbitrarily add at least one column - // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive - // tables column projection. - if (empty) { - for (ColumnMapping colMap: columnMappings) { - if (colMap.hbaseRowKey) { - continue; - } - - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); - } else { - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } - - if (!readAllColumns) { - break; - } - } - } - - String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); - if (scanCache != null) { - scan.setCaching(Integer.valueOf(scanCache)); - } - String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); - if (scanCacheBlocks != null) { - scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); - } - String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); - if (scanBatch != null) { - scan.setBatch(Integer.valueOf(scanBatch)); - } - - setScan(scan); Job job = new Job(jobConf); TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext( job.getConfiguration(), reporter); @@ -443,12 +363,12 @@ public class HiveHBaseTableInputFormat e boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); if (hbaseColumnsMapping == null) { - throw new IOException("hbase.columns.mapping required for HBase Table."); + throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table."); } ColumnMappings columnMappings = null; try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); } catch (SerDeException e) { throw new IOException(e); } @@ -463,10 +383,9 @@ public class HiveHBaseTableInputFormat e // definition into account and excludes regions which don't satisfy // the start/stop row conditions (HBASE-1829). 
Scan scan = createFilterScan(jobConf, iKey, - getStorageFormatOfKey(keyMapping.mappingSpec, + HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); - // The list of families that have been added to the scan List<String> addedFamilies = new ArrayList<String>(); @@ -503,28 +422,4 @@ public class HiveHBaseTableInputFormat e return results; } - - private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ - - String[] mapInfo = spec.split("#"); - boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat) ? true : false; - - switch (mapInfo.length) { - case 1: - return tblLevelDefault; - - case 2: - String storageType = mapInfo[1]; - if(storageType.equals("-")) { - return tblLevelDefault; - } else if ("string".startsWith(storageType)){ - return false; - } else if ("binary".startsWith(storageType)){ - return true; - } - - default: - throw new IOException("Malformed string: " + spec); - } - } } Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java?rev=1615730&view=auto ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java (added) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java Mon Aug 4 19:17:53 2014 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableInputFormat; +import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.List; + +public class HiveHBaseTableSnapshotInputFormat + implements InputFormat<ImmutableBytesWritable, ResultWritable> { + + TableSnapshotInputFormat delegate = new TableSnapshotInputFormat(); + + private static void setColumns(JobConf job) throws IOException { + // hbase mapred API doesn't support scan at the moment. 
+ Scan scan = HiveHBaseInputFormatUtil.getScan(job); + byte[][] families = scan.getFamilies(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < families.length; i++) { + if (i > 0) sb.append(" "); + sb.append(Bytes.toString(families[i])); + } + job.set(TableInputFormat.COLUMN_LIST, sb.toString()); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + setColumns(job); + + // hive depends on FileSplits, so wrap in HBaseSplit + Path[] tablePaths = FileInputFormat.getInputPaths(job); + + InputSplit [] results = delegate.getSplits(job, numSplits); + for (int i = 0; i < results.length; i++) { + results[i] = new HBaseSplit(results[i], tablePaths[0]); + } + + return results; + } + + @Override + public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + setColumns(job); + final RecordReader<ImmutableBytesWritable, Result> rr = + delegate.getRecordReader(((HBaseSplit) split).getSnapshotSplit(), job, reporter); + + return new RecordReader<ImmutableBytesWritable, ResultWritable>() { + @Override + public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException { + return rr.next(key, value.getResult()); + } + + @Override + public ImmutableBytesWritable createKey() { + return rr.createKey(); + } + + @Override + public ResultWritable createValue() { + return new ResultWritable(rr.createValue()); + } + + @Override + public long getPos() throws IOException { + return rr.getPos(); + } + + @Override + public void close() throws IOException { + rr.close(); + } + + @Override + public float getProgress() throws IOException { + return rr.getProgress(); + } + }; + } +} Added: hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q?rev=1615730&view=auto ============================================================================== --- hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q (added) +++ hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q Mon Aug 4 19:17:53 2014 @@ -0,0 +1,4 @@ +SET hive.hbase.snapshot.name=src_hbase_snapshot; +SET hive.hbase.snapshot.restoredir=/tmp; + +SELECT * FROM src_hbase LIMIT 5; Modified: hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out (original) +++ hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out Mon Aug 4 19:17:53 2014 @@ -63,8 +63,8 @@ Table Parameters: # Storage Information SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe -InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat +InputFormat: null +OutputFormat: null Compressed: No Num Buckets: -1 Bucket Columns: [] Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out?rev=1615730&r1=1615729&r2=1615730&view=diff 
============================================================================== --- hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out (original) +++ hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out Mon Aug 4 19:17:53 2014 @@ -63,8 +63,8 @@ Table Parameters: # Storage Information SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe -InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat +InputFormat: null +OutputFormat: null Compressed: No Num Buckets: -1 Bucket Columns: [] @@ -238,8 +238,8 @@ Table Parameters: # Storage Information SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe -InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat +InputFormat: null +OutputFormat: null Compressed: No Num Buckets: -1 Bucket Columns: [] Added: hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out?rev=1615730&view=auto ============================================================================== --- hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out (added) +++ hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out Mon Aug 4 19:17:53 2014 @@ -0,0 +1,13 @@ +PREHOOK: query: SELECT * FROM src_hbase LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src_hbase +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM src_hbase LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_hbase +#### A masked pattern was here #### +0 val_0 +10 val_10 +100 val_100 +103 val_103 +104 val_104 Modified: hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm (original) +++ hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm Mon Aug 4 19:17:53 2014 @@ -27,7 +27,6 @@ import java.util.*; import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; import org.apache.hadoop.hive.hbase.HBaseQTestUtil; import org.apache.hadoop.hive.hbase.HBaseTestSetup; -import org.apache.hadoop.hive.ql.session.SessionState; public class $className extends TestCase { Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java (original) +++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java Mon Aug 4 19:17:53 2014 @@ -17,24 +17,98 @@ */ package org.apache.hadoop.hive.hbase; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.QTestUtil; -import 
org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; + +import java.util.List; /** * HBaseQTestUtil initializes HBase-specific test fixtures. */ public class HBaseQTestUtil extends QTestUtil { + + /** Name of the HBase table, in both Hive and HBase. */ + public static String HBASE_SRC_NAME = "src_hbase"; + + /** Name of the table snapshot. */ + public static String HBASE_SRC_SNAPSHOT_NAME = "src_hbase_snapshot"; + + /** A handle to this harness's cluster */ + private final HConnection conn; + public HBaseQTestUtil( String outDir, String logDir, MiniClusterType miniMr, HBaseTestSetup setup) throws Exception { super(outDir, logDir, miniMr, null); setup.preTest(conf); + this.conn = setup.getConnection(); super.init(); } + /** Return true when the named HBase table snapshot exists, false otherwise. */ + private static boolean hbaseTableSnapshotExists(HBaseAdmin admin, String snapshotName) throws + Exception { + List<HBaseProtos.SnapshotDescription> snapshots = + admin.listSnapshots(".*" + snapshotName + ".*"); + for (HBaseProtos.SnapshotDescription sn : snapshots) { + if (sn.getName().equals(snapshotName)) { + return true; + } + } + return false; + } + @Override public void init() throws Exception { // defer } + + @Override + public void createSources() throws Exception { + super.createSources(); + + conf.setBoolean("hive.test.init.phase", true); + + // create and load the input data into the hbase table + runCreateTableCmd( + "CREATE TABLE " + HBASE_SRC_NAME + "(key INT, value STRING)" + + " STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:val')" + + " TBLPROPERTIES ('hbase.table.name' = '" + HBASE_SRC_NAME + "')" + ); + runCmd("INSERT OVERWRITE TABLE " + HBASE_SRC_NAME + " SELECT * FROM src"); + + // create a snapshot + HBaseAdmin admin = null; + try { + admin = new HBaseAdmin(conn.getConfiguration()); + admin.snapshot(HBASE_SRC_SNAPSHOT_NAME, HBASE_SRC_NAME); + } finally { + if (admin != null) admin.close(); + } + + conf.setBoolean("hive.test.init.phase", false); + } + + @Override + public void cleanUp() throws Exception { + super.cleanUp(); + + // drop in case of leftovers from an unsuccessful run + db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, HBASE_SRC_NAME); + + HBaseAdmin admin = null; + try { + admin = new HBaseAdmin(conn.getConfiguration()); + if (hbaseTableSnapshotExists(admin, HBASE_SRC_SNAPSHOT_NAME)) { + admin.deleteSnapshot(HBASE_SRC_SNAPSHOT_NAME); + } + } finally { + if (admin != null) admin.close(); + } + } } Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original) +++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Mon Aug 4 19:17:53 2014 @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.hbase; -import java.io.File; import java.io.IOException; import java.net.ServerSocket; import java.util.Arrays; @@ -29,12 +28,13 @@ import junit.framework.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; @@ -50,6 +50,7 @@ public class HBaseTestSetup extends Test private MiniHBaseCluster hbaseCluster; private int zooKeeperPort; private String hbaseRoot; + private HConnection hbaseConn; private static final int NUM_REGIONSERVERS = 1; @@ -57,6 +58,10 @@ public class HBaseTestSetup extends Test super(test); } + public HConnection getConnection() { + return this.hbaseConn; + } + void preTest(HiveConf conf) throws Exception { setUpFixtures(conf); @@ -97,27 +102,23 @@ public class HBaseTestSetup extends Test hbaseConf.setInt("hbase.regionserver.info.port", -1); hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS); conf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); + hbaseConn = HConnectionManager.createConnection(hbaseConf); + // opening the META table ensures that cluster is running - new HTable(hbaseConf, HConstants.META_TABLE_NAME); - createHBaseTable(hbaseConf); + HTableInterface meta = null; + try { + meta = hbaseConn.getTable(TableName.META_TABLE_NAME); + } finally { + if (meta != null) meta.close(); + } + createHBaseTable(); } - private void createHBaseTable(Configuration hbaseConf) throws IOException { + private void createHBaseTable() throws IOException { final String HBASE_TABLE_NAME = "HiveExternalTable"; HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes()); HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes()); htableDesc.addFamily(hcolDesc); - HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConf); - if(Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)){ - // if table is already in there, don't recreate. 
- return; - } - hbaseAdmin.createTable(htableDesc); - HTable htable = new HTable(hbaseConf, HBASE_TABLE_NAME); - - // data - Put [] puts = new Put [] { - new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes()) }; boolean [] booleans = new boolean [] { true, false, true }; byte [] bytes = new byte [] { Byte.MIN_VALUE, -1, Byte.MAX_VALUE }; @@ -128,18 +129,37 @@ public class HBaseTestSetup extends Test float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE }; double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE }; - // store data - for (int i = 0; i < puts.length; i++) { - puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i])); - puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte [] { bytes[i] }); - puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i])); - puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i])); - puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i])); - puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i])); - puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i])); - puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i])); - - htable.put(puts[i]); + HBaseAdmin hbaseAdmin = null; + HTableInterface htable = null; + try { + hbaseAdmin = new HBaseAdmin(hbaseConn.getConfiguration()); + if (Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)) { + // if table is already in there, don't recreate. + return; + } + hbaseAdmin.createTable(htableDesc); + htable = hbaseConn.getTable(HBASE_TABLE_NAME); + + // data + Put[] puts = new Put[]{ + new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes())}; + + // store data + for (int i = 0; i < puts.length; i++) { + puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i])); + puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte[]{bytes[i]}); + puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i])); + puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i])); + puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i])); + puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i])); + puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i])); + puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i])); + + htable.put(puts[i]); + } + } finally { + if (htable != null) htable.close(); + if (hbaseAdmin != null) hbaseAdmin.close(); } } @@ -152,6 +172,10 @@ public class HBaseTestSetup extends Test @Override protected void tearDown() throws Exception { + if (hbaseConn != null) { + hbaseConn.close(); + hbaseConn = null; + } if (hbaseCluster != null) { HConnectionManager.deleteAllConnections(true); hbaseCluster.shutdown(); Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original) +++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Mon Aug 4 19:17:53 2014 @@ -130,7 +130,7 @@ public class QTestUtil { public static final HashSet<String> srcTables = new 
HashSet<String>(); private static MiniClusterType clusterType = MiniClusterType.none; private ParseDriver pd; - private Hive db; + protected Hive db; protected HiveConf conf; private Driver drv; private BaseSemanticAnalyzer sem; @@ -630,7 +630,7 @@ public class QTestUtil { return; } - private void runCreateTableCmd(String createTableCmd) throws Exception { + protected void runCreateTableCmd(String createTableCmd) throws Exception { int ecode = 0; ecode = drv.run(createTableCmd).getResponseCode(); if (ecode != 0) { @@ -641,7 +641,7 @@ public class QTestUtil { return; } - private void runCmd(String cmd) throws Exception { + protected void runCmd(String cmd) throws Exception { int ecode = 0; ecode = drv.run(cmd).getResponseCode(); drv.close(); Modified: hive/trunk/pom.xml URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/pom.xml (original) +++ hive/trunk/pom.xml Mon Aug 4 19:17:53 2014 @@ -113,8 +113,8 @@ <hadoop-20S.version>1.2.1</hadoop-20S.version> <hadoop-23.version>2.4.0</hadoop-23.version> <hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path> - <hbase.hadoop1.version>0.96.0-hadoop1</hbase.hadoop1.version> - <hbase.hadoop2.version>0.96.0-hadoop2</hbase.hadoop2.version> + <hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version> + <hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version> <!-- httpcomponents are not always in version sync --> <httpcomponents.client.version>4.2.5</httpcomponents.client.version> <httpcomponents.core.version>4.2.5</httpcomponents.core.version> @@ -774,7 +774,7 @@ <test.warehouse.dir>${test.warehouse.scheme}${test.warehouse.dir}</test.warehouse.dir> <java.net.preferIPv4Stack>true</java.net.preferIPv4Stack> <!-- EnforceReadOnlyTables hook and QTestUtil --> - <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc</test.src.tables> + <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase</test.src.tables> <java.security.krb5.conf>${test.tmp.dir}/conf/krb5.conf</java.security.krb5.conf> </systemPropertyVariables> </configuration> Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1615730&r1=1615729&r2=1615730&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Mon Aug 4 19:17:53 2014 @@ -3924,12 +3924,16 @@ public class DDLTask extends Task<DDLWor tbl.setInputFormatClass(crtTbl.getInputFormat()); tbl.setOutputFormatClass(crtTbl.getOutputFormat()); - tbl.getTTable().getSd().setInputFormat( - tbl.getInputFormatClass().getName()); - tbl.getTTable().getSd().setOutputFormat( - tbl.getOutputFormatClass().getName()); + // only persist input/output format to metadata when it is explicitly specified. + // Otherwise, load lazily via StorageHandler at query time.
+ if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) { + tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName()); + } + if (crtTbl.getOutputFormat() != null && !crtTbl.getOutputFormat().isEmpty()) { + tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName()); + } - if (!Utilities.isDefaultNameNode(conf)) { + if (!Utilities.isDefaultNameNode(conf) && tbl.getTTable().getSd().isSetLocation()) { // If location is specified - ensure that it is a fully qualified name makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tbl.getTableName()); }