Author: hashutosh
Date: Wed Nov 23 18:23:51 2011
New Revision: 1205527
URL: http://svn.apache.org/viewvc?rev=1205527&view=rev
Log:
HCATALOG-160 : HBaseDirectOutputStorageDriver outputVersion isn't consistent
within the same MR job (toffer via hashutosh)
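Background on the fix: the apparent failure mode was that, with no explicit output version set, each task re-ran storage-driver initialization and could mint its own version, so mappers of the same job wrote under different versions. The change serializes the fully initialized OutputJobInfo back into the job configuration so that every task deserializes one shared instance. A minimal sketch of that round-trip, using only the HCatUtil/HCatConstants calls that appear in the diffs below (the helper class name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    public class OutputJobInfoRoundTrip {
        // Driver side: persist the initialized OutputJobInfo into the job conf
        // so every task observes the same outputVersion.
        public static void publish(JobContext context, OutputJobInfo info)
                throws IOException {
            context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
                    HCatUtil.serialize(info));
        }

        // Task side: read back the single shared instance instead of
        // re-deriving per-task state.
        public static OutputJobInfo read(JobContext context) throws IOException {
            return (OutputJobInfo) HCatUtil.deserialize(
                    context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
        }
    }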
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Nov 23 18:23:51 2011
@@ -79,6 +79,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+    HCAT-160. HBaseDirectOutputStorageDriver outputVersion isn't consistent
+    within the same MR job (toffer via hashutosh)
+
HCAT-155. HBase bulkOSD requires value to be Put rather than HCatRecord
(toffer via hashutosh)
HCAT-157. HBaseOutputFormat assumes hbase table name is hcat table name
(toffer via hashutosh)
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Wed Nov 23 18:23:51 2011
@@ -50,6 +50,14 @@ abstract class HBaseBaseOutputStorageDr
protected HCatSchema schema;
protected HCatSchema outputSchema;
+    /**
+     * Subclasses are required to serialize OutputJobInfo back into the
+     * jobContext, since initialize() sets some properties in OutputJobInfo,
+     * which requires updating the instance stored in the jobContext.
+     * @param context the job context object
+     * @param hcatProperties the properties for the storage driver
+     * @throws IOException
+     */
@Override
public void initialize(JobContext context, Properties hcatProperties)
throws IOException {
hcatProperties = (Properties)hcatProperties.clone();
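The Javadoc above states the contract; for illustration, a hedged sketch of a subclass honoring it (the remaining abstract members of the base driver are omitted, and the sketch assumes the base class exposes a protected outputJobInfo field populated by super.initialize(), as HBaseDirectOutputStorageDriver below relies on):

    @Override
    public void initialize(JobContext context, Properties hcatProperties)
            throws IOException {
        // Let the base class populate and adjust outputJobInfo first...
        super.initialize(context, hcatProperties);
        // ...then serialize the updated instance back into the jobContext so
        // all tasks of the job see identical settings (e.g. outputVersion).
        context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
                HCatUtil.serialize(outputJobInfo));
    }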
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Wed Nov 23 18:23:51 2011
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import java.io.IOException;
import java.util.Properties;
@@ -36,6 +38,7 @@ public class HBaseDirectOutputStorageDri
@Override
public void initialize(JobContext context, Properties hcatProperties)
throws IOException {
super.initialize(context, hcatProperties);
+        context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
outputFormat = new HBaseDirectOutputFormat();
outputFormat.setConf(context.getConfiguration());
}
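On the consuming side, a task can then recover the shared OutputJobInfo from the configuration, as the reworked MapHCatWrite in the test below does. A minimal, hypothetical mapper sketch (mapper name and output types are illustrative, not from this commit):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    public class ReadJobInfoMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Every task deserializes the same OutputJobInfo that the storage
            // driver serialized into the job configuration.
            OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(
                    context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
            // The output schema (and outputVersion) are therefore consistent
            // across all mappers of the job.
            HCatSchema schema = jobInfo.getOutputSchema();
        }
    }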
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Wed Nov 23 18:23:51 2011
@@ -23,37 +23,27 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -64,95 +54,39 @@ import static org.junit.Assert.assertTru
*/
public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest {
- private void registerHBaseTable(String tableName) throws Exception {
+ private final HiveConf allConf;
+ private final HCatDriver hcatDriver;
- String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ;
- HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
+    public TestHBaseDirectOutputStorageDriver() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
- try {
- client.dropTable(databaseName, tableName);
- } catch(Exception e) {
- } //can fail with NoSuchObjectException
-
-
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
- StorageDescriptor sd = new StorageDescriptor();
-
- sd.setCols(getTableColumns());
- tbl.setPartitionKeys(new ArrayList<FieldSchema>());
-
- tbl.setSd(sd);
-
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(
- Constants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
- sd.setInputFormat("fillme");
- sd.setOutputFormat(HBaseDirectOutputFormat.class.getName());
-
- Map<String, String> tableParams = new HashMap<String, String>();
- tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
-        tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseDirectOutputStorageDriver.class.getName());
-        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
- tbl.setParameters(tableParams);
-
- client.createTable(tbl);
- }
-
- protected List<FieldSchema> getTableColumns() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
- fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
- fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
-    private static List<HCatFieldSchema> generateDataColumns() throws HCatException {
-        List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
-        return dataColumns;
-    }
-
- public void test() throws IOException {
- Configuration conf = getHbaseConf();
- String tableName = "my_table";
- byte[] tableNameBytes = Bytes.toBytes(tableName);
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
- createTable(tableName,new String[]{familyName});
- HTable table = new HTable(getHbaseConf(),tableNameBytes);
- byte[] key = Bytes.toBytes("foo");
- byte[] qualifier = Bytes.toBytes("qualifier");
- byte[] val = Bytes.toBytes("bar");
- Put put = new Put(key);
- put.add(familyNameBytes, qualifier, val);
- table.put(put);
- Result result = table.get(new Get(key));
-        assertTrue(Bytes.equals(val, result.getValue(familyNameBytes, qualifier)));
+ SessionState.start(new CliSessionState(allConf));
+ hcatDriver = new HCatDriver();
}
@Test
public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
- String tableName = newTableName("mrTest");
+ String testName = "directOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
- Configuration conf = new Configuration(getJobConf());
- for(Map.Entry<String,String> el: getHbaseConf()) {
- if(el.getKey().startsWith("hbase.")) {
- conf.set(el.getKey(),el.getValue());
- }
- }
+ Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
//create table
createTable(tableName,new String[]{familyName});
@@ -164,7 +98,7 @@ public class TestHBaseDirectOutputStorag
// input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
for(String line: data)
@@ -172,7 +106,8 @@ public class TestHBaseDirectOutputStorag
os.close();
//create job
- Job job = new Job(conf, "hcat mapreduce write test");
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
@@ -194,7 +129,7 @@ public class TestHBaseDirectOutputStorag
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes("my_family"));
+ scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index=0;
for(Result result: scanner) {
@@ -209,61 +144,151 @@ public class TestHBaseDirectOutputStorag
assertEquals(data.length,index);
}
-    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+ @Test
+ public void directOutputStorageDriverTest() throws Exception {
+ String testName = "directOutputStorageDriverTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
- @Override
-        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String vals[] = value.toString().split(",");
- Put put = new Put(Bytes.toBytes(vals[0]));
+ String databaseName = "default";
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ //create multiple files so we can test with multiple mappers
+ for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ //create job
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+ long lbTimestamp = System.currentTimeMillis();
+ HCatOutputFormat.setOutput(job,outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Put.class);
+
+ job.setNumReduceTasks(0);
+
+ assertTrue(job.waitForCompletion(true));
+ long ubTimestamp = System.currentTimeMillis();
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ long prevTimestamp = -1;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
for(int i=1;i<vals.length;i++) {
String pair[] = vals[i].split(":");
- put.add(Bytes.toBytes("my_family"),
- Bytes.toBytes(pair[0]),
- Bytes.toBytes(pair[1]));
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
+ if(prevTimestamp < 1)
+ prevTimestamp = timestamp;
+ else
+                    assertEquals(prevTimestamp+"="+timestamp,
+                                 prevTimestamp,
+                                 timestamp);
+                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
+                           timestamp >= lbTimestamp && timestamp <= ubTimestamp);
}
- context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+ index++;
}
+ assertEquals(data.length,index);
}
@Test
- public void directOutputStorageDriverTest() throws Exception {
- String tableName = newTableName("mrtest");
+ public void directOutputStorageDriverTestWithRevision() throws Exception {
+ String testName = "directOutputStorageDriverTestWithRevision";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String databaseName = "default";
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
- Configuration conf = new Configuration(getJobConf());
- for(Map.Entry<String,String> el: getHbaseConf()) {
- if(el.getKey().startsWith("hbase.")) {
- conf.set(el.getKey(),el.getValue());
- }
- }
+ Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
- //create table
- createTable(tableName,new String[]{familyName});
- registerHBaseTable(tableName);
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
-
-
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
// input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+ Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
-        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
- for(String line: data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
+ //create multiple files so we can test with multiple mappers
+ for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
//create job
- Job job = new Job(conf, "hcat mapreduce write test");
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -272,7 +297,7 @@ public class TestHBaseDirectOutputStorag
job.setOutputFormatClass(HCatOutputFormat.class);
-        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
HCatOutputFormat.setOutput(job,outputJobInfo);
@@ -288,7 +313,7 @@ public class TestHBaseDirectOutputStorag
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes("my_family"));
+ scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index=0;
for(Result result: scanner) {
@@ -308,8 +333,9 @@ public class TestHBaseDirectOutputStorag
@Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
HCatRecord record = new DefaultHCatRecord(3);
- HCatSchema schema = new HCatSchema(generateDataColumns());
+ HCatSchema schema = jobInfo.getOutputSchema();
String vals[] = value.toString().split(",");
record.setInteger("key",schema,Integer.parseInt(vals[0]));
for(int i=1;i<vals.length;i++) {
@@ -319,4 +345,20 @@ public class TestHBaseDirectOutputStorag
context.write(null,record);
}
}
+
+    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+
+ @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+ }
+ }
}