Author: gates
Date: Thu Feb 2 20:28:51 2012
New Revision: 1239808
URL: http://svn.apache.org/viewvc?rev=1239808&view=rev
Log:
HCATALOG-207 Changes to current HCat subsystem to allow it to work with hive
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Feb 2 20:28:51 2012
@@ -35,7 +35,9 @@ Release 0.3.0 - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
- HCAT-204. HCatRecord SerDe (kgorath via gates)
+ HCAT-207. Changes to current HCat subsystem to allow it to work with hive
(khorgath via gates)
+
+ HCAT-204. HCatRecord SerDe (khorgath via gates)
HCAT-192. HBase output storage driver integration with zookeeper based
revision manager (toffer via hashutosh)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Thu Feb 2 20:28:51 2012
@@ -75,6 +75,15 @@ public final class HCatConstants {
public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE =
HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig";
public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM =
HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform";
+ public static final String[] OUTPUT_CONFS_TO_SAVE = {
+ HCAT_KEY_OUTPUT_INFO,
+ HCAT_KEY_HIVE_CONF,
+ HCAT_KEY_TOKEN_SIGNATURE,
+ HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE,
+ HCAT_KEY_JOBCLIENT_TOKEN_STRFORM
+ };
+
+
public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
public static final String HCAT_MSG_EXPIRY_DURATION =
"hcat.msg.expiry.duration";
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Thu Feb 2 20:28:51 2012
@@ -26,6 +26,7 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -55,6 +57,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenIdentifier;
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
@@ -418,12 +421,17 @@ public class HCatUtil {
public static void logEntrySet(Log logger, String itemName,
Set<? extends Entry> entrySet) {
- logger.info(itemName + ":");
- for (Entry e : entrySet) {
- logger.info("\t[" + e.getKey() + "]=>[" + e.getValue() + "]");
- }
+ logIterableSet(logger,itemName,entrySet.iterator());
}
+ public static void logIterableSet(Log logger, String itemName, Iterator<?
extends Entry> iterator){
+ logger.info(itemName + ":");
+ while (iterator.hasNext()){
+ Entry e = iterator.next();
+ logger.debug("\t[" + e.getKey() + "]=>[" + e.getValue() + "]");
+ }
+ }
+
public static void logAllTokens(Log logger, JobContext context)
throws IOException {
for (Token<? extends TokenIdentifier> t : context.getCredentials()
@@ -459,4 +467,15 @@ public class HCatUtil {
}
}
+ public static Pair<String,String> getDbAndTableName(String tableName)
throws IOException{
+ String[] dbTableNametokens = tableName.split("\\.");
+ if(dbTableNametokens.length == 1) {
+ return new
Pair<String,String>(MetaStoreUtils.DEFAULT_DATABASE_NAME,tableName);
+ }else if (dbTableNametokens.length == 2) {
+ return new Pair<String, String>(dbTableNametokens[0],
dbTableNametokens[1]);
+ }else{
+ throw new IOException("tableName expected in the form "
+ +"<databasename>.<table name> or <table name>. Got " + tableName);
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Thu Feb 2 20:28:51 2012
@@ -19,17 +19,24 @@ package org.apache.hcatalog.mapreduce;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
/** The HCat wrapper for the underlying RecordReader, this ensures that the
initialize on
* the underlying record reader is done with the underlying split, not with
HCatSplit.
*/
-class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
+class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord>
+ implements org.apache.hadoop.mapred.RecordReader {
+
+ Log LOG = LogFactory.getLog(HCatRecordReader.class);
+ int lineCount = 0;
/** The underlying record reader to delegate to. */
private final RecordReader<? extends WritableComparable, ? extends
Writable> baseRecordReader;
@@ -74,15 +81,25 @@ class HCatRecordReader extends RecordRea
*/
@Override
public HCatRecord getCurrentValue() throws IOException,
InterruptedException {
- return
storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
+ HCatRecord r =
storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
+ return r;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
*/
@Override
- public float getProgress() throws IOException, InterruptedException {
- return baseRecordReader.getProgress();
+ public float getProgress() {
+ try {
+ return baseRecordReader.getProgress();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ }
+ return 0.0f; // errored
}
/* (non-Javadoc)
@@ -90,6 +107,7 @@ class HCatRecordReader extends RecordRea
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
+ lineCount++;
return baseRecordReader.nextKeyValue();
}
@@ -100,4 +118,46 @@ class HCatRecordReader extends RecordRea
public void close() throws IOException {
baseRecordReader.close();
}
+
+ @Override
+ public Object createKey() {
+ WritableComparable o = null;
+ try {
+ o = getCurrentKey();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ }
+ return o;
+ }
+
+ @Override
+ public Object createValue() {
+ return new DefaultHCatRecord();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return lineCount;
+ }
+
+ @Override
+ public boolean next(Object key, Object value) throws IOException {
+ try {
+ if (!nextKeyValue()){
+ return false;
+ }
+
+ ((HCatRecord)value).copy(getCurrentValue());
+
+ return true;
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ }
+ return false;
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java Thu Feb 2 20:28:51 2012
@@ -22,6 +22,8 @@ import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -29,8 +31,10 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.schema.HCatSchema;
/** The HCatSplit wrapper around the InputSplit returned by the underlying
InputFormat */
-class HCatSplit extends InputSplit implements Writable {
+public class HCatSplit extends InputSplit implements
Writable,org.apache.hadoop.mapred.InputSplit {
+ Log LOG = LogFactory.getLog(HCatSplit.class);
+
/** The partition info for the split. */
private PartInfo partitionInfo;
@@ -94,16 +98,34 @@ class HCatSplit extends InputSplit imple
* @see org.apache.hadoop.mapreduce.InputSplit#getLength()
*/
@Override
- public long getLength() throws IOException, InterruptedException {
- return baseSplit.getLength();
+ public long getLength() {
+ try {
+ return baseSplit.getLength();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ }
+ return 0; // we errored
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
*/
@Override
- public String[] getLocations() throws IOException, InterruptedException {
- return baseSplit.getLocations();
+ public String[] getLocations() {
+ try {
+ return baseSplit.getLocations();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ LOG.warn(e.getStackTrace());
+ }
+ return new String[0]; // we errored
}
/* (non-Javadoc)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Thu Feb 2 20:28:51 2012
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,6 +48,8 @@ import org.apache.hcatalog.data.schema.H
* info required in the client process context.
*/
public class InitializeInput {
+
+ private static final Log LOG = LogFactory.getLog(InitializeInput.class);
/** The prefix for keys used for storage driver arguments */
static final String HCAT_KEY_PREFIX = "hcat.";
@@ -69,10 +73,23 @@ public class InitializeInput {
//* Create and initialize an InputJobInfo object
//* Serialize the InputJobInfo and save in the Job's Configuration object
+ job.getConfiguration().set(
+ HCatConstants.HCAT_KEY_JOB_INFO,
+ getSerializedHcatKeyJobInfo(job, inputJobInfo,null));
+ }
+
+ public static String getSerializedHcatKeyJobInfo(Job job, InputJobInfo
inputJobInfo, String locationFilter) throws Exception {
+ //* Create and initialize an InputJobInfo object
+
HiveMetaStoreClient client = null;
try {
- client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
+ if (job != null){
+ client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
+ } else {
+ hiveConf = new HiveConf(HCatInputFormat.class);
+ client = new HiveMetaStoreClient(hiveConf, null);
+ }
Table table = client.getTable(inputJobInfo.getDatabaseName(),
inputJobInfo.getTableName());
@@ -107,17 +124,15 @@ public class InitializeInput {
inputJobInfo.setPartitions(partInfoList);
inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
- job.getConfiguration().set(
- HCatConstants.HCAT_KEY_JOB_INFO,
- HCatUtil.serialize(inputJobInfo)
- );
+ return HCatUtil.serialize(inputJobInfo);
} finally {
if (client != null ) {
client.close();
}
}
- }
+ }
+
private static Map<String, String> createPtnKeyValueMap(Table table,
Partition ptn) throws IOException{
List<String> values = ptn.getValues();
if( values.size() != table.getPartitionKeys().size() ) {
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1239808&r1=1239807&r2=1239808&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Thu Feb 2 20:28:51 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatArrayBag;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.Pair;
@@ -68,12 +69,9 @@ public class PigHCatUtil {
// <database name>.<table name> - parse it and
// communicate the information to HCatInputFormat
- String[] dbTableNametokens = location.split("\\.");
- if(dbTableNametokens.length == 1) {
- return new Pair<String,String>(DEFAULT_DB,location);
- }else if (dbTableNametokens.length == 2) {
- return new Pair<String, String>(dbTableNametokens[0],
dbTableNametokens[1]);
- }else{
+ try {
+ return HCatUtil.getDbAndTableName(location);
+ } catch (IOException e) {
String locationErrMsg = "The input location in load statement " +
"should be of the form " +
"<databasename>.<table name> or <table name>. Got " + location;