Author: avandana
Date: Wed Oct 3 02:32:14 2012
New Revision: 1393259
URL: http://svn.apache.org/viewvc?rev=1393259&view=rev
Log:
HCAT-451 Partitions are created even when Jobs are aborted
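With this change, FileOutputCommitterContainer publishes partitions to the metastore only from commitJob() (via the new registerPartitions() helper); abortJob() now deletes the job's temporary output directory and cancels delegation tokens instead of publishing anything, and cleanupJob() is deprecated. As a rough illustration (not part of this patch), a client could verify that an aborted job leaves no partitions behind; the table name echoes the ptn_fail table used in the new TestHCatStorer test, and the standalone check below is otherwise hypothetical:

    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    public class AbortedJobCheck {
        public static void main(String[] args) throws Exception {
            // Assumes a reachable metastore configured via HiveConf and a table
            // named default.ptn_fail that a failed job tried to write to.
            HiveMetaStoreClient msc = new HiveMetaStoreClient(new HiveConf());
            List<String> parts = msc.listPartitionNames("default", "ptn_fail", (short) 10);
            // Expected after this fix: an empty list, because the aborted job published nothing.
            System.out.println("Partitions after aborted job: " + parts);
            msc.close();
        }
    }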
Added:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Oct 3 02:32:14 2012
@@ -119,6 +119,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-451 Partitions are created even when Jobs are aborted (avandana)
+
HCAT-513 Data Store onto HCatalog table fails for dynamic partitioning as
the temporary directory gets deleted by the completed map tasks (amalakar via
toffer)
HCAT-497 HCatContext should use the jobconf instead of its own conf
(traviscrawford)
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java Wed Oct 3 02:32:14 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hcatalog.pig;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -25,6 +26,7 @@ import java.util.Iterator;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hcatalog.HcatTestUtils;
import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
@@ -593,4 +595,62 @@ public class TestHCatStorer extends HCat
Assert.assertEquals(0, results.size());
driver.run("drop table employee");
}
+
+ public void testPartitionPublish()
+ throws IOException, CommandNeedRetryException {
+
+ driver.run("drop table ptn_fail");
+        String createTable = "create table ptn_fail(a int, c string) partitioned by (b string) stored as RCFILE";
+ int retCode = driver.run(createTable).getResponseCode();
+ if (retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ int LOOP_SIZE = 11;
+ String[] input = new String[LOOP_SIZE];
+
+ for (int i = 0; i < LOOP_SIZE; i++) {
+ input[i] = i + "\tmath";
+ }
+ HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerQuery("A = load '" + INPUT_FILE_NAME
+ + "' as (a:int, c:chararray);");
+ server.registerQuery("B = filter A by " + FailEvalFunc.class.getName()
+ + "($0);");
+ server.registerQuery("store B into 'ptn_fail' using "
+ + HCatStorer.class.getName() + "('b=math');");
+ server.executeBatch();
+
+ String query = "show partitions ptn_fail";
+ retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new IOException("Error " + retCode + " running query "
+ + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ Assert.assertEquals(0, res.size());
+
+        // Make sure the table directory exists but the partition directory
+        // was not created in the warehouse.
+        Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists());
+        Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math"))
+                .exists());
+ }
+
+ static public class FailEvalFunc extends EvalFunc<Boolean> {
+
+        /**
+         * @param tuple the input tuple
+         * @return never returns normally
+         * @throws IOException always, to mimic a task failure
+         *
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+ @Override
+ public Boolean exec(Tuple tuple) throws IOException {
+ throw new IOException("Eval Func to mimic Failure.");
+ }
+
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Wed Oct 3 02:32:14 2012
@@ -18,6 +18,15 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -52,15 +61,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
/**
* Part of the FileOutput*Container classes
* See {@link FileOutputFormatContainer} for more information
@@ -139,59 +139,40 @@ class FileOutputCommitterContainer exten
@Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
- org.apache.hadoop.mapred.JobContext
- mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
- if (dynamicPartitioningUsed) {
- discoverPartitions(jobContext);
- }
-
- if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
- getBaseOutputCommitter().abortJob(mapRedJobContext, state);
- } else if (dynamicPartitioningUsed) {
- for (JobContext currContext : contextDiscoveredByPath.values()) {
- try {
-                new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- HiveMetaStoreClient client = null;
try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration());
- client = HCatUtil.getHiveClient(hiveConf);
-            // cancel the deleg. tokens that were acquired for this job now that
- // we are done - we should cancel if the tokens were acquired by
- // HCatOutputFormat and not if they were supplied by Oozie.
- // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
- // the conf will not be set
- String tokenStrForm = client.getTokenStrForm();
- if (tokenStrForm != null && jobContext.getConfiguration().get
- (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
- client.cancelDelegationToken(tokenStrForm);
+ if (dynamicPartitioningUsed) {
+ discoverPartitions(jobContext);
}
- } catch (Exception e) {
- if (e instanceof HCatException) {
- throw (HCatException) e;
+            org.apache.hadoop.mapred.JobContext mapRedJobContext = HCatMapRedUtil
+                    .createJobContext(jobContext);
+ if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+ getBaseOutputCommitter().abortJob(mapRedJobContext, state);
+ } else if (dynamicPartitioningUsed) {
+                for (JobContext currContext : contextDiscoveredByPath.values()) {
+ try {
+ new JobConf(currContext.getConfiguration())
+ .getOutputCommitter().abortJob(currContext,
+ state);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ Path src;
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+ if (dynamicPartitioningUsed) {
+ src = new Path(getPartitionRootLocation(jobInfo.getLocation()
+ .toString(), jobInfo.getTableInfo().getTable()
+ .getPartitionKeysSize()));
} else {
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ src = new Path(jobInfo.getLocation());
}
+ FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+ LOG.info("Job failed. Cleaning up temporary directory [{}].", src);
+ fs.delete(src, true);
} finally {
- HCatUtil.closeHiveClientQuietly(client);
+ cancelDelegationTokens(jobContext);
}
-
- Path src;
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
- if (dynamicPartitioningUsed) {
- src = new Path(getPartitionRootLocation(jobInfo.getLocation(),
- jobInfo.getTableInfo().getTable().getPartitionKeysSize()));
- } else {
- src = new Path(jobInfo.getLocation());
- }
- FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
-// LOG.warn("abortJob about to delete ["+src.toString() +"]");
- fs.delete(src, true);
}
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -205,191 +186,50 @@ class FileOutputCommitterContainer exten
@Override
public void commitJob(JobContext jobContext) throws IOException {
- if (dynamicPartitioningUsed) {
- discoverPartitions(jobContext);
- // Commit each partition so it gets moved out of the job work dir
- for (JobContext context : contextDiscoveredByPath.values()) {
-                new JobConf(context.getConfiguration()).getOutputCommitter().commitJob(context);
- }
- }
- if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
- }
- // create _SUCCESS FILE if so requested.
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
- if (getOutputDirMarking(jobContext.getConfiguration())) {
- Path outputPath = new Path(jobInfo.getLocation());
- if (outputPath != null) {
-                FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
- // create a file in the folder to mark it
- if (fileSys.exists(outputPath)) {
- Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-                    if (!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
- fileSys.create(filePath).close();
- }
- }
- }
- }
- cleanupJob(jobContext);
- }
-
- @Override
- public void cleanupJob(JobContext context) throws IOException {
-
- if (dynamicPartitioningUsed) {
- discoverPartitions(context);
- }
-
-
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
- Configuration conf = context.getConfiguration();
- Table table = new Table(jobInfo.getTableInfo().getTable());
- Path tblPath = new Path(table.getTTable().getSd().getLocation());
- FileSystem fs = tblPath.getFileSystem(conf);
-
- if (table.getPartitionKeys().size() == 0) {
- //non partitioned table
- if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
- } else if (dynamicPartitioningUsed) {
-                for (JobContext currContext : contextDiscoveredByPath.values()) {
- try {
-                        JobConf jobConf = new JobConf(currContext.getConfiguration());
- jobConf.getOutputCommitter().cleanupJob(currContext);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- //Move data from temp directory the actual table directory
- //No metastore operation required.
- Path src = new Path(jobInfo.getLocation());
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
- return;
- }
-
- HiveMetaStoreClient client = null;
- HCatTableInfo tableInfo = jobInfo.getTableInfo();
-
- List<Partition> partitionsAdded = new ArrayList<Partition>();
-
try {
- HiveConf hiveConf = HCatUtil.getHiveConf(conf);
- client = HCatUtil.getHiveClient(hiveConf);
-
-            StorerInfo storer =
-                InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters());
-
- updateTableSchema(client, table, jobInfo.getOutputSchema());
-
- FileStatus tblStat = fs.getFileStatus(tblPath);
- String grpName = tblStat.getGroup();
- FsPermission perms = tblStat.getPermission();
-
- List<Partition> partitionsToAdd = new ArrayList<Partition>();
- if (!dynamicPartitioningUsed) {
- partitionsToAdd.add(
- constructPartition(
- context, jobInfo,
- tblPath.toString(), jobInfo.getPartitionValues()
-                            , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
- , table, fs
- , grpName, perms));
- } else {
-                for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
- partitionsToAdd.add(
- constructPartition(
- context, jobInfo,
-                            getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue()
-                            , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
- , table, fs
- , grpName, perms));
+ if (dynamicPartitioningUsed) {
+ discoverPartitions(jobContext);
+ // Commit each partition so it gets moved out of the job work
+ // dir
+ for (JobContext context : contextDiscoveredByPath.values()) {
+ new JobConf(context.getConfiguration())
+ .getOutputCommitter().commitJob(context);
}
}
-
- //Publish the new partition(s)
-        if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())) {
-
- Path src = new Path(ptnRootLocation);
-
-            // check here for each dir we're copying out, to see if it already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath, true);
-
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
-
-
-//            for (Partition partition : partitionsToAdd){
-//                partitionsAdded.add(client.add_partition(partition));
-//                // currently following add_partition instead of add_partitions because latter isn't
-//                // all-or-nothing and we want to be able to roll back partitions we added if need be.
-//            }
-
- try {
- client.add_partitions(partitionsToAdd);
- partitionsAdded = partitionsToAdd;
- } catch (Exception e) {
-                // There was an error adding partitions : rollback fs copy and rethrow
- for (Partition p : partitionsToAdd) {
-                    Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
- if (fs.exists(ptnPath)) {
- fs.delete(ptnPath, true);
- }
- }
- throw e;
- }
-
- } else {
- // no harProcessor, regular operation
-
-            // No duplicate partition publish case to worry about because we'll
-            // get a AlreadyExistsException here if so, and appropriately rollback
-
- client.add_partitions(partitionsToAdd);
- partitionsAdded = partitionsToAdd;
-
- if (dynamicPartitioningUsed && (partitionsAdded.size() > 0)) {
- Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
- }
-
- }
-
if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
+ getBaseOutputCommitter().commitJob(
+ HCatMapRedUtil.createJobContext(jobContext));
}
-
- if (Security.getInstance().isSecurityEnabled()) {
- Security.getInstance().cancelToken(client, context);
- }
- } catch (Exception e) {
-
- if (partitionsAdded.size() > 0) {
- try {
-                    //baseCommitter.cleanupJob failed, try to clean up the metastore
- for (Partition p : partitionsAdded) {
- client.dropPartition(tableInfo.getDatabaseName(),
- tableInfo.getTableName(), p.getValues());
+ registerPartitions(jobContext);
+ // create _SUCCESS FILE if so requested.
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+ if (getOutputDirMarking(jobContext.getConfiguration())) {
+ Path outputPath = new Path(jobInfo.getLocation());
+ if (outputPath != null) {
+ FileSystem fileSys = outputPath.getFileSystem(jobContext
+ .getConfiguration());
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath,
+ SUCCEEDED_FILE_NAME);
+                    if (!fileSys.exists(filePath)) { // may have been
+                        // created by baseCommitter.commitJob()
+ fileSys.create(filePath).close();
+ }
}
- } catch (Exception te) {
- //Keep cause as the original exception
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
}
}
-
- if (e instanceof HCatException) {
- throw (HCatException) e;
- } else {
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
- }
} finally {
- HCatUtil.closeHiveClientQuietly(client);
+ cancelDelegationTokens(jobContext);
}
}
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+        throw new IOException("The method cleanupJob is deprecated and should not be called.");
+ }
+
private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
if (ptnRootLocation == null) {
            // we only need to calculate it once, it'll be the same for other partitions in this job.
@@ -478,7 +318,6 @@ class FileOutputCommitterContainer exten
} else {
partition.getSd().setLocation(partPath.toString());
}
-
return partition;
}
@@ -701,4 +540,153 @@ class FileOutputCommitterContainer exten
}
}
+ private void registerPartitions(JobContext context) throws IOException{
+ if (dynamicPartitioningUsed){
+ discoverPartitions(context);
+ }
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ Configuration conf = context.getConfiguration();
+ Table table = new Table(jobInfo.getTableInfo().getTable());
+ Path tblPath = new Path(table.getTTable().getSd().getLocation());
+ FileSystem fs = tblPath.getFileSystem(conf);
+
+ if( table.getPartitionKeys().size() == 0 ) {
+            // Move data from the temp directory to the actual table directory.
+ //No metastore operation required.
+ Path src = new Path(jobInfo.getLocation());
+ moveTaskOutputs(fs, src, src, tblPath, false);
+ fs.delete(src, true);
+ return;
+ }
+
+ HiveMetaStoreClient client = null;
+ HCatTableInfo tableInfo = jobInfo.getTableInfo();
+ List<Partition> partitionsAdded = new ArrayList<Partition>();
+ try {
+ HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+ client = HCatUtil.getHiveClient(hiveConf);
+            StorerInfo storer = InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters());
+
+ FileStatus tblStat = fs.getFileStatus(tblPath);
+ String grpName = tblStat.getGroup();
+ FsPermission perms = tblStat.getPermission();
+
+ List<Partition> partitionsToAdd = new ArrayList<Partition>();
+ if (!dynamicPartitioningUsed){
+ partitionsToAdd.add(
+ constructPartition(
+ context,jobInfo,
+                            tblPath.toString(), jobInfo.getPartitionValues()
+                            , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+ ,table, fs
+ ,grpName,perms));
+ }else{
+                for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
+ partitionsToAdd.add(
+ constructPartition(
+ context,jobInfo,
+                                getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue()
+                            , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+ ,table, fs
+ ,grpName,perms));
+ }
+ }
+
+        ArrayList<Map<String, String>> ptnInfos = new ArrayList<Map<String, String>>();
+ for(Partition ptn : partitionsToAdd){
+            ptnInfos.add(InternalUtil.createPtnKeyValueMap(new Table(tableInfo.getTable()), ptn));
+ }
+
+ //Publish the new partition(s)
+        if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())) {
+
+ Path src = new Path(ptnRootLocation);
+ // check here for each dir we're copying out, to see if it
+ // already exists, error out if so
+ moveTaskOutputs(fs, src, src, tblPath,true);
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
+ try {
+                    updateTableSchema(client, table, jobInfo.getOutputSchema());
+                    LOG.info("The table {} has new partitions {}.", table.getTableName(), ptnInfos);
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ } catch (Exception e){
+                // There was an error adding partitions : rollback fs copy and rethrow
+ for (Partition p : partitionsToAdd){
+                    Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+ if (fs.exists(ptnPath)){
+ fs.delete(ptnPath,true);
+ }
+ }
+ throw e;
+ }
+
+ }else{
+ // no harProcessor, regular operation
+            // No duplicate partition publish case to worry about because we'll
+            // get an AlreadyExistsException here if so, and appropriately rollback
+ updateTableSchema(client, table, jobInfo.getOutputSchema());
+            LOG.info("The table {} has new partitions {}.", table.getTableName(), ptnInfos);
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+ Path src = new Path(ptnRootLocation);
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
+ }
+ }
+ } catch (Exception e) {
+ if (partitionsAdded.size() > 0) {
+ try {
+ // baseCommitter.cleanupJob failed, try to clean up the
+ // metastore
+ for (Partition p : partitionsAdded) {
+ client.dropPartition(tableInfo.getDatabaseName(),
+ tableInfo.getTableName(), p.getValues());
+ }
+ } catch (Exception te) {
+ // Keep cause as the original exception
+ throw new HCatException(
+ ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ }
+ if (e instanceof HCatException) {
+ throw (HCatException) e;
+ } else {
+                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+ }
+
+ private void cancelDelegationTokens(JobContext context) throws IOException{
+        LOG.info("Cancelling delegation tokens for the job.");
+ HiveMetaStoreClient client = null;
+ try {
+ HiveConf hiveConf = HCatUtil
+ .getHiveConf(context.getConfiguration());
+ client = HCatUtil.getHiveClient(hiveConf);
+            // cancel the deleg. tokens that were acquired for this job now that
+ // we are done - we should cancel if the tokens were acquired by
+ // HCatOutputFormat and not if they were supplied by Oozie.
+ // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+ // the conf will not be set
+ String tokenStrForm = client.getTokenStrForm();
+ if (tokenStrForm != null
+ && context.getConfiguration().get(
+ HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ client.cancelDelegationToken(tokenStrForm);
+ }
+ } catch (MetaException e) {
+            LOG.warn("MetaException while cancelling delegation token.", e);
+ } catch (TException e) {
+ LOG.warn("TException while cancelling delegation token.", e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+ }
+
+
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Wed Oct 3 02:32:14 2012
@@ -27,7 +27,6 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -119,7 +118,7 @@ public class InitializeInput {
                        new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
                PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
                        ptn.getParameters(), job.getConfiguration(), inputJobInfo);
-                partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+                partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
partInfoList.add(partInfo);
}
@@ -140,27 +139,6 @@ public class InitializeInput {
}
-    private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException {
- List<String> values = ptn.getValues();
- if (values.size() != table.getPartitionKeys().size()) {
-            throw new IOException("Partition values in partition inconsistent with table definition, table "
- + table.getTableName() + " has "
- + table.getPartitionKeys().size()
-                + " partition keys, partition has " + values.size() + "partition values");
- }
-
- Map<String, String> ptnKeyValues = new HashMap<String, String>();
-
- int i = 0;
- for (FieldSchema schema : table.getPartitionKeys()) {
-            // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
- ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
- i++;
- }
-
- return ptnKeyValues;
- }
-
private static PartInfo extractPartInfo(HCatSchema schema,
StorageDescriptor sd,
Map<String, String> parameters,
Configuration conf,
                                            InputJobInfo inputJobInfo) throws IOException {
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Wed Oct 3 02:32:14 2012
@@ -21,7 +21,9 @@ package org.apache.hcatalog.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -48,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -185,4 +188,30 @@ class InternalUtil {
+ " but found " + split.getClass().getName());
}
}
+
+
+ static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn)
+ throws IOException {
+ List<String> values = ptn.getValues();
+ if (values.size() != table.getPartitionKeys().size()) {
+ throw new IOException(
+                    "Partition values in partition inconsistent with table definition, table "
+                    + table.getTableName() + " has "
+                    + table.getPartitionKeys().size()
+                    + " partition keys, partition has " + values.size()
+                    + " partition values");
+ }
+
+ Map<String, String> ptnKeyValues = new HashMap<String, String>();
+
+ int i = 0;
+ for (FieldSchema schema : table.getPartitionKeys()) {
+ // CONCERN : the way this mapping goes, the order *needs* to be
+ // preserved for table.getPartitionKeys() and ptn.getValues()
+ ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+ i++;
+ }
+
+ return ptnKeyValues;
+ }
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Wed Oct 3 02:32:14 2012
@@ -155,7 +155,7 @@ public class TestHCatOutputFormat extend
public void publishTest(Job job) throws Exception {
         OutputCommitter committer = new FileOutputCommitterContainer(job, null);
- committer.cleanupJob(job);
+ committer.commitJob(job);
         Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
assertNotNull(part);
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1393259&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java Wed Oct 3 02:32:14 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatPartitionPublish {
+ private static Configuration mrConf = null;
+ private static FileSystem fs = null;
+ private static MiniMRCluster mrCluster = null;
+ private static boolean isServerRunning = false;
+ private static final int msPort = 20101;
+ private static HiveConf hcatConf;
+ private static HiveMetaStoreClient msc;
+ private static SecurityManager securityManager;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Configuration conf = new Configuration(true);
+ fs = FileSystem.get(conf);
+ System.setProperty("hadoop.log.dir", new File(fs.getWorkingDirectory()
+ .toString(), "/logs").getAbsolutePath());
+ // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+ // to use MiniMRCluster. MAPREDUCE-2350
+ mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ mrConf = mrCluster.createJobConf();
+
+ if (isServerRunning) {
+ return;
+ }
+
+ MetaStoreUtils.startMetaStore(msPort, ShimLoader
+ .getHadoopThriftAuthBridge());
+ isServerRunning = true;
+ securityManager = System.getSecurityManager();
+ System.setSecurityManager(new NoExitSecurityManager());
+
+ hcatConf = new HiveConf(TestHCatPartitionPublish.class);
+ hcatConf.set("hive.metastore.local", "false");
+ hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+ + msPort);
+ hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+
+ hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+ "false");
+ msc = new HiveMetaStoreClient(hcatConf, null);
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ System.setSecurityManager(securityManager);
+ isServerRunning = false;
+ }
+
+ @Test
+ public void testPartitionPublish() throws Exception {
+ String dbName = "default";
+ String tableName = "testHCatPartitionedTable";
+ createTable(null, tableName);
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value1");
+ partitionMap.put("part0", "p0value1");
+
+        ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>();
+ for (FieldSchema fs : getTableColumns()) {
+ hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fs));
+ }
+
+ runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns);
+ List<String> ptns = msc.listPartitionNames(dbName, tableName,
+ (short) 10);
+ Assert.assertEquals(0, ptns.size());
+ Table table = msc.getTable(dbName, tableName);
+ Assert.assertTrue(table != null);
+ // Also make sure that the directory has been deleted in the table
+ // location.
+ Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation()
+ + "/part1=p1value1/part0=p0value1")));
+ }
+
+ void runMRCreateFail(
+ String dbName, String tableName, Map<String, String> partitionValues,
+ List<HCatFieldSchema> columns) throws Exception {
+
+ Job job = new Job(mrConf, "hcat mapreduce write fail test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(TestHCatPartitionPublish.MapFail.class);
+
+ // input/output settings
+ job.setInputFormatClass(TextInputFormat.class);
+
+ Path path = new Path(fs.getWorkingDirectory(),
+ "mapred/testHCatMapReduceInput");
+ // The write count does not matter, as the map will fail in its first
+ // call.
+ createInputFile(path, 5);
+
+ TextInputFormat.setInputPaths(job, path);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
+ partitionValues);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ HCatOutputFormat.setSchema(job, new HCatSchema(columns));
+
+ boolean success = job.waitForCompletion(true);
+ Assert.assertTrue(success == false);
+ }
+
+ private void createInputFile(Path path, int rowCount) throws IOException {
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ FSDataOutputStream os = fs.create(path);
+ for (int i = 0; i < rowCount; i++) {
+ os.writeChars(i + "\n");
+ }
+ os.close();
+ }
+
+ public static class MapFail extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ {
+ throw new IOException("Exception to mimic job failure.");
+ }
+ }
+ }
+
+    private void createTable(String dbName, String tableName) throws Exception {
+        String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME
+ : dbName;
+ try {
+ msc.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ } // can fail with NoSuchObjectException
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType("MANAGED_TABLE");
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(getPartitionKeys());
+ tbl.setSd(sd);
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT,
+ "1");
+ sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
+ sd.setInputFormat(RCFileInputFormat.class.getName());
+ sd.setOutputFormat(RCFileOutputFormat.class.getName());
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
+
+ msc.createTable(tbl);
+ }
+
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ // Defining partition names in unsorted order
+ fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+}