Author: xuefu
Date: Fri Nov  4 03:46:06 2016
New Revision: 1767986

URL: http://svn.apache.org/viewvc?rev=1767986&view=rev
Log:
PIG-5051: Initialize PigConstants.TASK_INDEX in Spark mode correctly (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Fri Nov  4 03:46:06 2016
@@ -94,7 +94,6 @@ public class SparkUtil {
         jobConf.set(PigConstants.LOCAL_CODE_DIR,
                 System.getProperty("java.io.tmpdir"));
         jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
-        jobConf.set(PigConstants.TASK_INDEX, "0");
 
         LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
                 physicalPlan, POStore.class);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri Nov  4 03:46:06 2016
@@ -25,7 +25,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.PigConstants;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
 import org.apache.pig.impl.util.UDFContext;
@@ -53,6 +55,7 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
 import org.apache.spark.rdd.RDD;
 
 import com.google.common.collect.Lists;
@@ -157,6 +160,7 @@ public class LoadConverter implements RD
         private SparkCounters sparkCounters;
         private boolean disableCounter;
         private SparkEngineConf sparkEngineConf;
+        private boolean initialized;
 
         public ToTupleFunction(SparkEngineConf sparkEngineConf){
                this.sparkEngineConf = sparkEngineConf;
@@ -165,6 +169,11 @@ public class LoadConverter implements RD
 
         @Override
         public Tuple apply(Tuple2<Text, Tuple> v1) {
+            if (!initialized) {
+                long partitionId = TaskContext.get().partitionId();
+                
PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, 
Long.toString(partitionId));
+                initialized = true;
+            }
             if (sparkCounters != null && disableCounter == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }

Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Nov  4 03:46:06 2016
@@ -3229,8 +3229,6 @@ public class TestBuiltin {
             assertEquals(iter.next().get(1), "1-3");
             assertEquals(iter.next().get(1), "1-4");
         } else{
-            //because we set PigConstants.TASK_INDEX as 0 in 
ForEachConverter#ForEachFunction#initializeJobConf
-            //UniqueID.exec() will output like 0-*
             //there will be 2 InputSplits when mapred.max.split.size is 
10(byte) for the testUniqueID.txt(20 bytes)
             //Split0:
             //            1\n
@@ -3252,10 +3250,10 @@ public class TestBuiltin {
             assertEquals(iter.next().get(1), "0-3");
             assertEquals(iter.next().get(1), "0-4");
             assertEquals(iter.next().get(1), "0-5");
-            assertEquals(iter.next().get(1), "0-0");
-            assertEquals(iter.next().get(1), "0-1");
-            assertEquals(iter.next().get(1), "0-2");
-            assertEquals(iter.next().get(1), "0-3");
+            assertEquals(iter.next().get(1), "1-0");
+            assertEquals(iter.next().get(1), "1-1");
+            assertEquals(iter.next().get(1), "1-2");
+            assertEquals(iter.next().get(1), "1-3");
         }
         Util.deleteFile(cluster, inputFileName);
     }


Reply via email to