Author: gates
Date: Tue Mar 20 16:26:49 2012
New Revision: 1302978
URL: http://svn.apache.org/viewvc?rev=1302978&view=rev
Log:
HCATALOG-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Tue Mar 20 16:26:49 2012
@@ -30,6 +30,8 @@ Release 0.4.1 - Unreleased
BUG FIXES
HCAT-319 Cleanup of 0.3 mapred classes (khorgath via gates)
+    HCAT-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found (rohini via gates)
+
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Mar 20 16:26:49 2012
@@ -70,27 +70,30 @@ public class HCatStorer extends HCatBase
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
-
     job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
     String[] userStr = location.split("\\.");
     OutputJobInfo outputJobInfo;
-    if(userStr.length == 2) {
-      outputJobInfo = OutputJobInfo.create(userStr[0],
+    String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    if (outInfoString != null) {
+      outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
+    } else {
+      if(userStr.length == 2) {
+        outputJobInfo = OutputJobInfo.create(userStr[0],
                                              userStr[1],
                                              partitions);
-    } else if(userStr.length == 1) {
-      outputJobInfo = OutputJobInfo.create(null,
+      } else if(userStr.length == 1) {
+        outputJobInfo = OutputJobInfo.create(null,
                                              userStr[0],
                                              partitions);
-    } else {
-      throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+      } else {
+        throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
     }
-
     Configuration config = job.getConfiguration();
     if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
@@ -123,6 +126,7 @@ public class HCatStorer extends HCatBase
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
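Note on the HCatStorer change above: setStoreLocation runs more than once (Pig front-end and back-end calls), so the fix first looks in the UDF properties for an already-serialized OutputJobInfo and only builds a new one from the location string when none is cached. A minimal standalone sketch of that cache-or-create pattern; the property key and string payload below are illustrative stand-ins, not the real HCatalog types:

    import java.util.Properties;

    // Sketch: the first call creates and caches the value; later calls reuse it.
    // KEY and the "info:" payload are placeholders, not HCatalog identifiers.
    class OutputInfoCacheSketch {
        static final String KEY = "output.job.info";

        static String getOrCreate(Properties p, String location) {
            String cached = p.getProperty(KEY);
            if (cached != null) {
                return cached;                    // reuse the serialized copy
            }
            String created = "info:" + location;  // build it on the first call
            p.setProperty(KEY, created);
            return created;
        }
    }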
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -61,9 +61,6 @@ class HBaseBulkOutputFormat extends HBas
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
- job.setOutputCommitter(HBaseBulkOutputCommitter.class);
baseOutputFormat.checkOutputSpecs(ignored, job);
HBaseUtil.addHBaseDelegationToken(job);
addJTDelegationToken(job);
@@ -73,6 +70,8 @@ class HBaseBulkOutputFormat extends HBas
     public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
         FileSystem ignored, JobConf job, String name, Progressable progress)
         throws IOException {
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
long version = HBaseRevisionManagerUtil.getOutputRevision(job);
return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
ignored, job, name, progress), version);
@@ -188,11 +187,21 @@ class HBaseBulkOutputFormat extends HBas
try {
Configuration conf = jobContext.getConfiguration();
                Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+                if (!FileSystem.get(conf).exists(srcPath)) {
+                    throw new IOException("Failed to bulk import hfiles. " +
+                        "Intermediate data directory is cleaned up or missing. " +
+                        "Please look at the bulk import job if it exists for failure reason");
+                }
                 Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
-                ImportSequenceFile.runJob(jobContext,
+                boolean success = ImportSequenceFile.runJob(jobContext,
                     conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
                     srcPath,
                     destPath);
+                if(!success) {
+                    cleanIntermediate(jobContext);
+                    throw new IOException("Failed to bulk import hfiles." +
+                        " Please look at the bulk import job for failure reason");
+                }
rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
cleanIntermediate(jobContext);
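Note on the committer change above: commitJob now verifies the intermediate directory still exists before importing, and checks ImportSequenceFile.runJob's boolean result instead of discarding it, cleaning up and failing loudly on error. A standalone sketch of that check-run-verify flow; exists, runImport, and clean are hypothetical stubs standing in for the FileSystem and import-job calls:

    import java.io.IOException;

    // Sketch: guard the input, run the import, and fail with cleanup when the
    // import job reports failure. The helpers below are stubs, not HCatalog API.
    class BulkCommitSketch {
        void commit(String srcPath) throws IOException {
            if (!exists(srcPath)) {
                throw new IOException("Failed to bulk import hfiles. "
                    + "Intermediate data directory is cleaned up or missing.");
            }
            if (!runImport(srcPath, srcPath + "_hfiles")) {
                clean(srcPath);
                throw new IOException("Failed to bulk import hfiles. "
                    + "Please look at the bulk import job for failure reason");
            }
            clean(srcPath); // success: intermediate data is no longer needed
        }

        boolean exists(String path) { return true; }                 // stub
        boolean runImport(String src, String dest) { return true; }  // stub
        void clean(String path) { }                                  // stub
    }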
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -63,9 +63,6 @@ class HBaseDirectOutputFormat extends HB
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
- job.setOutputCommitter(HBaseDirectOutputCommitter.class);
- job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
- job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
outputFormat.checkOutputSpecs(ignored, job);
HBaseUtil.addHBaseDelegationToken(job);
}
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Tue Mar 20 16:26:49 2012
@@ -19,7 +19,6 @@
package org.apache.hcatalog.hbase;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -27,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.MasterNot
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,14 +49,15 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
+import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
@@ -68,20 +70,20 @@ import org.apache.thrift.TBase;
import org.apache.zookeeper.ZooKeeper;
import com.facebook.fb303.FacebookBase;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class HBaseHCatStorageHandler provides functionality to create HBase
* tables through HCatalog. The implementation is very similar to the
* HiveHBaseStorageHandler, with more details to suit HCatalog.
*/
-//TODO remove serializable when HCATALOG-282 is fixed
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
public final static String DEFAULT_PREFIX = "default.";
private final static String PROPERTY_INT_OUTPUT_LOCATION =
"hcat.hbase.mapreduce.intermediateOutputLocation";
- private transient Configuration hbaseConf;
- private transient HBaseAdmin admin;
+ private Configuration hbaseConf;
+ private HBaseAdmin admin;
@Override
    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
@@ -96,20 +98,32 @@ public class HBaseHCatStorageHandler ext
            jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
            Configuration jobConf = getConf();
+            addHbaseResources(jobConf, jobProperties);
+            Configuration copyOfConf = new Configuration(jobConf);
+            HBaseConfiguration.addHbaseResources(copyOfConf);
+            //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
+            //do it here
+            if (jobConf instanceof JobConf) {
+                HBaseUtil.addHBaseDelegationToken((JobConf)jobConf);
+            }
+
            String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
            jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
            String serSnapshot = (String) inputJobInfo.getProperties().get(
                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
            if (serSnapshot == null) {
-                Configuration conf = addHbaseResources(jobConf);
-                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(copyOfConf,
                    qualifiedTableName, tableInfo);
                jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
                    HCatUtil.serialize(snapshot));
            }
-            addHbaseResources(jobConf, jobProperties);
+            //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
+            //to JobConf as of now as the jobProperties is maintained per partition
+            //TODO: Remove when HCAT-308 is fixed
+            addOutputDependencyJars(jobConf);
+            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
        } catch (IOException e) {
            throw new IllegalStateException("Error while configuring job properties", e);
@@ -128,33 +142,50 @@ public class HBaseHCatStorageHandler ext
            HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
            jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+            jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
            Configuration jobConf = getConf();
+            addHbaseResources(jobConf, jobProperties);
+
+            Configuration copyOfConf = new Configuration(jobConf);
+            HBaseConfiguration.addHbaseResources(copyOfConf);
+
            String txnString = outputJobInfo.getProperties().getProperty(
                HBaseConstants.PROPERTY_WRITE_TXN_KEY);
-            if (txnString == null) {
-                Configuration conf = addHbaseResources(jobConf);
-                Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+            String jobTxnString = jobConf.get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+            //Pig makes 3 calls to HCatOutputFormat.setOutput(HCatStorer) with different JobConf
+            //which leads to creating 2 transactions.
+            //So apart from fixing HCatStorer to pass same OutputJobInfo, making the call idempotent for other
+            //cases which might call multiple times but with same JobConf.
+            Transaction txn = null;
+            if (txnString == null && jobTxnString == null) {
+                txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, copyOfConf);
+                String serializedTxn = HCatUtil.serialize(txn);
                outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
-                    HCatUtil.serialize(txn));
-
-                if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
-                        .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
-                    String tableLocation = tableInfo.getTableLocation();
-                    String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
-                        .toString();
-                    outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
-                        location);
-                    // We are writing out an intermediate sequenceFile hence
-                    // location is not passed in OutputJobInfo.getLocation()
-                    // TODO replace this with a mapreduce constant when available
-                    jobProperties.put("mapred.output.dir", location);
-                }
+                    serializedTxn);
+                jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, serializedTxn);
+            } else {
+                txnString = (txnString == null) ? jobTxnString : txnString;
+                txn = (Transaction) HCatUtil.deserialize(txnString);
+                outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                    txnString);
+                jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, txnString);
+            }
+            if (isBulkMode(outputJobInfo)) {
+                String tableLocation = tableInfo.getTableLocation();
+                String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+                    .toString();
+                outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+                // We are writing out an intermediate sequenceFile hence
+                // location is not passed in OutputJobInfo.getLocation()
+                // TODO replace this with a mapreduce constant when available
+                jobProperties.put("mapred.output.dir", location);
+                jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
+            } else {
+                jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
            }
-            jobProperties
-                .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-            addHbaseResources(jobConf, jobProperties);
+            jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
            addOutputDependencyJars(jobConf);
            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
@@ -429,7 +460,10 @@ public class HBaseHCatStorageHandler ext
@Override
public void setConf(Configuration conf) {
- hbaseConf = HBaseConfiguration.create(conf);
+        //Not cloning as we want to set tmpjars on it. Putting in jobProperties does not
+        //get propagated to JobConf in case of InputFormat as they are maintained per partition.
+        //Also we need to add hbase delegation token to the Credentials.
+        hbaseConf = conf;
}
private void checkDeleteTable(Table table) throws MetaException {
@@ -479,8 +513,6 @@ public class HBaseHCatStorageHandler ext
*/
    private void addOutputDependencyJars(Configuration conf) throws IOException {
TableMapReduceUtil.addDependencyJars(conf,
- //hadoop-core
- Writable.class,
//ZK
ZooKeeper.class,
//HBase
@@ -489,6 +521,8 @@ public class HBaseHCatStorageHandler ext
HiveException.class,
//HCatalog jar
HCatOutputFormat.class,
+ //hcat hbase storage handler jar
+ HBaseHCatStorageHandler.class,
//hive hbase storage handler jar
HBaseSerDe.class,
//hive jar
@@ -498,18 +532,9 @@ public class HBaseHCatStorageHandler ext
//hbase jar
Bytes.class,
//thrift-fb303 .jar
- FacebookBase.class);
- }
-
-    /**
-     * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
-     * @param jobConf existing configuration
-     * @return a new Configuration with hbase-default.xml and hbase-site.xml added
-     */
- private Configuration addHbaseResources(Configuration jobConf) {
- Configuration conf = new Configuration(jobConf);
- HBaseConfiguration.addHbaseResources(conf);
- return conf;
+ FacebookBase.class,
+ //guava jar
+ ThreadFactoryBuilder.class);
}
/**
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Tue Mar 20 16:26:49 2012
@@ -105,7 +105,6 @@ class HBaseInputFormat implements InputF
    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
        inputFormat.setConf(job);
-        HBaseUtil.addHBaseDelegationToken(job);
        return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null, Reporter.NULL)));
}
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Tue Mar 20 16:26:49 2012
@@ -80,19 +80,16 @@ class HbaseSnapshotRecordReader implemen
public void restart(byte[] firstRow) throws IOException {
        allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
- long maxValidRevision = snapshot.getLatestRevision();
+ long maxValidRevision = getMaximumRevision(scan, snapshot);
while (allAbortedTransactions.contains(maxValidRevision)) {
maxValidRevision--;
}
- long minValidRevision = getMinimumRevision(scan, snapshot);
- while (allAbortedTransactions.contains(minValidRevision)) {
- minValidRevision--;
- }
Scan newScan = new Scan(scan);
newScan.setStartRow(firstRow);
//TODO: See if filters in 0.92 can be used to optimize the scan
//TODO: Consider create a custom snapshot filter
- newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+ //TODO: Make min revision a constant in RM
+ newScan.setTimeRange(0, maxValidRevision + 1);
newScan.setMaxVersions();
this.scanner = this.htable.getScanner(newScan);
resultItr = this.scanner.iterator();
@@ -120,16 +117,16 @@ class HbaseSnapshotRecordReader implemen
}
}
- private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
- long minRevision = snapshot.getLatestRevision();
+ private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
+ long maxRevision = 0;
byte[][] families = scan.getFamilies();
for (byte[] familyKey : families) {
String family = Bytes.toString(familyKey);
long revision = snapshot.getRevision(family);
- if (revision < minRevision)
- minRevision = revision;
+ if (revision > maxRevision)
+ maxRevision = revision;
}
- return minRevision;
+ return maxRevision;
}
/*
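Note on the reader change above: the per-family minimum-revision lower bound is dropped; the scan's valid time range now runs from 0 up to the highest family revision in the snapshot, stepping past aborted revisions. A small sketch of getMaximumRevision's logic, with a plain map standing in for the snapshot's family-to-revision lookup:

    import java.util.Map;

    // Sketch: the scan's valid time range becomes [0, maxRevision + 1).
    class MaxRevisionSketch {
        static long getMaximumRevision(Iterable<String> scannedFamilies,
                                       Map<String, Long> familyRevisions) {
            long maxRevision = 0;
            for (String family : scannedFamilies) {
                long revision = familyRevisions.getOrDefault(family, 0L);
                if (revision > maxRevision) {
                    maxRevision = revision;
                }
            }
            return maxRevision;
        }
    }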
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -54,6 +54,7 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
 import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
 import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
@@ -203,6 +204,7 @@ public class TestHBaseBulkOutputFormat e
job.setOutputFormat(HBaseBulkOutputFormat.class);
        org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+ job.setOutputCommitter(HBaseBulkOutputCommitter.class);
//manually create transaction
        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -138,6 +138,7 @@ public class TestHBaseDirectOutputFormat
org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormat(HBaseDirectOutputFormat.class);
+ job.set(TableOutputFormat.OUTPUT_TABLE, tableName);
job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
//manually create transaction
@@ -378,7 +379,17 @@ public class TestHBaseDirectOutputFormat
TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(HCatOutputFormat.class);
HCatOutputFormat.setOutput(job, outputJobInfo);
-
+        String txnString = job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+        //Test passing in same jobConf or same OutputJobInfo multiple times and verify 1 transaction is created
+        //Same jobConf
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+        assertEquals(txnString, job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+        String jobString = job.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        //Same OutputJobInfo
+        outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+        Job job2 = new Job(conf);
+        HCatOutputFormat.setOutput(job2, outputJobInfo);
+        assertEquals(txnString, job2.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java Tue Mar 20 16:26:49 2012
@@ -449,11 +449,12 @@ public class TestHBaseInputFormat extend
assertTrue(doesTableExist);
populateHBaseTable(tableName, 2);
-    populateHBaseTableQualifier1(tableName, 3, null); //Running transaction
-    populateHBaseTableQualifier1(tableName, 4, Boolean.FALSE); //Aborted transaction
-    populateHBaseTableQualifier1(tableName, 5, Boolean.TRUE); //Committed transaction
-    populateHBaseTableQualifier1(tableName, 6, null); //Running Transaction
-    populateHBaseTableQualifier1(tableName, 7, Boolean.FALSE); //Aborted Transaction
+    populateHBaseTableQualifier1(tableName, 3, Boolean.TRUE); //Committed transaction
+    populateHBaseTableQualifier1(tableName, 4, null); //Running transaction
+    populateHBaseTableQualifier1(tableName, 5, Boolean.FALSE); //Aborted transaction
+    populateHBaseTableQualifier1(tableName, 6, Boolean.TRUE); //Committed transaction
+    populateHBaseTableQualifier1(tableName, 7, null); //Running Transaction
+    populateHBaseTableQualifier1(tableName, 8, Boolean.FALSE); //Aborted Transaction
Configuration conf = new Configuration(hcatConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
@@ -588,7 +589,7 @@ public class TestHBaseInputFormat extend
System.out.println("HCat record value" + value.toString());
boolean correctValues = (value.size() == 3)
&& (value.get(0).toString()).equalsIgnoreCase("testRow")
-                && (value.get(1).toString()).equalsIgnoreCase("textValue-2")
+                && (value.get(1).toString()).equalsIgnoreCase("textValue-3")
                 && (value.get(2).toString()).equalsIgnoreCase("textValue-2");
if (correctValues == false) {