Author: toffer
Date: Tue Apr 17 21:18:07 2012
New Revision: 1327286
URL: http://svn.apache.org/viewvc?rev=1327286&view=rev
Log:
merged from trunk: HCATALOG-363 HCat leaks metastore connection (rohini via
toffer)
Modified:
incubator/hcatalog/branches/branch-0.4/ (props changed)
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatUtil.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
Propchange: incubator/hcatalog/branches/branch-0.4/
------------------------------------------------------------------------------
Merged /incubator/hcatalog/trunk:r1327285
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Tue Apr 17 21:18:07 2012
@@ -108,6 +108,8 @@ Release 0.4.0 - Unreleased
OPTIMIZATIONS
BUG FIXES
+ HCAT-363 HCat leaks metastore connection (rohini via toffer)
+
HCAT-314 HCatOutputFormat.setOutput is called more than once by HCatStorer
(avandana via toffer)
HCAT-378 Found a few source files missing Apache headers (gates)
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatUtil.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatUtil.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatUtil.java
Tue Apr 17 21:18:07 2012
@@ -591,10 +591,18 @@ public class HCatUtil {
}
}
- //TODO remove url component, everything should be encapsulated in HiveConf
- public static HiveMetaStoreClient createHiveClient(HiveConf hiveConf)
- throws MetaException {
- return new HiveMetaStoreClient(hiveConf);
+ public static HiveMetaStoreClient createHiveClient(HiveConf hiveConf)
+ throws MetaException {
+ return new HiveMetaStoreClient(hiveConf);
+ }
+
+ public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
+ try {
+ if (client != null)
+ client.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing metastore client", e);
+ }
}
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
Tue Apr 17 21:18:07 2012
@@ -90,15 +90,18 @@ class DefaultOutputCommitterContainer ex
getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
//Cancel HCat and JobTracker tokens
+ HiveMetaStoreClient client = null;
try {
HiveConf hiveConf =
HCatUtil.getHiveConf(context.getConfiguration());
- HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
+ client = HCatUtil.createHiveClient(hiveConf);
String tokenStrForm = client.getTokenStrForm();
if(tokenStrForm != null &&
context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null)
{
client.cancelDelegationToken(tokenStrForm);
}
} catch (Exception e) {
LOG.warn("Failed to cancel delegation token", e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
}
}
}
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
Tue Apr 17 21:18:07 2012
@@ -172,15 +172,14 @@ class FileOutputCommitterContainer exten
}
}
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-
+ HiveMetaStoreClient client = null;
try {
HiveConf hiveConf =
HCatUtil.getHiveConf(jobContext.getConfiguration());
- HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
+ client = HCatUtil.createHiveClient(hiveConf);
// cancel the deleg. tokens that were acquired for this job now
that
// we are done - we should cancel if the tokens were acquired by
// HCatOutputFormat and not if they were supplied by Oozie.
- // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+ // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
// the conf will not be set
String tokenStrForm = client.getTokenStrForm();
if(tokenStrForm != null && jobContext.getConfiguration().get
@@ -193,9 +192,12 @@ class FileOutputCommitterContainer exten
} else {
throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION,
e);
}
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
}
Path src;
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
if (dynamicPartitioningUsed){
src = new Path(getPartitionRootLocation(
jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
@@ -402,9 +404,7 @@ class FileOutputCommitterContainer exten
throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION,
e);
}
} finally {
- if( client != null ) {
- client.close();
- }
+ HCatUtil.closeHiveClientQuietly(client);
}
}
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
Tue Apr 17 21:18:07 2012
@@ -113,11 +113,13 @@ class FileOutputFormatContainer extends
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ HiveMetaStoreClient client = null;
try {
HiveConf hiveConf =
HCatUtil.getHiveConf(context.getConfiguration());
+ client = HCatUtil.createHiveClient(hiveConf);
handleDuplicatePublish(context,
jobInfo,
- HCatUtil.createHiveClient(hiveConf),
+ client,
jobInfo.getTableInfo().getTable());
} catch (MetaException e) {
throw new IOException(e);
@@ -125,6 +127,8 @@ class FileOutputFormatContainer extends
throw new IOException(e);
} catch (NoSuchObjectException e) {
throw new IOException(e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
}
if(!jobInfo.isDynamicPartitioningUsed()) {
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
Tue Apr 17 21:18:07 2012
@@ -70,17 +70,16 @@ public class HCatOutputFormat extends HC
@SuppressWarnings("unchecked")
public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws
IOException {
HiveMetaStoreClient client = null;
- HiveConf hiveConf = null;
try {
Configuration conf = job.getConfiguration();
- hiveConf = HCatUtil.getHiveConf(conf);
+ HiveConf hiveConf = HCatUtil.getHiveConf(conf);
client = HCatUtil.createHiveClient(hiveConf);
Table table = client.getTable(outputJobInfo.getDatabaseName(),
outputJobInfo.getTableName());
-
+
List<String> indexList =
client.listIndexNames(outputJobInfo.getDatabaseName(),
outputJobInfo.getTableName(), Short.MAX_VALUE);
-
+
for (String indexName : indexList) {
Index index = client.getIndex(outputJobInfo.getDatabaseName(),
outputJobInfo.getTableName(), indexName);
if (!index.isDeferredRebuild()) {
@@ -88,19 +87,19 @@ public class HCatOutputFormat extends HC
}
}
StorageDescriptor sd = table.getSd();
-
+
if (sd.isCompressed()) {
throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into
a compressed partition from Pig/Mapreduce is not supported");
}
-
+
if (sd.getBucketCols()!=null && !sd.getBucketCols().isEmpty()) {
throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into
a partition with bucket definition from Pig/Mapreduce is not supported");
}
-
+
if (sd.getSortCols()!=null && !sd.getSortCols().isEmpty()) {
throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into
a partition with sorted column definition from Pig/Mapreduce is not supported");
}
-
+
if (table.getPartitionKeysSize() == 0 ){
if ((outputJobInfo.getPartitionValues() != null) &&
(!outputJobInfo.getPartitionValues().isEmpty())){
// attempt made to save partition values in non-partitioned table
- throw error.
@@ -196,10 +195,7 @@ public class HCatOutputFormat extends HC
throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
}
} finally {
- if( client != null ) {
- client.close();
- }
-// HCatUtil.logAllTokens(LOG,job);
+ HCatUtil.closeHiveClientQuietly(client);
}
}
@@ -247,7 +243,7 @@ public class HCatOutputFormat extends HC
}
private static int getMaxDynamicPartitions(HiveConf hConf) {
- // by default the bounds checking for maximum number of
+ // by default the bounds checking for maximum number of
// dynamic partitions is disabled (-1)
int maxDynamicPartitions = -1;
@@ -262,5 +258,5 @@ public class HCatOutputFormat extends HC
private static boolean getHarRequested(HiveConf hConf) {
return hConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
}
-
+
}
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
Tue Apr 17 21:18:07 2012
@@ -47,16 +47,8 @@ import org.apache.hcatalog.data.schema.H
* info required in the client process context.
*/
public class InitializeInput {
-
- private static final Log LOG = LogFactory.getLog(InitializeInput.class);
-
- private static HiveConf hiveConf;
- private static HiveMetaStoreClient createHiveMetaClient(Configuration conf)
throws Exception {
-
- hiveConf = HCatUtil.getHiveConf(conf);
- return new HiveMetaStoreClient(hiveConf, null);
- }
+ private static final Log LOG = LogFactory.getLog(InitializeInput.class);
/**
* Set the input to use for the Job. This queries the metadata server with
the specified partition predicates,
@@ -71,7 +63,7 @@ public class InitializeInput {
//* Serialize the InputJobInfo and save in the Job's Configuration object
job.getConfiguration().set(
- HCatConstants.HCAT_KEY_JOB_INFO,
+ HCatConstants.HCAT_KEY_JOB_INFO,
getSerializedHcatKeyJobInfo(job, inputJobInfo,null));
}
@@ -79,14 +71,14 @@ public class InitializeInput {
//* Create and initialize an InputJobInfo object
HiveMetaStoreClient client = null;
-
+ HiveConf hiveConf = null;
try {
if (job != null){
- client = createHiveMetaClient(job.getConfiguration());
+ hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
} else {
hiveConf = new HiveConf(HCatInputFormat.class);
- client = new HiveMetaStoreClient(hiveConf, null);
}
+ client = HCatUtil.createHiveClient(hiveConf);
Table table = client.getTable(inputJobInfo.getDatabaseName(),
inputJobInfo.getTableName());
@@ -108,8 +100,8 @@ public class InitializeInput {
// populate partition info
for (Partition ptn : parts){
- PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(),
- job.getConfiguration(),
+ PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(),
+ job.getConfiguration(),
inputJobInfo);
partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
partInfoList.add(partInfo);
@@ -118,7 +110,7 @@ public class InitializeInput {
}else{
//Non partitioned table
PartInfo partInfo =
extractPartInfo(table.getSd(),table.getParameters(),
- job.getConfiguration(),
+ job.getConfiguration(),
inputJobInfo);
partInfo.setPartitionValues(new HashMap<String,String>());
partInfoList.add(partInfo);
@@ -127,13 +119,11 @@ public class InitializeInput {
return HCatUtil.serialize(inputJobInfo);
} finally {
- if (client != null ) {
- client.close();
- }
+ HCatUtil.closeHiveClientQuietly(client);
}
}
-
+
private static Map<String, String> createPtnKeyValueMap(Table table,
Partition ptn) throws IOException{
List<String> values = ptn.getValues();
if( values.size() != table.getPartitionKeys().size() ) {
@@ -155,25 +145,25 @@ public class InitializeInput {
return ptnKeyValues;
}
- static PartInfo extractPartInfo(StorageDescriptor sd,
- Map<String,String> parameters, Configuration conf,
+ static PartInfo extractPartInfo(StorageDescriptor sd,
+ Map<String,String> parameters, Configuration conf,
InputJobInfo inputJobInfo) throws IOException{
HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
Properties hcatProperties = new Properties();
- HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
+ HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
storerInfo);
// copy the properties from storageHandler to jobProperties
Map<String, String>jobProperties = HCatUtil.getInputJobProperties(
- storageHandler,
+ storageHandler,
inputJobInfo);
for (String key : parameters.keySet()){
hcatProperties.put(key, parameters.get(key));
}
- // FIXME
+ // FIXME
// Bloating partinfo with inputJobInfo is not good
return new PartInfo(schema, storageHandler,
sd.getLocation(), hcatProperties,
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1327286&r1=1327285&r2=1327286&view=diff
==============================================================================
---
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
(original)
+++
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
Tue Apr 17 21:18:07 2012
@@ -29,7 +29,6 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -91,13 +90,8 @@ public class PigHCatUtil {
return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
}
- static HiveMetaStoreClient client = null;
-
private static HiveMetaStoreClient createHiveMetaClient(String serverUri,
String serverKerberosPrincipal, Class<?> clazz) throws Exception {
- if (client != null){
- return client;
- }
HiveConf hiveConf = new HiveConf(clazz);
if (serverUri != null){
@@ -111,11 +105,10 @@ public class PigHCatUtil {
}
try {
- client = new HiveMetaStoreClient(hiveConf,null);
+ return new HiveMetaStoreClient(hiveConf,null);
} catch (Exception e){
throw new Exception("Could not instantiate a HiveMetaStoreClient
connecting to server uri:["+serverUri+"]",e);
}
- return client;
}
@@ -146,6 +139,7 @@ public class PigHCatUtil {
String dbName = dbTablePair.first;
String tableName = dbTablePair.second;
Table table = null;
+ HiveMetaStoreClient client = null;
try {
client = createHiveMetaClient(hcatServerUri, hcatServerPrincipal,
PigHCatUtil.class);
table = client.getTable(dbName, tableName);
@@ -153,6 +147,8 @@ public class PigHCatUtil {
throw new PigException("Table not found : " + nsoe.getMessage(),
PIG_EXCEPTION_CODE); // prettier error messages to frontend
} catch (Exception e) {
throw new IOException(e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
}
hcatTableCache.put(loc_server, table);
return table;