Author: khorgath
Date: Mon Oct 10 21:58:07 2011
New Revision: 1181260
URL: http://svn.apache.org/viewvc?rev=1181260&view=rev
Log:
HCATALOG-75 Input storage driver for HBase (avandana via khorgath)
Added:
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1181260&r1=1181259&r2=1181260&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Oct 10 21:58:07 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-75. Input storage driver for HBase (avandana via khorgath)
+
HCAT-73. Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)
HCAT-74. ResultConverter for HBase Storage Drivers (avandana via khorgath)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1181260&r1=1181259&r2=1181260&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Mon Oct 10 21:58:07 2011
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -48,22 +49,12 @@ public class InitializeInput {
/** The prefix for keys used for storage driver arguments */
static final String HCAT_KEY_PREFIX = "hcat.";
- private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+ private static HiveConf hiveConf;
private static HiveMetaStoreClient createHiveMetaClient(Configuration conf,
InputJobInfo inputJobInfo) throws Exception {
- if (inputJobInfo.getServerUri() != null){
- hiveConf.set("hive.metastore.local", "false");
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri());
- }
-
- String kerberosPrincipal = inputJobInfo.getServerKerberosPrincipal();
- if(kerberosPrincipal != null){
-      hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
-      hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
- }
-
- return new HiveMetaStoreClient(hiveConf,null);
+ hiveConf = getHiveConf(inputJobInfo, conf);
+ return new HiveMetaStoreClient(hiveConf, null);
}
/**
@@ -207,4 +198,82 @@ public class InitializeInput {
return new StorerInfo(inputSDClass, outputSDClass, hcatProperties);
}
+ static HiveConf getHiveConf(InputJobInfo iInfo, Configuration conf)
+ throws IOException {
+
+ HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+
+ if (iInfo.getServerUri() != null) {
+ // User specified a thrift url
+
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.set(ConfVars.METASTOREURIS.varname, iInfo.getServerUri());
+
+ String kerberosPrincipal = iInfo.getServerKerberosPrincipal();
+ if (kerberosPrincipal != null) {
+ hiveConf.setBoolean(
+ HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+ true);
+ hiveConf.set(
+ HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+ kerberosPrincipal);
+ } else {
+
+ kerberosPrincipal = conf
+ .get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+
+ if (kerberosPrincipal == null) {
+                kerberosPrincipal = conf
+                        .get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+ }
+ if (kerberosPrincipal != null) {
+ hiveConf.setBoolean(
+ ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+ hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+ kerberosPrincipal);
+ }
+
+ if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ hiveConf.set("hive.metastore.token.signature",
+ conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+ }
+ }
+
+ } else {
+            // Thrift url is null; copy the hive conf into the job conf and
+            // restore it in the backend context
+
+ if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ HCatUtil.serialize(hiveConf.getAllProperties()));
+ } else {
+ // Copy configuration properties into the hive conf
+ Properties properties = (Properties) HCatUtil.deserialize(conf
+ .get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+ for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+ if (prop.getValue() instanceof String) {
+ hiveConf.set((String) prop.getKey(),
+ (String) prop.getValue());
+ } else if (prop.getValue() instanceof Integer) {
+ hiveConf.setInt((String) prop.getKey(),
+ (Integer) prop.getValue());
+ } else if (prop.getValue() instanceof Boolean) {
+ hiveConf.setBoolean((String) prop.getKey(),
+ (Boolean) prop.getValue());
+ } else if (prop.getValue() instanceof Long) {
+ hiveConf.setLong((String) prop.getKey(),
+ (Long) prop.getValue());
+ } else if (prop.getValue() instanceof Float) {
+ hiveConf.setFloat((String) prop.getKey(),
+ (Float) prop.getValue());
+ }
+ }
+ }
+
+ }
+ return hiveConf;
+ }
+
}
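
A note on the refactoring above: when a thrift server URI is present, the extracted getHiveConf() resolves the kerberos principal in a fixed order — first the principal set on InputJobInfo, then HCatConstants.HCAT_METASTORE_PRINCIPAL from the job conf, then Hive's own hive.metastore.kerberos.principal; when the URI is absent, it instead round-trips the hive conf through the job conf via HCatUtil.serialize/deserialize. A minimal standalone sketch of the principal lookup (the helper class and method names are illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hcatalog.common.HCatConstants;

    class PrincipalLookupSketch {
        // Mirrors the precedence implemented in InitializeInput.getHiveConf().
        static String resolvePrincipal(String explicitPrincipal, Configuration conf) {
            // 1. A principal set directly on InputJobInfo wins.
            if (explicitPrincipal != null) {
                return explicitPrincipal;
            }
            // 2. Otherwise fall back to the HCatalog-specific key in the job conf.
            String principal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
            if (principal == null) {
                // 3. Finally, Hive's standard metastore principal key.
                principal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
            }
            return principal; // null means SASL is left disabled
        }
    }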
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1181260&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Mon Oct 10 21:58:07 2011
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * HBaseInputFormat is a thin wrapper around HBase's TableInputFormat.
+ */
+class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+
+ private final TableInputFormat inputFormat;
+
+ public HBaseInputFormat() {
+ inputFormat = new TableInputFormat();
+ }
+
+ /*
+ * @param split the InputSplit
+ *
+ * @param tac the TaskAttemptContext
+ *
+ * @return RecordReader
+ *
+ * @throws IOException
+ *
+ * @throws InterruptedException
+ *
+ * @see
+ * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
+ * .hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ InputSplit split, TaskAttemptContext tac) throws IOException,
+ InterruptedException {
+ return inputFormat.createRecordReader(split, tac);
+ }
+
+ /*
+ * @param jobContext
+ *
+ * @return List of InputSplit
+ *
+ * @throws IOException
+ *
+ * @throws InterruptedException
+ *
+ * @see
+ * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
+ * .JobContext)
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ return inputFormat.getSplits(jobContext);
+ }
+
+ public void setConf(Configuration conf) {
+ inputFormat.setConf(conf);
+ }
+
+ public Scan getScan() {
+ return inputFormat.getScan();
+ }
+
+ public void setScan(Scan scan) {
+ inputFormat.setScan(scan);
+ }
+
+}
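
Since HBaseInputFormat delegates everything to TableInputFormat, configuring it follows the usual TableInputFormat pattern: put the table name into the job configuration, call setConf(), then tune the Scan. A minimal sketch, mirroring what HBaseInputStorageDriver.getInputFormat() does below (the caching values are the ones hard-coded there; the helper class is illustrative and must live in org.apache.hcatalog.hbase because HBaseInputFormat is package-private):

    package org.apache.hcatalog.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

    class HBaseInputFormatUsageSketch {
        static HBaseInputFormat configure(Configuration jobConf, String hbaseTableName) {
            HBaseInputFormat inputFormat = new HBaseInputFormat();
            jobConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
            inputFormat.setConf(jobConf);      // delegates to TableInputFormat.setConf()
            Scan scan = inputFormat.getScan(); // the Scan TableInputFormat built from the conf
            scan.setCaching(200);              // rows fetched per RPC round trip
            scan.setCacheBlocks(false);        // keep MR scans out of the block cache
            inputFormat.setScan(scan);
            return inputFormat;
        }
    }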
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java?rev=1181260&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java Mon Oct 10 21:58:07 2011
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * The Class HBaseInputStorageDriver enables reading of HBase tables through
+ * HBaseInputStorageDriver enables reading HBase tables through HCatalog.
+public class HBaseInputStorageDriver extends HCatInputStorageDriver {
+ private HCatTableInfo tableInfo;
+ private ResultConverter converter;
+ private HCatSchema outputColSchema;
+ private HCatSchema dataSchema;
+ private Configuration jobConf;
+
+ /*
+ * @param context the JobContext
+ *
+ * @param hcatProperties
+ *
+ * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+ * #initialize(org.apache.hadoop.mapreduce.JobContext, java.util.Properties)
+ */
+ @Override
+ public void initialize(JobContext context, Properties hcatProperties) {
+ jobConf = context.getConfiguration();
+ try {
+ String jobString = context.getConfiguration().get(
+ HCatConstants.HCAT_KEY_JOB_INFO);
+ if (jobString == null) {
+ throw new IOException(
+ "InputJobInfo information not found in JobContext. "
+ + "HCatInputFormat.setInput() not called?");
+ }
+ InputJobInfo jobInfo = (InputJobInfo) HCatUtil
+ .deserialize(jobString);
+ tableInfo = jobInfo.getTableInfo();
+ dataSchema = tableInfo.getDataColumns();
+ List<FieldSchema> fields = HCatUtil
+ .getFieldSchemaList(outputColSchema.getFields());
+ hcatProperties.setProperty(Constants.LIST_COLUMNS,
+ MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+ hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+ MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+ converter = new HBaseSerDeResultConverter(dataSchema,
+ outputColSchema, hcatProperties);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ /*
+ * @param hcatProperties
+ *
+ * @return InputFormat
+ *
+ * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+ * #getInputFormat(java.util.Properties)
+ */
+ @Override
+ public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
+ Properties hcatProperties) {
+ HBaseInputFormat tableInputFormat = new HBaseInputFormat();
+ jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName());
+ tableInputFormat.setConf(jobConf);
+ // TODO: Make the caching configurable by the user
+ tableInputFormat.getScan().setCaching(200);
+ tableInputFormat.getScan().setCacheBlocks(false);
+ return tableInputFormat;
+ }
+
+ /*
+ * @param baseKey
+ *
+ * @param baseValue
+ *
+ * @return HCatRecord
+ *
+ * @throws IOException
+ *
+ * @see
+ * org.apache.hcatalog.mapreduce.HCatInputStorageDriver#convertToHCatRecord
+ * (org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
+ */
+ @Override
+ public HCatRecord convertToHCatRecord(WritableComparable baseKey,
+ Writable baseValue) throws IOException {
+ return this.converter.convert((Result) baseValue);
+ }
+
+ /*
+ * @param jobContext
+ *
+ * @param howlSchema
+ *
+ * @throws IOException
+ *
+ * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver#
+ * setOutputSchema(org.apache.hadoop.mapreduce.JobContext,
+ * org.apache.hcatalog.data.schema.HCatSchema)
+ */
+ @Override
+ public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema)
+ throws IOException {
+ outputColSchema = howlSchema;
+ }
+
+ /*
+ * @param jobContext
+ *
+ * @param partitionValues
+ *
+ * @throws IOException
+ *
+ * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+ * #setPartitionValues(org.apache.hadoop.mapreduce.JobContext,
+ * java.util.Map)
+ */
+ @Override
+ public void setPartitionValues(JobContext jobContext,
+ Map<String, String> partitionValues) throws IOException {
+ }
+
+ /*
+ * @param jobContext
+ *
+ * @param hcatSchema
+ *
+ * @throws IOException
+ *
+ * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+ * #setOriginalSchema(org.apache.hadoop.mapreduce.JobContext,
+ * org.apache.hcatalog.data.schema.HCatSchema)
+ */
+ @Override
+ public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema)
+ throws IOException {
+ this.dataSchema = hcatSchema;
+ }
+}
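
End to end, the driver plugs into the standard HCatalog input path: HCatInputFormat instantiates it from the table's HCAT_ISD_CLASS property, obtains the wrapped input format via getInputFormat(), and passes each HBase Result through convertToHCatRecord(). A condensed sketch of the client-side job wiring (names are illustrative; the test added below exercises the same path in full):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    class HBaseReadJobSketch {
        static Job createReadJob(Configuration conf) throws Exception {
            Job job = new Job(conf, "hbase-hcat-read");
            job.setInputFormatClass(HCatInputFormat.class);
            // (database, table, partition filter, server URI, kerberos principal)
            HCatInputFormat.setInput(job,
                    InputJobInfo.create("default", "mytesttable", null, null, null));
            // Optionally project columns with HCatInputFormat.setOutputSchema(job, schema);
            // the mapper then receives (ImmutableBytesWritable, HCatRecord) pairs.
            return job;
        }
    }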
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java?rev=1181260&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java Mon Oct 10 21:58:07 2011
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
+
+ private final byte[] FAMILY = Bytes.toBytes("testFamily");
+ private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
+ private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+ private final String tableName = "mytesttable";
+
+ List<Put> generatePuts(int num) {
+ List<Put> myPuts = new ArrayList<Put>();
+ for (int i = 0; i < num; i++) {
+ Put put = new Put(Bytes.toBytes("testRow" + i));
+ put.add(FAMILY, QUALIFIER1, 0,
+ Bytes.toBytes("testQualifier1-" + "textValue-" + i));
+ put.add(FAMILY, QUALIFIER2, 0,
+ Bytes.toBytes("testQualifier2-" + "textValue-" + i));
+ myPuts.add(put);
+ }
+ return myPuts;
+ }
+
+ private void registerHBaseTable(String tableName) throws Exception {
+
+ String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ HiveMetaStoreClient client = getCluster().getHiveMetaStoreClient();
+ try {
+ client.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+            // ignore: dropTable can fail with NoSuchObjectException
+        }
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+ tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tableParams.put(HCatConstants.HCAT_ISD_CLASS,
+ HBaseInputStorageDriver.class.getName());
+ tableParams.put(HCatConstants.HCAT_OSD_CLASS, "NotRequired");
+ tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,
+ ":key,testFamily:testQualifier1,testFamily:testQualifier2");
+ tableParams.put(Constants.SERIALIZATION_FORMAT, "9");
+ tableParams.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+ tbl.setParameters(tableParams);
+
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(HCatUtil.getFieldSchemaList(getSchema().getFields()));
+ sd.setBucketCols(new ArrayList<String>(3));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters()
+ .put(Constants.SERIALIZATION_FORMAT, "9");
+ sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+ sd.setInputFormat(HBaseInputFormat.class.getName());
+ sd.setOutputFormat("NotRequired");
+
+ tbl.setSd(sd);
+ client.createTable(tbl);
+
+ }
+
+ public void populateTable() throws IOException {
+ List<Put> myPuts = generatePuts(10);
+ HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName));
+ table.put(myPuts);
+ }
+
+ @Test
+ public void TestHBaseTableReadMR() throws Exception {
+
+ Configuration conf = new Configuration();
+        // copy the hbase config into the job conf
+ for (Map.Entry<String, String> el : getHbaseConf()) {
+ if (el.getKey().startsWith("hbase.")) {
+ conf.set(el.getKey(), el.getValue());
+ }
+ }
+
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        // create HBase table using admin
+ createTable(tableName, new String[] { "testFamily" });
+ registerHBaseTable(tableName);
+ populateTable();
+ // output settings
+ Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
+ FileSystem fs = getFileSystem();
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
+ }
+ // create job
+ Job job = new Job(conf, "hbase-mr-read-test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapReadHTable.class);
+
+ job.getConfiguration().set(TableInputFormat.INPUT_TABLE, tableName);
+
+ job.setInputFormatClass(HCatInputFormat.class);
+ InputJobInfo inputJobInfo = InputJobInfo.create(
+ MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+ null);
+ HCatInputFormat.setOutputSchema(job, getSchema());
+ HCatInputFormat.setInput(job, inputJobInfo);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ assertTrue(MapReadHTable.error == false);
+ }
+
+ public static class MapReadHTable
+ extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+
+ static boolean error = false;
+
+ @Override
+ public void map(ImmutableBytesWritable key, HCatRecord value,
+ Context context) throws IOException, InterruptedException {
+ boolean correctValues = (value.size() == 3)
+ && (value.get(0).toString()).startsWith("testRow")
+ && (value.get(1).toString()).startsWith("testQualifier1")
+ && (value.get(2).toString()).startsWith("testQualifier2");
+
+ if (correctValues == false) {
+ error = true;
+ }
+ }
+ }
+
+ private HCatSchema getSchema() throws HCatException {
+
+ HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+ ""));
+ schema.append(new HCatFieldSchema("testqualifier1",
+ HCatFieldSchema.Type.STRING, ""));
+ schema.append(new HCatFieldSchema("testqualifier2",
+ HCatFieldSchema.Type.STRING, ""));
+ return schema;
+ }
+}
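
For readers unfamiliar with HBaseSerDe column mappings: the mapping string registered in registerHBaseTable() pairs up positionally with the fields built in getSchema(), with ":key" denoting the HBase row key. A sketch of the correspondence assumed by the test:

    // Positional correspondence between getSchema() and the column mapping:
    //   field 0 "key"            -> HBase row key            (":key")
    //   field 1 "testqualifier1" -> testFamily:testQualifier1
    //   field 2 "testqualifier2" -> testFamily:testQualifier2
    class ColumnMappingSketch {
        static final String MAPPING =
                ":key,testFamily:testQualifier1,testFamily:testQualifier2";
    }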