Author: hashutosh
Date: Sat Jan 21 05:12:16 2012
New Revision: 1234272
URL: http://svn.apache.org/viewvc?rev=1234272&view=rev
Log:
HCATALOG-192: HBase output storage driver integration with zookeeper based
revision manager (toffer via hashutosh)
Modified:
incubator/hcatalog/branches/branch-0.3/CHANGES.txt
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
Modified: incubator/hcatalog/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/CHANGES.txt?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.3/CHANGES.txt Sat Jan 21 05:12:16 2012
@@ -23,6 +23,8 @@ Release 0.3.0 (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-192. HBase output storage driver integration with zookeeper based revision manager (toffer via hashutosh)
+
HCAT-191. HBase input storage driver integration with zookeeper based revision manager (avandana via toffer)
HCAT-193. Snapshot class for HCatalog tables. (avandana via toffer)
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Sat Jan 21 05:12:16 2012
@@ -30,11 +30,14 @@ import org.apache.hcatalog.common.HCatCo
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -62,7 +65,6 @@ abstract class HBaseBaseOutputStorageDr
hcatProperties = (Properties)hcatProperties.clone();
super.initialize(context, hcatProperties);
-
String jobString =
context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
if( jobString == null ) {
throw new IOException("OutputJobInfo information not found in
JobContext. HCatInputFormat.setOutput() not called?");
@@ -75,16 +77,9 @@ abstract class HBaseBaseOutputStorageDr
outputJobInfo.getProperties().putAll(hcatProperties);
hcatProperties = outputJobInfo.getProperties();
-
- String revision =
outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY);
- if(revision == null) {
- outputJobInfo.getProperties()
-
.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
-
Long.toString(System.currentTimeMillis()));
- }
-
tableInfo = outputJobInfo.getTableInfo();
schema = tableInfo.getDataColumns();
+ String qualifiedTableName =
HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
List<FieldSchema> fields =
HCatUtil.getFieldSchemaList(outputSchema.getFields());
hcatProperties.setProperty(Constants.LIST_COLUMNS,
@@ -92,11 +87,36 @@ abstract class HBaseBaseOutputStorageDr
hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
- //outputSchema should be set by HCatOutputFormat calling setSchema,
prior to initialize being called
- converter = new HBaseSerDeResultConverter(schema,
- outputSchema,
- hcatProperties);
-
context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,
HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo));
+
context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,
qualifiedTableName);
+
+ String txnString =
outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ if(txnString == null) {
+ //outputSchema should be set by HCatOutputFormat calling
setSchema, prior to initialize being called
+ //TODO reconcile output_revision passing to
HBaseSerDeResultConverter
+ //on the first call to this method hcatProperties will not contain
an OUTPUT_VERSION but that doesn't
+ //matter since we won't use any facilities that require that
property set during that run
+ converter = new HBaseSerDeResultConverter(schema,
+
outputSchema,
+
hcatProperties);
+ RevisionManager rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(context.getConfiguration());
+ Transaction txn = null;
+ try {
+ txn = rm.beginWriteTransaction(qualifiedTableName,
+
Arrays.asList(converter.getHBaseScanColumns().split(" ")));
+ } finally {
+ rm.close();
+ }
+ outputJobInfo.getProperties()
+ .setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ }
+ else {
+ Transaction txn = (Transaction)HCatUtil.deserialize(txnString);
+ converter = new HBaseSerDeResultConverter(schema,
+
outputSchema,
+
hcatProperties,
+
txn.getRevisionNumber());
+ }
}
@Override
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Sat Jan 21 05:12:16 2012
@@ -1,5 +1,7 @@
package org.apache.hcatalog.hbase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -17,6 +19,12 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
import java.io.IOException;
@@ -28,6 +36,7 @@ import java.io.IOException;
class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
private final static ImmutableBytesWritable EMPTY_LIST = new
ImmutableBytesWritable(new byte[0]);
private SequenceFileOutputFormat<WritableComparable<?>,Put>
baseOutputFormat;
+ private final static Log LOG =
LogFactory.getLog(HBaseBulkOutputFormat.class);
public HBaseBulkOutputFormat() {
baseOutputFormat = new
SequenceFileOutputFormat<WritableComparable<?>,Put>();
@@ -110,35 +119,36 @@ class HBaseBulkOutputFormat extends Outp
@Override
public void abortJob(JobContext jobContext, JobStatus.State state)
throws IOException {
+ RevisionManager rm = null;
try {
baseOutputCommitter.abortJob(jobContext,state);
+ rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+
rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
} finally {
cleanIntermediate(jobContext);
- }
- }
-
- @Override
- public void cleanupJob(JobContext context) throws IOException {
- try {
- baseOutputCommitter.cleanupJob(context);
- } finally {
- cleanIntermediate(context);
+ if(rm != null)
+ rm.close();
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
+ RevisionManager rm = null;
try {
baseOutputCommitter.commitJob(jobContext);
Configuration conf = jobContext.getConfiguration();
Path srcPath = FileOutputFormat.getOutputPath(jobContext);
Path destPath = new
Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
ImportSequenceFile.runJob(jobContext,
-
conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
- srcPath,
- destPath);
- } finally {
+
conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+ srcPath,
+ destPath);
+ rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+
rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
cleanIntermediate(jobContext);
+ } finally {
+ if(rm != null)
+ rm.close();
}
}
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Sat Jan 21 05:12:16 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
import java.io.IOException;
import java.util.List;
@@ -55,10 +56,10 @@ public class HBaseBulkOutputStorageDrive
//initialize() gets called multiple time in the lifecycle of an MR
job, client, mapper, reducer, etc
//depending on the case we have to make sure for some context
variables we set here that they don't get set again
if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
+ Transaction txn = (Transaction)
+
HCatUtil.deserialize(outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
String tableLocation =
context.getConfiguration().get(PROPERTY_TABLE_LOCATION);
- String location = new Path(tableLocation,
-
"REVISION_"+outputJobInfo.getProperties()
-
.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
+ String location = new Path(tableLocation,
"REVISION_"+txn.getRevisionNumber()).toString();
outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
location);
//We are writing out an intermediate sequenceFile hence location
is not passed in OutputJobInfo.getLocation()
//TODO replace this with a mapreduce constant when available
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Sat Jan 21 05:12:16 2012
@@ -26,8 +26,8 @@ import org.apache.hcatalog.common.HCatCo
*/
class HBaseConstants {
- /** key used to define th version number HBaseOutputStorage driver to use
when writing out data for a job */
- public static final String PROPERTY_OUTPUT_VERSION_KEY =
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";
+ /** key used to store write transaction object */
+ public static final String PROPERTY_WRITE_TXN_KEY =
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.writeTxn";
/** key used to define the name of the table to write to */
public static final String PROPERTY_OUTPUT_TABLE_NAME_KEY =
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName";
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Sat Jan 21 05:12:16 2012
@@ -24,10 +24,12 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import java.io.IOException;
@@ -57,7 +59,7 @@ class HBaseDirectOutputFormat extends Ou
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
- return outputFormat.getOutputCommitter(context);
+ return new
HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
}
@Override
@@ -72,4 +74,63 @@ class HBaseDirectOutputFormat extends Ou
public Configuration getConf() {
return outputFormat.getConf();
}
+
+ private static class HBaseDirectOutputCommitter extends OutputCommitter {
+ private OutputCommitter baseOutputCommitter;
+
+ public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter)
throws IOException {
+ this.baseOutputCommitter = baseOutputCommitter;
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ baseOutputCommitter.abortTask(context);
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ baseOutputCommitter.commitTask(context);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws
IOException {
+ return baseOutputCommitter.needsTaskCommit(context);
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ baseOutputCommitter.setupJob(context);
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ baseOutputCommitter.setupTask(context);
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, JobStatus.State state)
throws IOException {
+ RevisionManager rm = null;
+ try {
+ baseOutputCommitter.abortJob(jobContext, state);
+ rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+
rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ } finally {
+ if(rm != null)
+ rm.close();
+ }
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ RevisionManager rm = null;
+ try {
+ baseOutputCommitter.commitJob(jobContext);
+ rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+
rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ } finally {
+ if(rm != null)
+ rm.close();
+ }
+ }
+ }
}
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Sat Jan 21 05:12:16 2012
@@ -55,12 +55,14 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.hcatalog.storagehandler.HCatStorageHandler;
import org.apache.thrift.TBase;
import org.apache.zookeeper.ZooKeeper;
@@ -572,5 +574,26 @@ public class HBaseHCatStorageHandler ext
HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
}
+ static Transaction getWriteTransaction(Configuration conf) throws
IOException {
+ OutputJobInfo outputJobInfo =
(OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
+
.getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+ }
+
+ static void setWriteTransaction(Configuration conf, Transaction txn)
throws IOException {
+ OutputJobInfo outputJobInfo =
(OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+
outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
HCatUtil.serialize(txn));
+ conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
HCatUtil.serialize(outputJobInfo));
+ }
+
+ /**
+ * Get the Revision number that will be assigned to this job's output data
+ * @param conf configuration of the job
+ * @return the revision number used
+ * @throws IOException
+ */
+ public static long getOutputRevision(Configuration conf) throws
IOException {
+ return getWriteTransaction(conf).getRevisionNumber();
+ }
}
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java Sat Jan 21 05:12:16 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -58,6 +59,7 @@ import java.util.Properties;
* {@link HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY}
*/
class HBaseSerDeResultConverter implements ResultConverter {
+
private HBaseSerDe serDe;
private HCatSchema schema;
private HCatSchema outputSchema;
@@ -75,14 +77,25 @@ class HBaseSerDeResultConverter implemen
HBaseSerDeResultConverter(HCatSchema schema,
HCatSchema outputSchema,
Properties hcatProperties) throws
IOException {
+ this(schema,outputSchema,hcatProperties,null);
+ }
+
+ /**
+ * @param schema table schema
+ * @param outputSchema schema of projected output
+ * @param hcatProperties table properties
+ * @param outputVersion value to write in timestamp field
+ * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized
+ */
+ HBaseSerDeResultConverter(HCatSchema schema,
+ HCatSchema outputSchema,
+ Properties hcatProperties,
+ Long outputVersion) throws IOException {
hbaseColumnMapping =
hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,hbaseColumnMapping);
-
if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
- outputVersion =
Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
- else
- outputVersion = null;
+ this.outputVersion = outputVersion;
this.schema = schema;
if(outputSchema == null) {
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Sat Jan 21 05:12:16 2012
@@ -30,12 +30,16 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -145,6 +149,19 @@ public class TestHBaseBulkOutputStorageD
job.setOutputFormatClass(HBaseBulkOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job,interPath);
+ //manually create transaction
+ RevisionManager rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+ try {
+ OutputJobInfo outputJobInfo = OutputJobInfo.create("default",
tableName, null, null, null);
+ Transaction txn = rm.beginWriteTransaction(tableName,
Arrays.asList(familyName));
+
outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ HCatUtil.serialize(outputJobInfo));
+ } finally {
+ rm.close();
+ }
+
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
@@ -323,113 +340,16 @@ public class TestHBaseBulkOutputStorageD
job.setNumReduceTasks(0);
- long ubTimestamp = System.currentTimeMillis();
- long lbTimestamp = ubTimestamp;
-
assertTrue(job.waitForCompletion(true));
-
- ubTimestamp = System.currentTimeMillis();
-
- //verify
- HTable table = new HTable(conf, databaseName+"."+tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- long prevTimestamp = -1;
- int index=0;
- for(Result result: scanner) {
- String vals[] = data[index].toString().split(",");
- for(int i=1;i<vals.length;i++) {
- String pair[] = vals[i].split(":");
-
assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
-
assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
-
- //verify revision
- long timestamp =
result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
- if(prevTimestamp == -1) {
- prevTimestamp = timestamp;
- }
- else {
- assertEquals(prevTimestamp+"="+timestamp,
- prevTimestamp,
- timestamp);
- }
- assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
- timestamp >= lbTimestamp && timestamp <=
ubTimestamp);
+ RevisionManager rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot =
rm.createSnapshot(databaseName+"."+tableName);
+ for(String el: snapshot.getColumnFamilies()) {
+ assertEquals(1,snapshot.getRevision(el));
}
- index++;
+ } finally {
+ rm.close();
}
- //test if load count is the same
- assertEquals(data.length,index);
- }
-
- @Test
- public void hbaseBulkOutputStorageDriverTestWithRevision() throws
Exception {
- String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
- Path methodTestDir = new Path(getTestDir(),testName);
- LOG.info("starting: "+testName);
-
- String databaseName = testName.toLowerCase();
- String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
- String tableName = newTableName(testName).toLowerCase();
- byte[] tableNameBytes = Bytes.toBytes(tableName);
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
HCatUtil.serialize(allConf.getAllProperties()));
-
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + "
LOCATION '" + dbDir + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string)
STORED BY " +
-
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES
('hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
-
- // input/output settings
- Path inputPath = new Path(methodTestDir,"mr_input");
- getFileSystem().mkdirs(inputPath);
- //create multiple files so we can test with multiple mappers
- for(int i=0;i<data.length;i++) {
- FSDataOutputStream os = getFileSystem().create(new
Path(inputPath,"inputFile"+i+".txt"));
- os.write(Bytes.toBytes(data[i] + "\n"));
- os.close();
- }
-
- //create job
- Job job = new Job(conf,testName);
- HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
- job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapHCatWrite.class);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
-
-
- job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo =
OutputJobInfo.create(databaseName,tableName,null,null,null);
-
outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
"1");
- HCatOutputFormat.setOutput(job,outputJobInfo);
-
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
-
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
-
- job.setNumReduceTasks(0);
-
- assertTrue(job.waitForCompletion(true));
//verify
HTable table = new HTable(conf, databaseName+"."+tableName);
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Sat Jan 21 05:12:16 2012
@@ -39,11 +39,15 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -117,6 +121,19 @@ public class TestHBaseDirectOutputStorag
job.setOutputFormatClass(HBaseDirectOutputFormat.class);
job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,
tableName);
+ //manually create transaction
+ RevisionManager rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+ try {
+ OutputJobInfo outputJobInfo = OutputJobInfo.create("default",
tableName, null, null, null);
+ Transaction txn = rm.beginWriteTransaction(tableName,
Arrays.asList(familyName));
+
outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ HCatUtil.serialize(outputJobInfo));
+ } finally {
+ rm.close();
+ }
+
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
@@ -149,7 +166,7 @@ public class TestHBaseDirectOutputStorag
String testName = "directOutputStorageDriverTest";
Path methodTestDir = new Path(getTestDir(),testName);
- String databaseName = "default";
+ String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -198,116 +215,29 @@ public class TestHBaseDirectOutputStorag
job.setOutputFormatClass(HCatOutputFormat.class);
OutputJobInfo outputJobInfo =
OutputJobInfo.create(databaseName,tableName,null,null,null);
- long lbTimestamp = System.currentTimeMillis();
HCatOutputFormat.setOutput(job,outputJobInfo);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Put.class);
+ job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
-
assertTrue(job.waitForCompletion(true));
- long ubTimestamp = System.currentTimeMillis();
- //verify
- HTable table = new HTable(conf, tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- long prevTimestamp = -1;
- for(Result result: scanner) {
- String vals[] = data[index].toString().split(",");
- for(int i=1;i<vals.length;i++) {
- String pair[] = vals[i].split(":");
-
assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
-
assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
- long timestamp =
result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
- if(prevTimestamp < 1)
- prevTimestamp = timestamp;
- else
- assertEquals(prevTimestamp+"="+timestamp,
- prevTimestamp,
- timestamp);
- assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
- timestamp >= lbTimestamp && timestamp <=
ubTimestamp);
+ RevisionManager rm =
HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot =
rm.createSnapshot(databaseName+"."+tableName);
+ for(String el: snapshot.getColumnFamilies()) {
+ assertEquals(1,snapshot.getRevision(el));
}
- index++;
+ } finally {
+ rm.close();
}
- assertEquals(data.length,index);
- }
-
- @Test
- public void directOutputStorageDriverTestWithRevision() throws Exception {
- String testName = "directOutputStorageDriverTestWithRevision";
- Path methodTestDir = new Path(getTestDir(),testName);
-
- String databaseName = "default";
- String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
- String tableName = newTableName(testName).toLowerCase();
- byte[] tableNameBytes = Bytes.toBytes(tableName);
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
HCatUtil.serialize(allConf.getAllProperties()));
-
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + "
LOCATION '" + dbDir + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string)
STORED BY " +
-
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES
('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
-
"'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
-
- // input/output settings
- Path inputPath = new Path(methodTestDir,"mr_input");
- getFileSystem().mkdirs(inputPath);
- //create multiple files so we can test with multiple mappers
- for(int i=0;i<data.length;i++) {
- FSDataOutputStream os = getFileSystem().create(new
Path(inputPath,"inputFile"+i+".txt"));
- os.write(Bytes.toBytes(data[i] + "\n"));
- os.close();
- }
-
- //create job
- Job job = new Job(conf, testName);
- job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapHCatWrite.class);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
-
-
- job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo =
OutputJobInfo.create(databaseName,tableName,null,null,null);
-
outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
"1");
- HCatOutputFormat.setOutput(job,outputJobInfo);
-
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
-
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
-
- job.setNumReduceTasks(0);
- assertTrue(job.waitForCompletion(true));
//verify
- HTable table = new HTable(conf, tableName);
+ HTable table = new HTable(conf, databaseName+"."+tableName);
Scan scan = new Scan();
scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java?rev=1234272&r1=1234271&r2=1234272&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java Sat Jan 21 05:12:16 2012
@@ -55,7 +55,6 @@ public class TestHBaseSerDeResultConvert
tbl.setProperty(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+
HBaseSerDe.HBASE_COLUMNS_MAPPING,
":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:");
tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
- tbl.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,"1");
return tbl;
}
@@ -74,8 +73,9 @@ public class TestHBaseSerDeResultConvert
@Test
public void testDeserialize() throws IOException {
HBaseSerDeResultConverter converter = new
HBaseSerDeResultConverter(createHCatSchema(),
- null,
- createProperties());
+
null,
+
createProperties(),
+
1l);
//test integer
Result result = new Result(new KeyValue[]{new
KeyValue(Bytes.toBytes("row"),
Bytes.toBytes("my_family"),
@@ -127,8 +127,9 @@ public class TestHBaseSerDeResultConvert
public void testSerialize() throws IOException {
HCatSchema schema = createHCatSchema();
HBaseSerDeResultConverter converter = new
HBaseSerDeResultConverter(schema,
- null,
- createProperties());
+
null,
+
createProperties(),
+
1l);
HCatRecord in = new DefaultHCatRecord(4);
//row key
in.set(0,"row");