Author: toffer
Date: Mon Oct 1 03:28:53 2012
New Revision: 1392186
URL: http://svn.apache.org/viewvc?rev=1392186&view=rev
Log:
backported from trunk: HCAT-513 Data Store onto HCatalog table fails for dynamic partitioning as the temporary directory gets deleted by the completed map tasks (amalakar via toffer)
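In short: FileRecordWriterContainer.close() runs once per task attempt, and it had been calling the base committer's job-level commitJob(); Hadoop's FileOutputCommitter cleans up the job's temporary directory on commitJob(), so the first map task to finish deleted the directory the remaining tasks were still writing to. This patch removes that per-task call and instead commits each dynamically discovered partition once, at job-commit time. A simplified sketch of the corrected flow (condensed from the FileOutputCommitterContainer hunk below, not verbatim code):

    // In FileOutputCommitterContainer.commitJob(): contextDiscoveredByPath maps
    // each discovered partition path to the JobContext configured for it in
    // discoverPartitions().
    if (dynamicPartitioningUsed) {
        discoverPartitions(jobContext);
        for (JobContext context : contextDiscoveredByPath.values()) {
            // Job-level commit, run once after all tasks have finished, so no
            // task can still be writing under the committer's temporary directory.
            new JobConf(context.getConfiguration())
                .getOutputCommitter().commitJob(context);
        }
    }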
Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1392186&r1=1392185&r2=1392186&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Mon Oct 1 03:28:53 2012
@@ -65,6 +65,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+  HCAT-513 Data Store onto HCatalog table fails for dynamic partitioning as the temporary directory gets deleted by the completed map tasks (amalakar via toffer)
+
  HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer)
  HCAT-494 MultiOutputFormat in 0.23 fails to setAliasConf() correctly. (mithun via toffer)
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1392186&r1=1392185&r2=1392186&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Mon Oct 1 03:28:53 2012
@@ -35,9 +35,11 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.HCatMapRedUtil;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
@@ -160,6 +162,10 @@ class FileOutputCommitterContainer exten
         try {
             if (dynamicPartitioningUsed) {
                 discoverPartitions(jobContext);
+                // Commit each partition so it gets moved out of the job work dir
+                for (JobContext context : contextDiscoveredByPath.values()) {
+                    new JobConf(context.getConfiguration()).getOutputCommitter().commitJob(context);
+                }
             }
             if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
                 getBaseOutputCommitter().commitJob(
@@ -475,8 +481,13 @@ class FileOutputCommitterContainer exten
                 LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
                 Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
                 partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
-                JobContext currContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(),context.getJobID());
-                HCatOutputFormat.configureOutputStorageHandler(context, jobInfo, fullPartSpec);
+                JobConf jobConf = (JobConf)context.getConfiguration();
+                JobContext currContext = HCatMapRedUtil.createJobContext(
+                    jobConf,
+                    context.getJobID(),
+                    InternalUtil.createReporter(HCatMapRedUtil.createTaskAttemptContext(jobConf,
+                        HCatHadoopShims.Instance.get().createTaskAttemptID())));
+                HCatOutputFormat.configureOutputStorageHandler(currContext, jobInfo, fullPartSpec);
                 contextDiscoveredByPath.put(st.getPath().toString(),currContext);
             }
         }
@@ -616,7 +627,7 @@ class FileOutputCommitterContainer exten
* 0.9 versions. The cleanupJob method is deprecated but, Pig 0.8 and
* 0.9 call cleanupJob method. Hence this method is used by both abortJob
* and cleanupJob methods.
- * @param JobContext The job context.
+ * @param context The job context.
* @throws java.io.IOException
*/
     private void internalAbortJob(JobContext context, State state) throws IOException{
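The FileRecordWriterContainer change below is the other half of the fix: it deletes the premature job commit from close(), which runs once per task attempt. A sketch of the removed pattern (reconstructed from the deleted lines below):

    // Previously in FileRecordWriterContainer.close(), i.e. per task attempt:
    if (baseOutputCommitter.needsTaskCommit(currContext)) {
        baseOutputCommitter.commitTask(currContext);    // per-task commit: correct
    }
    // The next two lines ran the job-level commit from inside a task.
    // FileOutputCommitter.commitJob() cleans up the job's temporary directory,
    // so the first task to finish broke every task still writing there (HCAT-513).
    org.apache.hadoop.mapred.JobContext currJobContext =
        HCatMapRedUtil.createJobContext(currContext);
    baseOutputCommitter.commitJob(currJobContext);      // removed by this patch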
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1392186&r1=1392185&r2=1392186&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Mon Oct 1 03:28:53 2012
@@ -146,8 +146,6 @@ class FileRecordWriterContainer extends
                 if (baseOutputCommitter.needsTaskCommit(currContext)) {
                     baseOutputCommitter.commitTask(currContext);
                 }
-                org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currContext);
-                baseOutputCommitter.commitJob(currJobContext);
             }
         } else {
             getBaseRecordWriter().close(reporter);
@@ -205,7 +203,7 @@ class FileRecordWriterContainer extends
                     //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition
                     //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
                     //In general this should be ok for most FileOutputFormat implementations
-                    //but may become an issue for cases when the method is used to perform other setup tasks
+                    //but may become an issue for cases when the method is used to perform other setup tasks
                     //setupJob()
                     baseOutputCommitter.setupJob(currJobContext);