Author: hashutosh
Date: Tue Nov 22 22:35:37 2011
New Revision: 1205202
URL: http://svn.apache.org/viewvc?rev=1205202&view=rev
Log:
HCATALOG-154 and HCATALOG-157. (toffer via hashutosh)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Nov 22 22:35:37 2011
@@ -79,6 +79,10 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-157. HBaseOutputFormat assumes hbase table name is hcat table name (toffer via hashutosh)
+
+ HCAT-154. HBase bulkOSD and directOSD return inconsistent path for getOutputLocation() (toffer via hashutosh)
+
HCAT-124. null pointer exception on 'use no_such_db' (hashutosh)
HCAT-125. HCat doesn't support hive's describe database DDL (hashutosh)
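For context on HCAT-157: the output driver previously handed HBase the bare HCat table name, which only works when the HBase table and the HCat table happen to share a name in the default database. The fix below switches to HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo). A minimal sketch of the naming convention involved; the class and method here are illustrative only, and default-database handling may differ (the new tests scan the unprefixed name for the default database):

    // Illustrative sketch, not the project's actual helper.
    public class QualifiedNameSketch {
        // Mirrors the "db.table" convention the new tests verify with
        // new HTable(conf, databaseName + "." + tableName).
        static String fullyQualifiedName(String databaseName, String tableName) {
            return databaseName.toLowerCase() + "." + tableName.toLowerCase();
        }

        public static void main(String[] args) {
            System.out.println(fullyQualifiedName("MyDB", "MyTable")); // prints mydb.mytable
        }
    }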
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -73,7 +73,7 @@ abstract class HBaseBaseOutputStorageDr
if(revision == null) {
outputJobInfo.getProperties()
.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
- new Path(outputJobInfo.getLocation()).getName());
+ Long.toString(System.currentTimeMillis()));
}
tableInfo = outputJobInfo.getTableInfo();
@@ -89,7 +89,7 @@ abstract class HBaseBaseOutputStorageDr
converter = new HBaseSerDeResultConverter(schema,
outputSchema,
hcatProperties);
- context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName());
+ context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo));
}
@Override
@@ -125,7 +125,6 @@ abstract class HBaseBaseOutputStorageDr
@Override
public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
- //TODO figure out a way to include user specified revision number as part of dir
- return new Path(tableLocation, Long.toString(System.currentTimeMillis())).toString();
+ return null;
}
}
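The first hunk above changes how a default revision is chosen: the revision used to be derived from the output directory name, but getOutputLocation() now returns null (last hunk), so the default falls back to the wall-clock time at job setup. A self-contained sketch of that defaulting, assuming a plain Properties bag; the key string is a placeholder for HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, whose actual value is not shown in this diff:

    import java.util.Properties;

    public class RevisionDefaultSketch {
        // placeholder key; the real constant is HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY
        static final String OUTPUT_VERSION_KEY = "hcat.hbase.output.version";

        static long ensureRevision(Properties props) {
            String revision = props.getProperty(OUTPUT_VERSION_KEY);
            if (revision == null) {
                // no user-supplied revision: default once to the current time
                revision = Long.toString(System.currentTimeMillis());
                props.setProperty(OUTPUT_VERSION_KEY, revision);
            }
            return Long.parseLong(revision);
        }
    }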
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -24,9 +24,13 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
@@ -36,15 +40,29 @@ import java.util.Properties;
* efficient for large batch writes in comparison to HBaseDirectOutputStorageDriver.
*/
public class HBaseBulkOutputStorageDriver extends HBaseBaseOutputStorageDriver {
+ private String PROPERTY_TABLE_LOCATION = "hcat.hbase.mapreduce.table.location";
+ private String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
private OutputFormat outputFormat;
private final static ImmutableBytesWritable EMPTY_KEY = new ImmutableBytesWritable(new byte[0]);
@Override
public void initialize(JobContext context, Properties hcatProperties) throws IOException {
super.initialize(context, hcatProperties);
- Path outputDir = new Path(outputJobInfo.getLocation());
- context.getConfiguration().set("mapred.output.dir", outputDir.toString());
+
+ //initialize() gets called multiple times in the lifecycle of an MR job: client, mapper, reducer, etc.
+ //depending on the case we have to make sure the context variables we set here don't get set again
+ if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
+ String location = new Path(context.getConfiguration().get(PROPERTY_TABLE_LOCATION),
+ "REVISION_"+outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
+ outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+ //We are writing out an intermediate sequenceFile, hence the location is not passed in OutputJobInfo.getLocation()
+ //TODO replace this with a mapreduce constant when available
+ context.getConfiguration().set("mapred.output.dir", location);
+ }
+
outputFormat = new HBaseBulkOutputFormat();
+ context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
}
@Override
@@ -57,4 +75,11 @@ public class HBaseBulkOutputStorageDrive
return EMPTY_KEY;
}
+ @Override
+ public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+ //TODO have HCatalog common objects expose more information
+ //this is the only way to pick up the table location for storageDrivers
+ jobContext.getConfiguration().set(PROPERTY_TABLE_LOCATION, tableLocation);
+ return null;
+ }
}
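The comment in initialize() above is the heart of the bulk driver's part of HCAT-154: initialize() runs once in each process (client, mapper, reducer), so any value derived from wall-clock state must be computed exactly once and then carried along in the job properties. A minimal sketch of that guard pattern, with the property name copied from the diff and the Path handling simplified to string concatenation:

    import java.util.Properties;

    public class IdempotentInitSketch {
        static final String INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";

        static void initOnce(Properties props, String tableLocation, String revision) {
            if (!props.containsKey(INT_OUTPUT_LOCATION)) {
                // the first caller (the client) fixes the intermediate dir; later
                // calls in mappers and reducers see the property and leave it alone
                props.setProperty(INT_OUTPUT_LOCATION, tableLocation + "/REVISION_" + revision);
            }
        }
    }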
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -47,11 +47,12 @@ public class HBaseOutputStorageDriver ex
public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
//sanity check, since we can't determine which driver will be used until initialize()
//and this method gets called before that
- String location = bulkOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
- if(!location.equals(directOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash))) {
- throw new IOException("bulkOSD and directOSD return inconsistent path for getOutputLocation()");
+ String l1 = bulkOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
+ String l2 = directOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
+ if(l1 != null || l2 != null) {
+ throw new IOException("bulkOSD or directOSD returns a non-null path for getOutputLocation()");
}
- return location;
+ return null;
}
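With this change both sub-drivers are expected to return null from getOutputLocation(), because the real location is only decided later, inside initialize(); the composite driver therefore treats any non-null value as a broken contract instead of comparing two paths that were never guaranteed to match. A simplified sketch of the check with the driver wiring stripped away:

    import java.io.IOException;

    public class OutputLocationContractSketch {
        static String checkedOutputLocation(String bulkLocation, String directLocation) throws IOException {
            if (bulkLocation != null || directLocation != null) {
                throw new IOException("bulkOSD or directOSD returns a non-null path for getOutputLocation()");
            }
            return null; // the actual location is chosen in initialize()
        }
    }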
@Override
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -11,32 +11,24 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
@@ -44,7 +36,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -56,67 +47,27 @@ import static org.junit.Assert.assertTru
* Including ImportSequenceFile, HBaseOutputStorageDrivers and HBaseBulkOutputFormat
*/
public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
- private final String suiteName = "TestHBaseBulkOutputStorageDriver";
+ private final HiveConf allConf;
+ private final HCatDriver hcatDriver;
- private void registerHBaseTable(String tableName) throws Exception {
+ public TestHBaseBulkOutputStorageDriver() {
+ allConf = getHiveConf();
+ allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+ allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+ //Add hbase properties
+ for (Map.Entry<String, String> el : getHbaseConf())
+ allConf.set(el.getKey(), el.getValue());
+ for (Map.Entry<String, String> el : getJobConf())
+ allConf.set(el.getKey(), el.getValue());
- String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ;
- HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
-
- try {
- client.dropTable(databaseName, tableName);
- } catch(Exception e) {
- } //can fail with NoSuchObjectException
-
-
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
- StorageDescriptor sd = new StorageDescriptor();
- sd.setLocation(getTestDir()+"/"+suiteName+"/"+tableName);
- sd.setCols(getTableColumns());
- tbl.setPartitionKeys(new ArrayList<FieldSchema>());
-
- tbl.setSd(sd);
-
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(
- Constants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
- sd.setInputFormat("fillme");
- sd.setOutputFormat(HBaseBulkOutputFormat.class.getName());
-
- Map<String, String> tableParams = new HashMap<String, String>();
- tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
- tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseOutputStorageDriver.class.getName());
- tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
- tbl.setParameters(tableParams);
-
- client.createTable(tbl);
- }
-
- protected List<FieldSchema> getTableColumns() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
- fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
- fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
- private static List<HCatFieldSchema> generateDataColumns() throws HCatException {
- List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
- return dataColumns;
+ SessionState.start(new CliSessionState(allConf));
+ hcatDriver = new HCatDriver();
}
public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
-
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String vals[] = value.toString().split(",");
@@ -134,8 +85,9 @@ public class TestHBaseBulkOutputStorageD
public static class MapHCatWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, HCatRecord> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
HCatRecord record = new DefaultHCatRecord(3);
- HCatSchema schema = new HCatSchema(generateDataColumns());
+ HCatSchema schema = jobInfo.getOutputSchema();
String vals[] = value.toString().split(",");
record.setInteger("key",schema,Integer.parseInt(vals[0]));
for(int i=1;i<vals.length;i++) {
@@ -148,18 +100,16 @@ public class TestHBaseBulkOutputStorageD
@Test
public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
- String tableName = newTableName("hbaseBulkOutputFormatTest");
+ String testName = "hbaseBulkOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
- Configuration conf = new Configuration(getJobConf());
- for(Map.Entry<String,String> el: getHbaseConf()) {
- if(el.getKey().startsWith("hbase.")) {
- conf.set(el.getKey(),el.getValue());
- }
- }
+ Configuration conf = new Configuration(allConf);
//create table
conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
@@ -172,17 +122,17 @@ public class TestHBaseBulkOutputStorageD
// input/output settings
- Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
for(String line: data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
- Path interPath = new Path(getTestDir()+"/hbaseBulkOutputFormatTest/inter");
+ Path interPath = new Path(methodTestDir,"inter");
//create job
- Job job = new Job(conf, "bulk write");
- job.setWorkingDirectory(new Path(getTestDir(),"hbaseBulkOutputFormatTest_MR"));
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
@@ -205,7 +155,7 @@ public class TestHBaseBulkOutputStorageD
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes("my_family"));
+ scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index=0;
for(Result result: scanner) {
@@ -225,18 +175,16 @@ public class TestHBaseBulkOutputStorageD
@Test
public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
- String tableName = newTableName("importSequenceFileTest");
+ String testName = "importSequenceFileTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
- Configuration conf = new Configuration(getJobConf());
- for(Map.Entry<String,String> el: getHbaseConf()) {
- if(el.getKey().startsWith("hbase.")) {
- conf.set(el.getKey(),el.getValue());
- }
- }
+ Configuration conf = new Configuration(allConf);
//create table
createTable(tableName,new String[]{familyName});
@@ -248,19 +196,19 @@ public class TestHBaseBulkOutputStorageD
// input/output settings
- Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
for(String line: data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
- Path interPath = new Path(getTestDir()+"/ImportSequenceFileTest/inter");
- Path scratchPath = new Path(getTestDir()+"/ImportSequenceFileTest/scratch");
+ Path interPath = new Path(methodTestDir,"inter");
+ Path scratchPath = new Path(methodTestDir,"scratch");
//create job
- Job job = new Job(conf, "sequence file write");
- job.setWorkingDirectory(new Path(getTestDir(),"importSequenceFileTest_MR"));
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
@@ -285,7 +233,7 @@ public class TestHBaseBulkOutputStorageD
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes("my_family"));
+ scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index=0;
for(Result result: scanner) {
@@ -304,36 +252,143 @@ public class TestHBaseBulkOutputStorageD
}
@Test
- public void hbaseOutputStorageDriverTestWithRevision() throws Exception {
- String tableName = newTableName("mrtest");
+ public void hbaseBulkOutputStorageDriverTest() throws Exception {
+ String testName = "hbaseBulkOutputStorageDriverTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
- Configuration conf = new Configuration(getJobConf());
- for(Map.Entry<String,String> el: getHbaseConf()) {
- if(el.getKey().startsWith("hbase.")) {
- conf.set(el.getKey(),el.getValue());
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + "
LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string)
STORED BY " +
+
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES
('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+
"'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+
"'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ Job job = new Job(conf,testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+ HCatOutputFormat.setOutput(job,outputJobInfo);
+
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
+
+ job.setNumReduceTasks(0);
+
+ long ubTimestamp = System.currentTimeMillis();
+ long lbTimestamp = ubTimestamp;
+
+ assertTrue(job.waitForCompletion(true));
+
+ ubTimestamp = System.currentTimeMillis();
+
+ //verify
+ HTable table = new HTable(conf, databaseName+"."+tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ long prevTimestamp = -1;
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+
+ //verify revision
+ long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
+ if(prevTimestamp == -1) {
+ prevTimestamp = timestamp;
+ }
+ else {
+ assertEquals(prevTimestamp+"="+timestamp,
+ prevTimestamp,
+ timestamp);
+ }
+ assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
+ timestamp >= lbTimestamp && timestamp <= ubTimestamp);
}
+ index++;
}
+ //test if load count is the same
+ assertEquals(data.length,index);
+ }
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+ @Test
+ public void hbaseBulkOutputStorageDriverTestWithRevision() throws Exception {
+ String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
- //create table
- createTable(tableName,new String[]{familyName});
- registerHBaseTable(tableName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + "
LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string)
STORED BY " +
+
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES
('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+
"'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+
"'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
// input/output settings
- Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
for(String line: data)
@@ -341,8 +396,8 @@ public class TestHBaseBulkOutputStorageD
os.close();
//create job
- Job job = new Job(conf, "hcat mapreduce write test");
- job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR"));
+ Job job = new Job(conf,testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -351,7 +406,7 @@ public class TestHBaseBulkOutputStorageD
job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
HCatOutputFormat.setOutput(job,outputJobInfo);
@@ -366,9 +421,9 @@ public class TestHBaseBulkOutputStorageD
assertTrue(job.waitForCompletion(true));
//verify
- HTable table = new HTable(conf, tableName);
+ HTable table = new HTable(conf, databaseName+"."+tableName);
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes("my_family"));
+ scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index=0;
for(Result result: scanner) {
@@ -386,47 +441,49 @@ public class TestHBaseBulkOutputStorageD
}
@Test
- public void hbaseOutputStorageDriverTest() throws Exception {
- String tableName = newTableName("mrtest");
+ public void hbaseBulkOutputStorageDriverTestWithDefaultDB() throws Exception {
+ String testName = "hbaseBulkOutputStorageDriverTestWithDefaultDB";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String databaseName = "default";
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
- Configuration conf = new Configuration(getJobConf());
- for(Map.Entry<String,String> el: getHbaseConf()) {
- if(el.getKey().startsWith("hbase.")) {
- conf.set(el.getKey(),el.getValue());
- }
- }
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
- //create table
- createTable(tableName,new String[]{familyName});
- registerHBaseTable(tableName);
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + "
LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string)
STORED BY " +
+
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES
('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+
"'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+
"'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
-
-
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
// input/output settings
- Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
for(String line: data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
- long ubTimestamp = System.currentTimeMillis();
- long lbTimestamp = ubTimestamp;
//create job
- Job job = new Job(conf, "hcat mapreduce write test");
- job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR"));
+ Job job = new Job(conf,testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -435,11 +492,8 @@ public class TestHBaseBulkOutputStorageD
job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
HCatOutputFormat.setOutput(job,outputJobInfo);
- ubTimestamp = System.currentTimeMillis();
- System.out.println("ub: "+ubTimestamp);
-
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
@@ -454,25 +508,15 @@ public class TestHBaseBulkOutputStorageD
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes("my_family"));
+ scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index=0;
- Long prevTimestamp = null;
for(Result result: scanner) {
String vals[] = data[index].toString().split(",");
for(int i=1;i<vals.length;i++) {
String pair[] = vals[i].split(":");
assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
- Long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
- if(prevTimestamp == null)
- prevTimestamp = timestamp;
- else
- assertEquals(prevTimestamp+"="+timestamp,
- prevTimestamp,
- timestamp);
- assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
- timestamp >= lbTimestamp && timestamp <= ubTimestamp);
}
index++;
}
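The timestamp assertions removed in this last hunk are the same ones added earlier to hbaseBulkOutputStorageDriverTest: the test captures a lower bound before the job and an upper bound after it, then checks that every HBase cell carries one identical timestamp (a single revision per job) that falls inside the bracket. A plain-Java sketch of that check with the HBase Result plumbing removed:

    public class TimestampBracketSketch {
        static void verify(long[] cellTimestamps, long lb, long ub) {
            long prev = -1;
            for (long ts : cellTimestamps) {
                if (prev == -1) prev = ts; // the first cell fixes the expected revision
                if (ts != prev) throw new AssertionError(prev + "=" + ts);
                if (ts < lb || ts > ub) throw new AssertionError(lb + "<=" + ts + "<=" + ub);
            }
        }
    }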