Author: rohini
Date: Wed Feb 10 21:37:05 2016
New Revision: 1729745
URL: http://svn.apache.org/viewvc?rev=1729745&view=rev
Log:
PIG-4801: Provide backward compatibility with mapreduce mapred.task settings
(rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1729745&r1=1729744&r2=1729745&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 10 21:37:05 2016
@@ -91,6 +91,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4801: Provide backward compatibility with mapreduce mapred.task settings (rohini)
+
PIG-4759: Fix Classresolution_1 e2e failure (rohini)
PIG-4800: EvalFunc.getCacheFiles() fails for different namenode (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1729745&r1=1729744&r2=1729745&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Wed Feb 10 21:37:05 2016
@@ -563,6 +563,8 @@ public class TezDagBuilder extends TezOp
@SuppressWarnings("deprecation")
Job job = new Job(payloadConf);
payloadConf = (JobConf) job.getConfiguration();
+ //TODO: Investigate. Setting as map writes empty output.
+ //payloadConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, tezOp.isUseMRMapSettings());
payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1729745&r1=1729744&r2=1729745&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Wed Feb 10 21:37:05 2016
@@ -30,6 +30,8 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.JVMReuseImpl;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
@@ -59,6 +61,7 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
@@ -131,6 +134,22 @@ public class PigProcessor extends Abstra
// To determine front-end in UDFContext
conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID,
getContext().getUniqueIdentifier());
+
+ // For compatibility with mapreduce. Some users use these configs in their UDF
+ // Copied logic from the tez class - org.apache.tez.mapreduce.output.MROutput
+ // Currently isMapperOutput is always false. Setting it to true produces empty output with MROutput
+ boolean isMapperOutput = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
false);
+ TaskAttemptID taskAttemptId =
org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
+
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
+ getContext().getTaskVertexIndex(),
getContext().getApplicationId().getId(),
+ getContext().getTaskIndex(),
getContext().getTaskAttemptNumber(), isMapperOutput);
+ conf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+ conf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+ conf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+ conf.setInt(JobContext.TASK_PARTITION,
+ taskAttemptId.getTaskID().getId());
+ conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
conf.set(PigConstants.TASK_INDEX,
Integer.toString(getContext().getTaskIndex()));
UDFContext.getUDFContext().addJobConf(conf);
UDFContext.getUDFContext().deserialize();