Author: toffer
Date: Wed Jan 18 00:18:03 2012
New Revision: 1232664
URL: http://svn.apache.org/viewvc?rev=1232664&view=rev
Log:
HCATALOG-192 HBase output storage driver integration with zookeeper based
revision manager (avandana via toffer)
Added:
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Modified:
incubator/hcatalog/branches/branch-0.3/CHANGES.txt
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
Modified: incubator/hcatalog/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/CHANGES.txt?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.3/CHANGES.txt Wed Jan 18 00:18:03 2012
@@ -23,6 +23,8 @@ Release 0.3.0 (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-192. HBase output storage driver integration with zookeeper based
revision manager (avandana via toffer)
+
HCAT-193. Snapshot class for HCatalog tables. (avandana via toffer)
HCAT-87. Newly added partition should inherit table properties. (hashutosh
at HIVE-2589 via khorgath)
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Wed Jan 18 00:18:03 2012
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.hbase.HBas
import org.apache.hcatalog.common.HCatConstants;
/**
- * Constants class for constants used in Ht
+ * Constants class for constants used in HBase storage driver.
*/
class HBaseConstants {
@@ -38,4 +38,7 @@ class HBaseConstants {
/** key used to define wether bulk storage driver will be used or not */
public static final String PROPERTY_OSD_BULK_MODE_KEY =
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.output.bulkMode";
+ /** key used to define the hbase table snapshot. */
+ public static final String PROPERTY_TABLE_SNAPSHOT_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + "hbase.table.snapshot";
+
}
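
The new PROPERTY_TABLE_SNAPSHOT_KEY is used later in this patch to carry a serialized table snapshot inside the job's InputJobInfo properties. A minimal standalone sketch of that round trip, using plain java.util.Properties, a placeholder payload instead of HCatUtil.serialize(), and an assumed resolved key string:

    import java.util.Properties;

    public class SnapshotPropertyDemo {
        // Assumed resolved key; in the patch it is built from
        // HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX at class-load time.
        static final String PROPERTY_TABLE_SNAPSHOT_KEY = "hcat.hbase.table.snapshot";

        public static void main(String[] args) {
            Properties jobProps = new Properties();

            // Producer side (setSnapshot / initialize): stash the serialized snapshot.
            String serializedSnapshot = "<serialized HCatTableSnapshot>"; // placeholder
            jobProps.setProperty(PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnapshot);

            // Consumer side (HbaseSnapshotRecordReader): read it back, or detect that
            // no snapshot was supplied and a fresh one has to be created.
            String fetched = jobProps.getProperty(PROPERTY_TABLE_SNAPSHOT_KEY);
            System.out.println(fetched != null ? "snapshot present" : "create a new snapshot");
        }
    }
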
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Wed Jan 18 00:18:03 2012
@@ -26,10 +26,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import com.facebook.fb303.FacebookBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -50,14 +50,23 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.storagehandler.HCatStorageHandler;
import org.apache.thrift.TBase;
import org.apache.zookeeper.ZooKeeper;
+import com.facebook.fb303.FacebookBase;
+
/**
* This class HBaseHCatStorageHandler provides functionality to create HBase
* tables through HCatalog. The implementation is very similar to the
@@ -191,8 +200,10 @@ public class HBaseHCatStorageHandler ext
uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
for (String columnFamily : uniqueColumnFamilies) {
- tableDesc.addFamily(new HColumnDescriptor(Bytes
- .toBytes(columnFamily)));
+ HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes.toBytes(columnFamily));
+ familyDesc.setMaxVersions(Integer.MAX_VALUE);
+ tableDesc.addFamily(familyDesc);
}
getHBaseAdmin().createTable(tableDesc);
@@ -435,4 +446,131 @@ public class HBaseHCatStorageHandler ext
FacebookBase.class);
}
+
+ /**
+ * Creates the latest snapshot of the table.
+ *
+ * @param jobConf The job configuration.
+ * @param hbaseTableName The fully qualified name of the HBase table.
+ * @return An instance of HCatTableSnapshot
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public static HCatTableSnapshot createSnapshot(Configuration jobConf,
+ String hbaseTableName ) throws IOException {
+
+ RevisionManager rm = null;
+ TableSnapshot snpt;
+ try {
+ rm = getOpenedRevisionManager(jobConf);
+ snpt = rm.createSnapshot(hbaseTableName);
+ } finally {
+ if (rm != null)
+ rm.close();
+ }
+
+ String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if(inputJobString == null){
+ throw new IOException(
+ "InputJobInfo information not found in JobContext. "
+ + "HCatInputFormat.setInput() not called?");
+ }
+ InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+ HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
+ .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+ return hcatSnapshot;
+ }
+
+ /**
+ * Creates the snapshot using the revision specified by the user.
+ *
+ * @param jobConf The job configuration.
+ * @param tableName The fully qualified name of the table whose snapshot is being taken.
+ * @param revision The revision number to use for the snapshot.
+ * @return An instance of HCatTableSnapshot.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public static HCatTableSnapshot createSnapshot(Configuration jobConf,
+ String tableName, long revision)
+ throws IOException {
+
+ TableSnapshot snpt;
+ RevisionManager rm = null;
+ try {
+ rm = getOpenedRevisionManager(jobConf);
+ snpt = rm.createSnapshot(tableName, revision);
+ } finally {
+ if (rm != null)
+ rm.close();
+ }
+
+ String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if(inputJobString == null){
+ throw new IOException(
+ "InputJobInfo information not found in JobContext. "
+ + "HCatInputFormat.setInput() not called?");
+ }
+ InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+ HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
+ .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+ return hcatSnapshot;
+ }
+
+ /**
+ * Gets an opened instance of the revision manager.
+ *
+ * @param jobConf The job configuration.
+ * @return RevisionManager An instance of revision manager.
+ * @throws IOException
+ */
+ static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
+
+ Properties properties = new Properties();
+ String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
+ int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
+ HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+
+ if (zkHostList != null) {
+ String[] splits = zkHostList.split(",");
+ StringBuffer sb = new StringBuffer();
+ for (String split : splits) {
+ sb.append(split);
+ sb.append(':');
+ sb.append(port);
+ sb.append(',');
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+ properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+ }
+ String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
+ if (dataDir != null) {
+ properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
+ }
+ String rmClassName = jobConf.get(
+ RevisionManager.REVISION_MGR_IMPL_CLASS,
+ ZKBasedRevisionManager.class.getName());
+ properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
+ RevisionManager revisionManager = RevisionManagerFactory
+ .getRevisionManager(properties);
+ revisionManager.open();
+ return revisionManager;
+ }
+
+ /**
+ * Set snapshot as a property.
+ *
+ * @param snapshot The HCatTableSnapshot to be passed to the job.
+ * @param inpJobInfo The InputJobInfo for the job.
+ * @throws IOException
+ */
+ public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo)
+ throws IOException {
+ String serializedSnp = HCatUtil.serialize(snapshot);
+ inpJobInfo.getProperties().setProperty(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
+ }
+
+
}
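
getOpenedRevisionManager() above derives the revision manager's host list from hbase.zookeeper.quorum plus the client port. A standalone sketch of just that string construction, assuming the quorum setting carries bare host names separated by commas (the host names below are made up):

    public class QuorumStringSketch {
        // Builds "host1:port,host2:port,..." the way getOpenedRevisionManager() does.
        static String toHostPortList(String zkHostList, int port) {
            StringBuilder sb = new StringBuilder();
            for (String host : zkHostList.split(",")) {
                sb.append(host).append(':').append(port).append(',');
            }
            sb.deleteCharAt(sb.length() - 1); // drop the trailing comma
            return sb.toString();
        }

        public static void main(String[] args) {
            // Prints: zk1:2181,zk2:2181,zk3:2181
            System.out.println(toHostPortList("zk1,zk2,zk3", 2181));
        }
    }
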
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Wed Jan 18 00:18:03 2012
@@ -21,39 +21,46 @@ package org.apache.hcatalog.hbase;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
/**
* This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
*/
-class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
-
+class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> implements Configurable{
+
private final TableInputFormat inputFormat;
-
- public HBaseInputFormat() {
+ private final InputJobInfo jobInfo;
+ private Configuration conf;
+
+ public HBaseInputFormat(InputJobInfo jobInfo) {
inputFormat = new TableInputFormat();
+ this.jobInfo = jobInfo;
}
-
+
/*
* @param instance of InputSplit
- *
+ *
* @param instance of TaskAttemptContext
- *
+ *
* @return RecordReader
- *
+ *
* @throws IOException
- *
+ *
* @throws InterruptedException
- *
+ *
* @see
* org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
* .hadoop.mapreduce.InputSplit,
@@ -63,18 +70,28 @@ class HBaseInputFormat extends InputForm
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
InputSplit split, TaskAttemptContext tac) throws IOException,
InterruptedException {
- return inputFormat.createRecordReader(split, tac);
+
+ String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE);
+ TableSplit tSplit = (TableSplit) split;
+ HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo);
+ Scan sc = new Scan(inputFormat.getScan());
+ sc.setStartRow(tSplit.getStartRow());
+ sc.setStopRow(tSplit.getEndRow());
+ recordReader.setScan(sc);
+ recordReader.setHTable(new HTable(this.conf, tableName));
+ recordReader.init();
+ return recordReader;
}
-
+
/*
* @param jobContext
- *
+ *
* @return List of InputSplit
- *
+ *
* @throws IOException
- *
+ *
* @throws InterruptedException
- *
+ *
* @see
*
org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
* .JobContext)
@@ -82,19 +99,33 @@ class HBaseInputFormat extends InputForm
@Override
public List<InputSplit> getSplits(JobContext jobContext)
throws IOException, InterruptedException {
+
+ String tableName = this.conf.get(TableInputFormat.INPUT_TABLE);
+ if (tableName == null) {
+ throw new IOException("The input table is not set. The input splits
cannot be created.");
+ }
return inputFormat.getSplits(jobContext);
}
-
+
public void setConf(Configuration conf) {
+ this.conf = conf;
inputFormat.setConf(conf);
}
-
+
public Scan getScan() {
return inputFormat.getScan();
}
-
+
public void setScan(Scan scan) {
inputFormat.setScan(scan);
}
-
+
+ /* @return
+ * @see org.apache.hadoop.conf.Configurable#getConf()
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
}
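
createRecordReader() above copies the configured Scan and clamps it to the split's row range before handing it to the snapshot record reader. A small sketch of that per-split setup, assuming the HBase 0.90-era client API this storage driver builds on:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableSplit;

    class SplitScanSketch {
        // Mirrors the scan handling in HBaseInputFormat.createRecordReader().
        static Scan scanForSplit(Scan template, TableSplit split) throws IOException {
            Scan scan = new Scan(template);          // copy columns, caching, max versions
            scan.setStartRow(split.getStartRow());   // restrict to this split's row range
            scan.setStopRow(split.getEndRow());
            return scan;
        }
    }
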
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java Wed Jan 18 00:18:03 2012
@@ -19,6 +19,8 @@
package org.apache.hcatalog.hbase;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -27,9 +29,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -37,22 +41,28 @@ import org.apache.hadoop.mapreduce.JobCo
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+
/**
* The Class HBaseInputStorageDriver enables reading of HBase tables through
* HCatalog.
*/
public class HBaseInputStorageDriver extends HCatInputStorageDriver {
- private HCatTableInfo tableInfo;
+
+ private InputJobInfo inpJobInfo;
private ResultConverter converter;
- private HCatSchema outputColSchema;
- private HCatSchema dataSchema;
- private Configuration jobConf;
- private String scanColumns;
+ private HCatSchema outputColSchema;
+ private HCatSchema dataSchema;
+ private Configuration jobConf;
+ private String scanColumns;
+ private HCatTableSnapshot snapshot;
/*
* @param JobContext
@@ -64,6 +74,7 @@ public class HBaseInputStorageDriver ext
*/
@Override
public void initialize(JobContext context, Properties hcatProperties)
throws IOException {
+
jobConf = context.getConfiguration();
String jobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
if (jobString == null) {
@@ -71,9 +82,8 @@ public class HBaseInputStorageDriver ext
"InputJobInfo information not found in JobContext. "
+ "HCatInputFormat.setInput() not called?");
}
- InputJobInfo jobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
- tableInfo = jobInfo.getTableInfo();
- dataSchema = tableInfo.getDataColumns();
+ inpJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+ dataSchema = inpJobInfo.getTableInfo().getDataColumns();
List<FieldSchema> fields = HCatUtil.getFieldSchemaList(dataSchema
.getFields());
hcatProperties.setProperty(Constants.LIST_COLUMNS,
@@ -83,6 +93,19 @@ public class HBaseInputStorageDriver ext
converter = new HBaseSerDeResultConverter(dataSchema, outputColSchema,
hcatProperties);
scanColumns = converter.getHBaseScanColumns();
+ String hbaseTableName = HBaseHCatStorageHandler
+ .getFullyQualifiedName(inpJobInfo.getTableInfo());
+ String serSnapshot = (String) inpJobInfo.getProperties().get(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ if(serSnapshot == null){
+ snapshot = HBaseHCatStorageHandler.createSnapshot(jobConf,
+ hbaseTableName);
+ inpJobInfo.getProperties().setProperty(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+ HCatUtil.serialize(snapshot));
+ }
+
+ context.getConfiguration().set(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inpJobInfo));
}
@@ -97,10 +120,13 @@ public class HBaseInputStorageDriver ext
@Override
public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
Properties hcatProperties) {
- HBaseInputFormat tableInputFormat = new HBaseInputFormat();
- String hbaseTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+
+ String hbaseTableName = HBaseHCatStorageHandler
+ .getFullyQualifiedName(inpJobInfo.getTableInfo());
+ HBaseInputFormat tableInputFormat = new HBaseInputFormat(inpJobInfo);
jobConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
jobConf.set(TableInputFormat.SCAN_COLUMNS, scanColumns);
+ jobConf.setInt(TableInputFormat.SCAN_MAXVERSIONS, 1);
tableInputFormat.setConf(jobConf);
// TODO: Make the caching configurable by the user
tableInputFormat.getScan().setCaching(200);
@@ -109,11 +135,11 @@ public class HBaseInputStorageDriver ext
}
/*
- * @param baseKey
+ * @param baseKey The key produced by the MR job.
*
- * @param baseValue
+ * @param baseValue The value produced by the MR job.
*
- * @return HCatRecord
+ * @return HCatRecord An instance of HCatRecord produced by the key, value.
*
* @throws IOException
*
@@ -128,9 +154,9 @@ public class HBaseInputStorageDriver ext
}
/*
- * @param jobContext
+ * @param jobContext The jobcontext of MR job
*
- * @param howlSchema
+ * @param howlSchema The output schema of the hcat record.
*
* @throws IOException
*
@@ -161,9 +187,9 @@ public class HBaseInputStorageDriver ext
}
/*
- * @param jobContext
+ * @param jobContext The jobcontext of MR job.
*
- * @param hcatSchema
+ * @param hcatSchema The schema of the hcat record.
*
* @throws IOException
*
@@ -176,4 +202,74 @@ public class HBaseInputStorageDriver ext
throws IOException {
this.dataSchema = hcatSchema;
}
+
+ static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
+ HCatTableInfo hcatTableInfo) throws IOException {
+
+ HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+ Map<String, String> hcatHbaseColMap =
getHCatHBaseColumnMapping(hcatTableInfo);
+ HashMap<String, Long> revisionMap = new HashMap<String, Long>();
+
+ for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+ if(hcatHbaseColMap.containsKey(fSchema.getName())){
+ String colFamily = hcatHbaseColMap.get(fSchema.getName());
+ long revisionID = hbaseSnapshot.getRevision(colFamily);
+ revisionMap.put(fSchema.getName(), revisionID);
+ }
+ }
+
+ HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
+ hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(), revisionMap);
+ return hcatSnapshot;
+ }
+
+ static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
+ HCatTableInfo hcatTableInfo) throws IOException {
+
+ HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+ Map<String, Long> revisionMap = new HashMap<String, Long>();
+ Map<String, String> hcatHbaseColMap =
getHCatHBaseColumnMapping(hcatTableInfo);
+ for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+ String colFamily = hcatHbaseColMap.get(fSchema.getName());
+ if (hcatSnapshot.containsColumn(fSchema.getName())) {
+ long revision = hcatSnapshot.getRevision(fSchema.getName());
+ revisionMap.put(colFamily, revision);
+ }
+ }
+
+ String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
+ + hcatSnapshot.getTableName();
+ return new TableSnapshot(fullyQualifiedName, revisionMap);
+
+ }
+
+ private static Map<String, String> getHCatHBaseColumnMapping(HCatTableInfo hcatTableInfo)
+ throws IOException {
+
+ HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+ StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
+ String hbaseColumnMapping = storeInfo.getProperties().getProperty(
+ HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+
+ Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
+ List<String> columnFamilies = new ArrayList<String>();
+ List<String> columnQualifiers = new ArrayList<String>();
+ try {
+ HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies,
+ null, columnQualifiers, null);
+ } catch (SerDeException e) {
+ throw new IOException("Exception while converting snapshots.", e);
+ }
+
+ for (HCatFieldSchema column : hcatTableSchema.getFields()) {
+ int fieldPos = hcatTableSchema.getPosition(column.getName());
+ String colFamily = columnFamilies.get(fieldPos);
+ if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+ hcatHbaseColMap.put(column.getName(), colFamily);
+ }
+ }
+
+ return hcatHbaseColMap;
+ }
+
}
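
convertSnapshot() above translates between per-column-family revisions (HBase side) and per-HCatalog-column revisions, using the hbase.columns.mapping table property. A self-contained sketch of just that translation step with plain maps; the column names and revision numbers are made up but mirror the expectations in TestSnapshots below:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SnapshotMappingSketch {
        public static void main(String[] args) {
            // HCatalog column name -> HBase column family (from hbase.columns.mapping).
            Map<String, String> hcatToFamily = new LinkedHashMap<String, String>();
            hcatToFamily.put("value1", "cf1");
            hcatToFamily.put("value2", "cf2");

            // Revisions recorded per column family by the revision manager.
            Map<String, Long> familyRevisions = new LinkedHashMap<String, Long>();
            familyRevisions.put("cf1", 3L);
            familyRevisions.put("cf2", 5L);

            // Translate family revisions into per-column revisions, as convertSnapshot() does.
            Map<String, Long> columnRevisions = new LinkedHashMap<String, Long>();
            for (Map.Entry<String, String> entry : hcatToFamily.entrySet()) {
                columnRevisions.put(entry.getKey(), familyRevisions.get(entry.getValue()));
            }
            System.out.println(columnRevisions); // {value1=3, value2=5}
        }
    }
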
Added: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1232664&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (added)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Wed Jan 18 00:18:03 2012
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * The Class HbaseSnapshotRecordReader implements logic for filtering records
+ * based on snapshot.
+ */
+class HbaseSnapshotRecordReader extends TableRecordReader {
+
+ static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class);
+ private ResultScanner scanner;
+ private Scan scan;
+ private HTable htable;
+ private ImmutableBytesWritable key;
+ private Result value;
+ private InputJobInfo inpJobInfo;
+ private TableSnapshot snapshot;
+ private int maxRevisions;
+ private Iterator<Result> resultItr;
+
+
+ HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException {
+ this.inpJobInfo = inputJobInfo;
+ String snapshotString = inpJobInfo.getProperties().getProperty(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
+ .deserialize(snapshotString);
+ this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot,
+ inpJobInfo.getTableInfo());
+ this.maxRevisions = 1;
+ }
+
+ /* @param firstRow The first record in the split.
+ /* @throws IOException
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[])
+ */
+ @Override
+ public void restart(byte[] firstRow) throws IOException {
+ Scan newScan = new Scan(scan);
+ newScan.setStartRow(firstRow);
+ this.scanner = this.htable.getScanner(newScan);
+ resultItr = this.scanner.iterator();
+ }
+
+ /* @throws IOException
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init()
+ */
+ @Override
+ public void init() throws IOException {
+ restart(scan.getStartRow());
+ }
+
+ /*
+ * @param htable The HTable ( of HBase) to use for the record reader.
+ *
+ * @see
+ * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache
+ * .hadoop.hbase.client.HTable)
+ */
+ @Override
+ public void setHTable(HTable htable) {
+ this.htable = htable;
+ }
+
+ /*
+ * @param scan The scan to be used for reading records.
+ *
+ * @see
+ * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache
+ * .hadoop.hbase.client.Scan)
+ */
+ @Override
+ public void setScan(Scan scan) {
+ this.scan = scan;
+ }
+
+ /*
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close()
+ */
+ @Override
+ public void close() {
+ this.resultItr = null;
+ this.scanner.close();
+ }
+
+ /* @return The row of hbase record.
+ /* @throws IOException
+ /* @throws InterruptedException
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey()
+ */
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return key;
+ }
+
+ /* @return Single row result of scan of HBase table.
+ /* @throws IOException
+ /* @throws InterruptedException
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue()
+ */
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ /* @return Returns whether a next key-value is available for reading.
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue()
+ */
+ @Override
+ public boolean nextKeyValue() {
+
+ if (this.resultItr == null) {
+ LOG.warn("The HBase result iterator is found null. It is possible"
+ + " that the record reader has already been closed.");
+ } else {
+
+ if (key == null)
+ key = new ImmutableBytesWritable();
+ while (resultItr.hasNext()) {
+ Result temp = resultItr.next();
+ Result hbaseRow = prepareResult(temp.list());
+ if (hbaseRow != null) {
+ key.set(hbaseRow.getRow());
+ value = hbaseRow;
+ return true;
+ }
+
+ }
+ }
+ return false;
+ }
+
+ private Result prepareResult(List<KeyValue> keyvalues) {
+
+ List<KeyValue> finalKeyVals = new ArrayList<KeyValue>();
+ Map<String, List<KeyValue>> qualValMap = new HashMap<String,
List<KeyValue>>();
+ for (KeyValue kv : keyvalues) {
+ byte[] cf = kv.getFamily();
+ byte[] qualifier = kv.getQualifier();
+ String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier);
+ List<KeyValue> kvs;
+ if (qualValMap.containsKey(key)) {
+ kvs = qualValMap.get(key);
+ } else {
+ kvs = new ArrayList<KeyValue>();
+ }
+
+ String family = Bytes.toString(kv.getFamily());
+ long desiredTS = snapshot.getRevision(family);
+ if (kv.getTimestamp() <= desiredTS) {
+ kvs.add(kv);
+ }
+ qualValMap.put(key, kvs);
+ }
+
+ Set<String> keys = qualValMap.keySet();
+ for (String cf : keys) {
+ List<KeyValue> kvs = qualValMap.get(cf);
+ if (maxRevisions <= kvs.size()) {
+ for (int i = 0; i < maxRevisions; i++) {
+ finalKeyVals.add(kvs.get(i));
+ }
+ } else {
+ finalKeyVals.addAll(kvs);
+ }
+ }
+
+ if(finalKeyVals.size() == 0){
+ return null;
+ } else {
+ KeyValue[] kvArray = new KeyValue[finalKeyVals.size()];
+ finalKeyVals.toArray(kvArray);
+ return new Result(kvArray);
+ }
+ }
+
+ /* @return The progress of the record reader.
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress()
+ */
+ @Override
+ public float getProgress() {
+ // Depends on the total number of tuples
+ return 0;
+ }
+
+}
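
The core of HbaseSnapshotRecordReader is prepareResult(): drop any cell written after the snapshot revision of its column family, then keep at most maxRevisions cells per column. A self-contained sketch of that filtering rule using plain collections instead of HBase KeyValue objects (the column keys, timestamps, and single-family revision are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class SnapshotFilterSketch {
        /** One cell reduced to what the filter cares about: its column key and timestamp. */
        static class Cell {
            final String column; final long ts;
            Cell(String column, long ts) { this.column = column; this.ts = ts; }
            public String toString() { return column + "@" + ts; }
        }

        public static void main(String[] args) {
            long snapshotRevision = 5; // highest revision visible for this column family
            int maxRevisions = 1;      // the record reader currently keeps one version per column

            List<Cell> rawRow = Arrays.asList(
                    new Cell("cf1:q1", 7),   // written after the snapshot -> dropped
                    new Cell("cf1:q1", 5),
                    new Cell("cf1:q1", 4),
                    new Cell("cf1:q2", 3));

            // Group surviving cells by column, preserving HBase's newest-first ordering.
            Map<String, List<Cell>> perColumn = new LinkedHashMap<String, List<Cell>>();
            for (Cell cell : rawRow) {
                if (cell.ts > snapshotRevision) {
                    continue;                          // skip writes past the snapshot
                }
                List<Cell> kept = perColumn.get(cell.column);
                if (kept == null) {
                    kept = new ArrayList<Cell>();
                    perColumn.put(cell.column, kept);
                }
                kept.add(cell);
            }

            // Keep at most maxRevisions cells per column, as prepareResult() does.
            List<Cell> filtered = new ArrayList<Cell>();
            for (List<Cell> kept : perColumn.values()) {
                filtered.addAll(kept.subList(0, Math.min(maxRevisions, kept.size())));
            }
            System.out.println(filtered); // [cf1:q1@5, cf1:q2@3]
        }
    }
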
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Jan 18 00:18:03 2012
@@ -38,6 +38,8 @@ public class ZKBasedRevisionManager impl
public static final String HOSTLIST = "revision.manager.zk.HostList";
public static final String DATADIR = "revision.manager.zk.DataDir";
+ public static final String DEFAULT_DATADIR = "/revision-management";
+ public static final String DEFAULT_HOSTLIST = "localhost:2181";
private static int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000;
private static final Log LOG =
LogFactory.getLog(ZKBasedRevisionManager.class);
private String zkHostList;
@@ -50,8 +52,11 @@ public class ZKBasedRevisionManager impl
*/
@Override
public void initialize(Properties properties) {
- this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181");
- this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management");
+ this.zkHostList = properties.getProperty(
+ ZKBasedRevisionManager.HOSTLIST,
+ ZKBasedRevisionManager.DEFAULT_HOSTLIST);
+ this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,
+ ZKBasedRevisionManager.DEFAULT_DATADIR);
}
/**
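
The two constants added above double as documentation of the revision manager's configuration keys. A trivial sketch of supplying them explicitly; the host names are made up, and when the keys are omitted initialize() now falls back to DEFAULT_HOSTLIST and DEFAULT_DATADIR:

    import java.util.Properties;

    public class RevisionManagerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Same key strings as ZKBasedRevisionManager.HOSTLIST and DATADIR.
            props.setProperty("revision.manager.zk.HostList", "zk1:2181,zk2:2181,zk3:2181");
            props.setProperty("revision.manager.zk.DataDir", "/revision-management");
            System.out.println(props);
        }
    }
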
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java Wed Jan 18 00:18:03 2012
@@ -18,11 +18,13 @@
package org.apache.hcatalog.hbase;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -54,6 +56,8 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.junit.Test;
@@ -66,20 +70,34 @@ public class TestHBaseInputStorageDriver
private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
- List<Put> generatePuts(int num) {
- List<Put> myPuts = new ArrayList<Put>();
- for (int i = 0; i < num; i++) {
- Put put = new Put(Bytes.toBytes("testRow" + i));
- put.add(FAMILY, QUALIFIER1, 0,
- Bytes.toBytes("testQualifier1-" + "textValue-" + i));
- put.add(FAMILY, QUALIFIER2, 0,
- Bytes.toBytes("testQualifier2-" + "textValue-" + i));
- myPuts.add(put);
+ private List<Put> generatePuts(int num, String tableName) throws
IOException {
+
+ List<String> columnFamilies = Arrays.asList("testFamily");
+ RevisionManager rm = null;
+ List<Put> myPuts;
+ try {
+ rm = HBaseHCatStorageHandler
+ .getOpenedRevisionManager(getHbaseConf());
+ rm.open();
+ myPuts = new ArrayList<Put>();
+ for (int i = 1; i <= num; i++) {
+ Put put = new Put(Bytes.toBytes("testRow"));
+ put.add(FAMILY, QUALIFIER1, i, Bytes.toBytes("textValue-" + i));
+ put.add(FAMILY, QUALIFIER2, i, Bytes.toBytes("textValue-" + i));
+ myPuts.add(put);
+ Transaction tsx = rm.beginWriteTransaction(tableName,
+ columnFamilies);
+ rm.commitWriteTransaction(tsx);
+ }
+ } finally {
+ if (rm != null)
+ rm.close();
}
+
return myPuts;
}
- public void Initialize() throws Exception {
+ private void Initialize() throws Exception {
hcatConf = getHiveConf();
hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
HCatSemanticAnalyzer.class.getName());
@@ -102,8 +120,8 @@ public class TestHBaseInputStorageDriver
}
- public void populateHBaseTable(String tName) throws IOException {
- List<Put> myPuts = generatePuts(10);
+ private void populateHBaseTable(String tName, int revisions) throws IOException {
+ List<Put> myPuts = generatePuts(revisions, tName);
HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName));
table.put(myPuts);
}
@@ -132,7 +150,7 @@ public class TestHBaseInputStorageDriver
boolean doesTableExist = hAdmin.tableExists(hbaseTableName);
assertTrue(doesTableExist);
- populateHBaseTable(hbaseTableName);
+ populateHBaseTable(hbaseTableName, 5);
Configuration conf = new Configuration(hcatConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
HCatUtil.serialize(getHiveConf().getAllProperties()));
@@ -160,14 +178,15 @@ public class TestHBaseInputStorageDriver
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
- assertTrue(MapReadHTable.error == false);
+ assertFalse(MapReadHTable.error);
+ assertEquals(MapReadHTable.count, 1);
String dropTableQuery = "DROP TABLE " + hbaseTableName ;
CommandProcessorResponse responseThree =
hcatDriver.run(dropTableQuery);
assertEquals(0, responseThree.getResponseCode());
boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName);
- assertTrue(isHbaseTableThere == false);
+ assertFalse(isHbaseTableThere);
String dropDB = "DROP DATABASE " + databaseName;
CommandProcessorResponse responseFour = hcatDriver.run(dropDB);
@@ -192,7 +211,7 @@ public class TestHBaseInputStorageDriver
boolean doesTableExist = hAdmin.tableExists(tableName);
assertTrue(doesTableExist);
- populateHBaseTable(tableName);
+ populateHBaseTable(tableName, 5);
Configuration conf = new Configuration(hcatConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
@@ -222,14 +241,15 @@ public class TestHBaseInputStorageDriver
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
- assertTrue(MapReadHTable.error == false);
+ assertFalse(MapReadProjHTable.error);
+ assertEquals(MapReadProjHTable.count, 1);
String dropTableQuery = "DROP TABLE " + tableName ;
CommandProcessorResponse responseThree =
hcatDriver.run(dropTableQuery);
assertEquals(0, responseThree.getResponseCode());
boolean isHbaseTableThere = hAdmin.tableExists(tableName);
- assertTrue(isHbaseTableThere == false);
+ assertFalse(isHbaseTableThere);
}
@@ -238,18 +258,20 @@ public class TestHBaseInputStorageDriver
Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable,
Text> {
static boolean error = false;
-
+ static int count = 0;
@Override
public void map(ImmutableBytesWritable key, HCatRecord value,
Context context) throws IOException, InterruptedException {
+ System.out.println("HCat record value" + value.toString());
boolean correctValues = (value.size() == 3)
- && (value.get(0).toString()).startsWith("testRow")
- && (value.get(1).toString()).startsWith("testQualifier1")
- && (value.get(2).toString()).startsWith("testQualifier2");
+ && (value.get(0).toString()).equalsIgnoreCase("testRow")
+ &&
(value.get(1).toString()).equalsIgnoreCase("textValue-5")
+ &&
(value.get(2).toString()).equalsIgnoreCase("textValue-5");
if (correctValues == false) {
error = true;
}
+ count++;
}
}
@@ -258,17 +280,19 @@ public class TestHBaseInputStorageDriver
Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable,
Text> {
static boolean error = false;
-
+ static int count = 0;
@Override
public void map(ImmutableBytesWritable key, HCatRecord value,
Context context) throws IOException, InterruptedException {
+ System.out.println("HCat record value" + value.toString());
boolean correctValues = (value.size() == 2)
- && (value.get(0).toString()).startsWith("testRow")
- && (value.get(1).toString()).startsWith("testQualifier1");
+ && (value.get(0).toString()).equalsIgnoreCase("testRow")
+ &&
(value.get(1).toString()).equalsIgnoreCase("textValue-5");
if (correctValues == false) {
error = true;
}
+ count++;
}
}
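
generatePuts() above brackets every batch of Puts with a revision-manager write transaction, so that readers can later snapshot a consistent revision. A sketch of that write-side pattern under the APIs exercised by the test; the table and column-family names are made up, and the class sits in the same package because getOpenedRevisionManager() is package-private:

    package org.apache.hcatalog.hbase;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.hbase.snapshot.RevisionManager;
    import org.apache.hcatalog.hbase.snapshot.Transaction;

    public class RevisionWriteSketch {
        // Begin/commit a write transaction around the HBase writes for one revision.
        public static void writeOneRevision(Configuration conf) throws IOException {
            List<String> columnFamilies = Arrays.asList("testFamily");
            RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
            try {
                Transaction tx = rm.beginWriteTransaction("mydb.mytable", columnFamilies);
                // ... issue the HBase Puts for this revision here ...
                rm.commitWriteTransaction(tx);
            } finally {
                rm.close();
            }
        }
    }
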
Added: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java?rev=1232664&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (added)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java Wed Jan 18 00:18:03 2012
@@ -0,0 +1,122 @@
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestSnapshots extends SkeletonHBaseTest {
+ private static HiveConf hcatConf;
+ private static HCatDriver hcatDriver;
+
+ public void Initialize() throws Exception {
+ hcatConf = getHiveConf();
+ hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ URI fsuri = getFileSystem().getUri();
+ Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(),
+ getTestDir());
+ hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString());
+ hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString());
+
+ //Add hbase properties
+
+ for (Map.Entry<String, String> el : getHbaseConf()) {
+ if (el.getKey().startsWith("hbase.")) {
+ hcatConf.set(el.getKey(), el.getValue());
+ }
+ }
+
+ SessionState.start(new CliSessionState(hcatConf));
+ hcatDriver = new HCatDriver();
+
+ }
+
+ @Test
+ public void TestSnapshotConversion() throws Exception{
+ Initialize();
+ String tableName = newTableName("mytableOne");
+ String databaseName = newTableName("mydatabase");
+ String fullyQualTableName = databaseName + "." + tableName;
+ String db_dir = getTestDir() + "/hbasedb";
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '"
+ + db_dir + "'";
+ String tableQuery = "CREATE TABLE " + fullyQualTableName
+ + "(key string, value1 string, value2 string)
STORED BY " +
+
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+ + "TBLPROPERTIES
('hbase.columns.mapping'=':key,cf1:q1,cf2:q2')" ;
+
+ CommandProcessorResponse cmdResponse = hcatDriver.run(dbquery);
+ assertEquals(0, cmdResponse.getResponseCode());
+ cmdResponse = hcatDriver.run(tableQuery);
+ assertEquals(0, cmdResponse.getResponseCode());
+
+ InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null);
+ Configuration conf = new Configuration(hcatConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ HCatUtil.serialize(getHiveConf().getAllProperties()));
+ Job job = new Job(conf);
+ InitializeInput.setInput(job, inputInfo);
+ String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+ inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+
+ Map<String, Long> revMap = new HashMap<String, Long>();
+ revMap.put("cf1", 3L);
+ revMap.put("cf2", 5L);
+ TableSnapshot hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap);
+ HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
+
+ assertEquals(hcatSnapshot.getRevision("value1"), 3);
+ assertEquals(hcatSnapshot.getRevision("value2"), 5);
+
+ String dropTable = "DROP TABLE " + fullyQualTableName;
+ cmdResponse = hcatDriver.run(dropTable);
+ assertEquals(0, cmdResponse.getResponseCode());
+
+ tableName = newTableName("mytableTwo");
+ fullyQualTableName = databaseName + "." + tableName;
+ tableQuery = "CREATE TABLE " + fullyQualTableName
+ + "(key string, value1 string, value2 string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+ + "TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf1:q2')" ;
+ cmdResponse = hcatDriver.run(tableQuery);
+ assertEquals(0, cmdResponse.getResponseCode());
+ revMap.clear();
+ revMap.put("cf1", 3L);
+ hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap);
+ inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null);
+ InitializeInput.setInput(job, inputInfo);
+ modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+ inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+ hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
+ assertEquals(hcatSnapshot.getRevision("value1"), 3);
+ assertEquals(hcatSnapshot.getRevision("value2"), 3);
+
+ dropTable = "DROP TABLE " + fullyQualTableName;
+ cmdResponse = hcatDriver.run(dropTable);
+ assertEquals(0, cmdResponse.getResponseCode());
+
+ String dropDatabase = "DROP DATABASE IF EXISTS " + databaseName + " CASCADE";
+ cmdResponse = hcatDriver.run(dropDatabase);
+ assertEquals(0, cmdResponse.getResponseCode());
+ }
+
+}