Author: zly
Date: Wed Mar  1 08:56:54 2017
New Revision: 1784882

URL: http://svn.apache.org/viewvc?rev=1784882&view=rev
Log:
PIG-5154: Fix GFCross related issues after merging from trunk to spark (Adam via Liyun)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1784882&r1=1784881&r2=1784882&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Mar  1 08:56:54 2017
@@ -25,28 +25,26 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.PigConstants;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.spark.SparkCounters;
-import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
-import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import scala.Function1;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark;
 import org.apache.pig.data.Tuple;
@@ -54,6 +52,10 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import org.apache.spark.SparkContext;
 import org.apache.spark.TaskContext;
 import org.apache.spark.rdd.RDD;
@@ -170,8 +172,9 @@ public class LoadConverter implements RD
         public Tuple apply(Tuple2<Text, Tuple> v1) {
             if (!initialized) {
                 long partitionId = TaskContext.get().partitionId();
-                PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
-
+                Configuration jobConf = PigMapReduce.sJobConfInternal.get();
+                jobConf.set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+                jobConf.set(MRConfiguration.TASK_ID, Long.toString(partitionId));
                 initialized = true;
             }
             if (sparkCounters != null && disableCounter == false) {


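For context (not part of this commit): the patch makes LoadConverter publish the Spark partition id under both PigConstants.TASK_INDEX and MRConfiguration.TASK_ID in the thread-local job configuration, so that a UDF such as GFCross can pick up a task index at runtime on Spark as it does on MapReduce/Tez. A minimal sketch of how a consumer could read the value back, assuming it runs after LoadConverter's function has initialized the configuration (the helper name readTaskIndex is illustrative only, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.PigConstants;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;

    public class TaskIndexReader {
        // Illustrative helper, not part of the patch: reads the task index
        // that LoadConverter stores via PigMapReduce.sJobConfInternal.
        public static int readTaskIndex() {
            Configuration conf = PigMapReduce.sJobConfInternal.get();
            String idx = conf.get(PigConstants.TASK_INDEX);
            // Fall back to 0 if the converter has not set the value yet.
            return idx == null ? 0 : Integer.parseInt(idx);
        }
    }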