Author: hashutosh
Date: Tue Nov 22 22:48:38 2011
New Revision: 1205212
URL: http://svn.apache.org/viewvc?rev=1205212&view=rev
Log:
HCATALOG-155. HBase bulkOSD requires value to be Put rather than HCatRecord
(toffer via hashutosh)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Nov 22 22:48:38 2011
@@ -79,6 +79,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-155. HBase bulkOSD requires value to be Put rather than HCatRecord
(toffer via hashutosh)
+
HCAT-157. HBaseOutputFormat assumes hbase table name is hcat table name
(toffer via hashutosh)
HCAT-154. HBase bulkOSD and directOSD return inconsistent path for
getOutputLocation() (toffer via hashutosh)
Modified:
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
---
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
(original)
+++
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
Tue Nov 22 22:48:38 2011
@@ -5,11 +5,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -20,24 +22,60 @@ import java.io.IOException;
* are created by the MR job using HFileOutputFormat and then later "moved" into
* the appropriate region server.
*/
-class HBaseBulkOutputFormat extends
SequenceFileOutputFormat<ImmutableBytesWritable,Put> {
+class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
+ private final static ImmutableBytesWritable EMPTY_LIST = new
ImmutableBytesWritable(new byte[0]);
+ private SequenceFileOutputFormat<WritableComparable<?>,Put>
baseOutputFormat;
+
+ public HBaseBulkOutputFormat() {
+ baseOutputFormat = new
SequenceFileOutputFormat<WritableComparable<?>,Put>();
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
+ baseOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public RecordWriter<WritableComparable<?>, Put>
getRecordWriter(TaskAttemptContext context) throws IOException,
InterruptedException {
+ //TODO use a constant/static setter when available
+
context.getConfiguration().setClass("mapred.output.key.class",ImmutableBytesWritable.class,Object.class);
+
context.getConfiguration().setClass("mapred.output.value.class",Put.class,Object.class);
+ return new
HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(context));
+ }
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
- return new
HBaseBulkOutputCommitter(FileOutputFormat.getOutputPath(context),context,(FileOutputCommitter)super.getOutputCommitter(context));
+ return new
HBaseBulkOutputCommitter(baseOutputFormat.getOutputCommitter(context));
}
- private static class HBaseBulkOutputCommitter extends FileOutputCommitter {
- FileOutputCommitter baseOutputCommitter;
+ private static class HBaseBulkRecordWriter extends
RecordWriter<WritableComparable<?>,Put> {
+ private RecordWriter<WritableComparable<?>,Put> baseWriter;
- public HBaseBulkOutputCommitter(Path outputPath, TaskAttemptContext
taskAttemptContext,
- FileOutputCommitter
baseOutputCommitter) throws IOException {
- super(outputPath, taskAttemptContext);
+ public HBaseBulkRecordWriter(RecordWriter<WritableComparable<?>,Put>
baseWriter) {
+ this.baseWriter = baseWriter;
+ }
+
+ @Override
+ public void write(WritableComparable<?> key, Put value) throws
IOException, InterruptedException {
+ //we ignore the key
+ baseWriter.write(EMPTY_LIST, value);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
+ baseWriter.close(context);
+ }
+ }
+
+ private static class HBaseBulkOutputCommitter extends OutputCommitter {
+ private OutputCommitter baseOutputCommitter;
+
+ public HBaseBulkOutputCommitter(OutputCommitter baseOutputCommitter)
throws IOException {
this.baseOutputCommitter = baseOutputCommitter;
}
@Override
- public void abortTask(TaskAttemptContext context) {
+ public void abortTask(TaskAttemptContext context) throws IOException {
baseOutputCommitter.abortTask(context);
}
@@ -47,11 +85,6 @@ class HBaseBulkOutputFormat extends Sequ
}
@Override
- public Path getWorkPath() throws IOException {
- return baseOutputCommitter.getWorkPath();
- }
-
- @Override
public boolean needsTaskCommit(TaskAttemptContext context) throws
IOException {
return baseOutputCommitter.needsTaskCommit(context);
}
Modified:
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
---
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
(original)
+++
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
Tue Nov 22 22:48:38 2011
@@ -14,11 +14,10 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -34,8 +33,6 @@ import org.apache.hcatalog.mapreduce.Out
import org.junit.Test;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -82,7 +79,7 @@ public class TestHBaseBulkOutputStorageD
}
}
- public static class MapHCatWrite extends Mapper<LongWritable, Text,
ImmutableBytesWritable, HCatRecord> {
+ public static class MapHCatWrite extends Mapper<LongWritable, Text,
BytesWritable, HCatRecord> {
@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
OutputJobInfo jobInfo =
(OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
@@ -143,10 +140,10 @@ public class TestHBaseBulkOutputStorageD
SequenceFileOutputFormat.setOutputPath(job,interPath);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(Put.class);
+ job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
@@ -287,10 +284,12 @@ public class TestHBaseBulkOutputStorageD
// input/output settings
Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
- FSDataOutputStream os = getFileSystem().create(new
Path(inputPath,"inputFile.txt"));
- for(String line: data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
+ //create multiple files so we can test with multiple mappers
+ for(int i=0;i<data.length;i++) {
+ FSDataOutputStream os = getFileSystem().create(new
Path(inputPath,"inputFile"+i+".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
//create job
Job job = new Job(conf,testName);
@@ -306,11 +305,11 @@ public class TestHBaseBulkOutputStorageD
OutputJobInfo outputJobInfo =
OutputJobInfo.create(databaseName,tableName,null,null,null);
HCatOutputFormat.setOutput(job,outputJobInfo);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
@@ -390,10 +389,12 @@ public class TestHBaseBulkOutputStorageD
// input/output settings
Path inputPath = new Path(methodTestDir,"mr_input");
getFileSystem().mkdirs(inputPath);
- FSDataOutputStream os = getFileSystem().create(new
Path(inputPath,"inputFile.txt"));
- for(String line: data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
+ //create multiple files so we can test with multiple mappers
+ for(int i=0;i<data.length;i++) {
+ FSDataOutputStream os = getFileSystem().create(new
Path(inputPath,"inputFile"+i+".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
//create job
Job job = new Job(conf,testName);
@@ -410,11 +411,11 @@ public class TestHBaseBulkOutputStorageD
outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
"1");
HCatOutputFormat.setOutput(job,outputJobInfo);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
@@ -495,11 +496,11 @@ public class TestHBaseBulkOutputStorageD
OutputJobInfo outputJobInfo =
OutputJobInfo.create(databaseName,tableName,null,null,null);
HCatOutputFormat.setOutput(job,outputJobInfo);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
Modified:
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
---
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
(original)
+++
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
Tue Nov 22 22:48:38 2011
@@ -183,10 +183,10 @@ public class TestHBaseDirectOutputStorag
job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,
tableName);
job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(Put.class);
+ job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
@@ -280,7 +280,7 @@ public class TestHBaseDirectOutputStorag
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));