Author: khorgath
Date: Mon Mar 3 18:59:25 2014
New Revision: 1573675
URL: http://svn.apache.org/r1573675
Log:
HIVE-6409 : FileOutputCommitterContainer::commitJob() cancels delegation tokens
too early. (Mithun Radhakrishnan via Sushanth Sowmyan)
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1573675&r1=1573674&r2=1573675&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
Mon Mar 3 18:59:25 2014
@@ -215,41 +215,41 @@ class FileOutputCommitterContainer exten
@Override
public void commitJob(JobContext jobContext) throws IOException {
- try {
- if (dynamicPartitioningUsed) {
- discoverPartitions(jobContext);
- // Commit each partition so it gets moved out of the job work
- // dir
- for (JobContext context : contextDiscoveredByPath.values()) {
- new JobConf(context.getConfiguration())
- .getOutputCommitter().commitJob(context);
- }
- }
- if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
- getBaseOutputCommitter().commitJob(
- HCatMapRedUtil.createJobContext(jobContext));
- }
- registerPartitions(jobContext);
- // create _SUCCESS FILE if so requested.
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
- if (getOutputDirMarking(jobContext.getConfiguration())) {
- Path outputPath = new Path(jobInfo.getLocation());
- FileSystem fileSys = outputPath.getFileSystem(jobContext
- .getConfiguration());
- // create a file in the folder to mark it
- if (fileSys.exists(outputPath)) {
- Path filePath = new Path(outputPath,
- SUCCEEDED_FILE_NAME);
- if (!fileSys.exists(filePath)) { // may have been
- // created by
- // baseCommitter.commitJob()
- fileSys.create(filePath).close();
- }
+ if (dynamicPartitioningUsed) {
+ discoverPartitions(jobContext);
+ // Commit each partition so it gets moved out of the job work
+ // dir
+ for (JobContext context : contextDiscoveredByPath.values()) {
+ new JobConf(context.getConfiguration())
+ .getOutputCommitter().commitJob(context);
+ }
+ }
+ if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+ getBaseOutputCommitter().commitJob(
+ HCatMapRedUtil.createJobContext(jobContext));
+ }
+ registerPartitions(jobContext);
+ // create _SUCCESS FILE if so requested.
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+ if (getOutputDirMarking(jobContext.getConfiguration())) {
+ Path outputPath = new Path(jobInfo.getLocation());
+ FileSystem fileSys = outputPath.getFileSystem(jobContext
+ .getConfiguration());
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath,
+ SUCCEEDED_FILE_NAME);
+ if (!fileSys.exists(filePath)) { // may have been
+ // created by
+ // baseCommitter.commitJob()
+ fileSys.create(filePath).close();
}
}
- } finally {
- cancelDelegationTokens(jobContext);
}
+
+ // Commit has succeeded (since no exceptions have been thrown.)
+ // Safe to cancel delegation tokens now.
+ cancelDelegationTokens(jobContext);
}
@Override
@@ -970,7 +970,7 @@ class FileOutputCommitterContainer exten
}
private void cancelDelegationTokens(JobContext context) throws IOException{
- LOG.info("Cancelling deletgation token for the job.");
+ LOG.info("Cancelling delegation token for the job.");
HiveMetaStoreClient client = null;
try {
HiveConf hiveConf = HCatUtil