Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Mon May 29 15:00:39 2017
@@ -122,6 +122,15 @@ public class POMergeCogroup extends Phys
         this.endOfRecordMark = endOfRecordMark;
     }
 
+    //For Spark
+    private transient boolean endOfInput = false;
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+    public void setEndOfInput (boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
 
@@ -145,7 +154,7 @@ public class POMergeCogroup extends Phys
                 break;
 
             case POStatus.STATUS_EOP:
-                if(!this.parentPlan.endOfAllInput)
+                if(!(this.parentPlan.endOfAllInput || isEndOfInput()))
                     return baseInp;
 
                 if(lastTime)
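
The new endOfInput flag gives the Spark side a per-operator way to signal that the backing input iterator is exhausted, which the STATUS_EOP branch above now honors alongside parentPlan.endOfAllInput. A minimal sketch of the intended driving pattern follows; the drain loop and class name are illustrative only, not Pig's actual Spark converter code.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
    import org.apache.pig.data.Tuple;

    // Hypothetical helper, not part of this commit.
    public class MergeCogroupDrainSketch {
        // Once the Spark-side input is exhausted, mark the operator and keep pulling
        // until it reports EOP so the buffered groups are flushed.
        static List<Tuple> drain(POMergeCogroup cogroup) throws ExecException {
            cogroup.setEndOfInput(true);
            List<Tuple> flushed = new ArrayList<Tuple>();
            while (true) {
                Result r = cogroup.getNextTuple();
                if (r.returnStatus == POStatus.STATUS_OK) {
                    flushed.add((Tuple) r.result);
                } else if (r.returnStatus == POStatus.STATUS_EOP) {
                    break;                       // all buffered groups emitted
                } else if (r.returnStatus == POStatus.STATUS_ERR) {
                    throw new ExecException("Error while draining POMergeCogroup");
                }
                // STATUS_NULL results are simply skipped
            }
            return flushed;
        }
    }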

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Mon May 29 15:00:39 2017
@@ -131,6 +131,24 @@ public class POMergeJoin extends Physica
 
     private byte endOfRecordMark = POStatus.STATUS_NULL;
 
+    // Only for Spark.
+    // When this operator reaches the end of its input, the endOfInput flag is set to true.
+    // The old flag parentPlan.endOfAllInput doesn't work in Spark mode because it is shared
+    // between operators in the same plan, so it could be set by a preceding operator even
+    // though this operator has not yet reached the end of its input. (see PIG-4876)
+    private transient boolean endOfInput = false;
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+    public void setEndOfInput (boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
+    // Only for Spark.
+    // True when this operator has reached the end of its input and the last left input has
+    // been added into 'leftTuples', ready for the join.
+    private boolean leftInputConsumedInSpark = false;
+
     // This serves as the default TupleFactory
     private transient TupleFactory mTupleFactory;
 
@@ -352,7 +370,7 @@ public class POMergeJoin extends Physica
 
                     }
                     else if(cmpval > 0){    // We got ahead on right side. Store currently read right tuple.
-                        if(!this.parentPlan.endOfAllInput){
+                        if(!(this.parentPlan.endOfAllInput|| leftInputConsumedInSpark)){
                             prevRightKey = rightKey;
                             prevRightInp = rightInp;
                             // There cant be any more join on this key.
@@ -413,11 +431,14 @@ public class POMergeJoin extends Physica
             }
 
         case POStatus.STATUS_EOP:
-            if(this.parentPlan.endOfAllInput){
+            if(this.parentPlan.endOfAllInput || isEndOfInput()){
                 // We hit the end on left input.
                 // Tuples in bag may still possibly join with right side.
                 curJoinKey = prevLeftKey;
                 curLeftKey = null;
+                if (isEndOfInput()) {
+                    leftInputConsumedInSpark = true;
+                }
                 break;
             }
             else    // Fetch next left input.
@@ -427,7 +448,9 @@ public class POMergeJoin extends Physica
             return curLeftInp;
         }
 
-        if((null != prevRightKey) && !this.parentPlan.endOfAllInput && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
+        if((null != prevRightKey)
+                && !(this.parentPlan.endOfAllInput || leftInputConsumedInSpark)
+                && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
 
             // This will happen when we accumulated inputs on left side and moved on, but are still behind the right side
             // In that case, throw away the tuples accumulated till now and add the one we read in this function call.
@@ -509,7 +532,7 @@ public class POMergeJoin extends Physica
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
                 prevLeftKey = curLeftKey;
-                if(this.parentPlan.endOfAllInput){  // This is end of all input and this is last time we will read right input.
+                if(this.parentPlan.endOfAllInput || leftInputConsumedInSpark){  // This is end of all input and this is last time we will read right input.
                     // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                     try {
                         ((IndexableLoadFunc)rightLoader).close();
@@ -719,4 +742,8 @@ public class POMergeJoin extends Physica
     public LOJoin.JOINTYPE getJoinType() {
         return joinType;
     }
+
+    public POLocalRearrange[] getLRs() {
+        return LRs;
+    }
 }
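
The comment added with the new endOfInput field explains why a per-operator flag is needed: parentPlan.endOfAllInput is shared by every operator in the plan, so under Spark a preceding operator could flip it before this join has consumed its last left tuple (PIG-4876). The tiny sketch below shows how a Spark-side caller would use the new accessor; the class and method are hypothetical illustrations, not Pig code.

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;

    // Hypothetical illustration, not part of this commit.
    public class MergeJoinEndOfInputSketch {
        // Called when the left-side iterator for THIS join is exhausted; only this
        // operator then flushes its buffered left tuples (it records that internally
        // via leftInputConsumedInSpark), instead of reacting to the plan-wide
        // endOfAllInput flag.
        static void markLeftSideExhausted(POMergeJoin join) {
            join.setEndOfInput(true);
        }
    }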

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Mon May 29 15:00:39 2017
@@ -29,45 +29,45 @@ import org.apache.pig.impl.plan.VisitorE
 
 public class POPoissonSample extends PhysicalOperator {
 
-    private static final long serialVersionUID = 1L;
+    protected static final long serialVersionUID = 1L;
 
     // 17 is not a magic number. It can be obtained by using a poisson
     // cumulative distribution function with the mean set to 10 (empirically,
     // minimum number of samples) and the confidence set to 95%
     public static final int DEFAULT_SAMPLE_RATE = 17;
 
-    private int sampleRate = 0;
+    protected int sampleRate = 0;
 
-    private float heapPerc = 0f;
+    protected float heapPerc = 0f;
 
-    private Long totalMemory;
+    protected Long totalMemory;
 
-    private transient boolean initialized;
+    protected transient boolean initialized;
 
     // num of rows skipped so far
-    private transient int numSkipped;
+    protected transient int numSkipped;
 
     // num of rows sampled so far
-    private transient int numRowsSampled;
+    protected transient int numRowsSampled;
 
     // average size of tuple in memory, for tuples sampled
-    private transient long avgTupleMemSz;
+    protected transient long avgTupleMemSz;
 
     // current row number
-    private transient long rowNum;
+    protected transient long rowNum;
 
     // number of tuples to skip after each sample
-    private transient long skipInterval;
+    protected transient long skipInterval;
 
     // bytes in input to skip after every sample.
     // divide this by avgTupleMemSize to get skipInterval
-    private transient long memToSkipPerSample;
+    protected transient long memToSkipPerSample;
 
     // has the special row with row number information been returned
-    private transient boolean numRowSplTupleReturned;
+    protected transient boolean numRowSplTupleReturned;
 
     // new Sample result
-    private transient Result newSample;
+    protected transient Result newSample;
 
     public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) {
         super(k, rp, null);
@@ -204,7 +204,7 @@ public class POPoissonSample extends Phy
      * and recalculate skipInterval
      * @param t - tuple
      */
-    private void updateSkipInterval(Tuple t) {
+    protected void updateSkipInterval(Tuple t) {
         avgTupleMemSz =
             ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
         skipInterval = memToSkipPerSample/avgTupleMemSz;
@@ -224,7 +224,7 @@ public class POPoissonSample extends Phy
      * @return - Tuple appended with special marker string column, num-rows column
      * @throws ExecException
      */
-    private Result createNumRowTuple(Tuple sample) throws ExecException {
+    protected Result createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
         Tuple t = mTupleFactory.newTuple(sz + 2);
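
The visibility changes in this file (private fields and helpers become protected) appear intended to let a Spark-specific sampler reuse the Poisson sampling state; JobGraphBuilder below imports such a subclass, POPoissonSampleSpark. The subclass sketched here is a hypothetical stand-in showing what the new protected members make possible, not Pig's actual Spark sampler.

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
    import org.apache.pig.impl.plan.OperatorKey;

    // Hypothetical subclass for illustration only.
    public class PoissonSampleSubclassSketch extends POPoissonSample {
        private static final long serialVersionUID = 1L;

        public PoissonSampleSubclassSketch(OperatorKey k, int rp, int sr, float hp, long tm) {
            super(k, rp, sr, hp, tm);
        }

        // Example of what the relaxed visibility enables: the subclass can read the
        // sampling counters (and call updateSkipInterval / createNumRowTuple) directly.
        public long rowsSampledSoFar() {
            return numRowsSampled;
        }
    }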
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Mon May 29 15:00:39 2017
@@ -347,6 +347,10 @@ public class POSort extends PhysicalOper
         mSortFunc = sortFunc;
     }
 
+    public Comparator<Tuple> getMComparator() {
+        return mComparator;
+    }
+
     public List<Boolean> getMAscCols() {
         return mAscCols;
     }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Mon May 29 15:00:39 2017
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+
+import com.google.common.collect.Lists;
+
+public class JobGraphBuilder extends SparkOpPlanVisitor {
+
+    private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+    public static final int NULLPART_JOB_ID = -1;
+
+    private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap = null;
+    private SparkPigStats sparkStats = null;
+    private JavaSparkContext sparkContext = null;
+    private JobMetricsListener jobMetricsListener = null;
+    private String jobGroupID = null;
+    private Set<Integer> seenJobIDs = new HashSet<Integer>();
+    private SparkOperPlan sparkPlan = null;
+    private Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+    private Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+    private JobConf jobConf = null;
+    private PigContext pc;
+
+    public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
+                           SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener
+            jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pc) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
+        this.sparkPlan = plan;
+        this.convertMap = convertMap;
+        this.sparkStats = sparkStats;
+        this.sparkContext = sparkContext;
+        this.jobMetricsListener = jobMetricsListener;
+        this.jobGroupID = jobGroupID;
+        this.jobConf = jobConf;
+        this.pc = pc;
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        new PhyPlanSetter(sparkOp.physicalPlan).visit();
+        try {
+            setReplicationForMergeJoin(sparkOp.physicalPlan);
+            sparkOperToRDD(sparkOp);
+            finishUDFs(sparkOp.physicalPlan);
+        } catch (Exception e) {
+            throw new VisitorException("fail to get the rdds of this spark operator: ", e);
+        }
+    }
+
+    private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException {
+        List<Path> filesForMoreReplication = new ArrayList<>();
+        List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class);
+        if (poMergeJoins.size() > 0) {
+            for (POMergeJoin poMergeJoin : poMergeJoins) {
+                String idxFileName = poMergeJoin.getIndexFile();
+                if (idxFileName != null) {
+                    filesForMoreReplication.add(new Path(idxFileName));
+                }
+                // in spark mode, set as null so that PoMergeJoin won't use hadoop distributed cache
+                // see POMergeJoin.seekInRightStream()
+                poMergeJoin.setIndexFile(null);
+            }
+        }
+
+        setReplicationForFiles(filesForMoreReplication);
+    }
+
+    private void setReplicationForFiles(List<Path> files) throws IOException {
+        FileSystem fs = FileSystem.get(this.jobConf);
+        short replication = (short) jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10);
+        for (int i = 0; i < files.size(); i++) {
+            fs.setReplication(files.get(i), replication);
+        }
+    }
+
+    // Calling EvalFunc.finish()
+    private void finishUDFs(PhysicalPlan physicalPlan) throws VisitorException {
+        UDFFinishVisitor finisher = new UDFFinishVisitor(physicalPlan,
+                new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
+                        physicalPlan));
+        try {
+            finisher.visit();
+        } catch (VisitorException e) {
+            int errCode = 2121;
+            String msg = "Error while calling finish method on UDFs.";
+            throw new VisitorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException, ExecException {
+        List<SparkOperator> predecessors = sparkPlan
+                .getPredecessors(sparkOperator);
+        Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
+        if (predecessors != null) {
+            for (SparkOperator pred : predecessors) {
+                predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
+            }
+        }
+
+        boolean isFail = false;
+        Exception exception = null;
+        if (sparkOperator instanceof NativeSparkOperator) {
+            ((NativeSparkOperator) sparkOperator).runJob();
+        } else {
+            List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
+
+            //One SparkOperator may have multiple leaves(POStores) after multiquery feature is enabled
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("sparkOperator.physicalPlan have " + sparkOperator.physicalPlan.getLeaves().size() + " leaves");
+            }
+            for (PhysicalOperator leafPO : leafPOs) {
+                try {
+                    physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leafPO,
+                            predecessorOfPreviousSparkOp);
+                    sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                            physicalOpRdds.get(leafPO.getOperatorKey()));
+                } catch (Exception e) {
+                    LOG.error("throw exception in sparkOperToRDD: ", e);
+                    exception = e;
+                    isFail = true;
+                    boolean stopOnFailure = Boolean.valueOf(pc
+                            .getProperties().getProperty("stop.on.failure",
+                                    "false"));
+                    if (stopOnFailure) {
+                        int errCode = 6017;
+                        throw new ExecException(e.getMessage(), errCode,
+                                PigException.REMOTE_ENVIRONMENT);
+                    }
+                }
+            }
+
+
+            List<POStore> poStores = PlanHelper.getPhysicalOperators(
+                    sparkOperator.physicalPlan, POStore.class);
+            Collections.sort(poStores);
+            if (poStores.size() > 0) {
+                int i = 0;
+                if (!isFail) {
+                    List<Integer> jobIDs = getJobIDs(seenJobIDs);
+                    for (POStore poStore : poStores) {
+                        if (jobIDs.size() == 0) {
+                            /**
+                             * Spark internally misses information about its jobs that mapped 0 partitions.
+                             * Although these have valid jobIds, Spark itself is unable to tell anything about them.
+                             * If the store rdd had 0 partitions we return a dummy success stat with jobId =
+                             * NULLPART_JOB_ID, in any other cases we throw exception if no new jobId was seen.
+                             */
+                            if (physicalOpRdds.get(poStore.getOperatorKey()).partitions().length == 0) {
+                                sparkStats.addJobStats(poStore, sparkOperator, NULLPART_JOB_ID, null, sparkContext);
+                                return;
+                            } else {
+                                throw new RuntimeException("Expected at least one unseen jobID "
+                                        + " in this call to getJobIdsForGroup, but got 0");
+                            }
+                        }
+                        SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
+                                jobMetricsListener, sparkContext, sparkStats);
+                    }
+                } else {
+                    for (POStore poStore : poStores) {
+                        String failJobID = sparkOperator.name().concat("_fail");
+                        SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats, exception);
+                    }
+                }
+            }
+        }
+    }
+
+    private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
+                               PhysicalOperator physicalOperator,
+                               Set<OperatorKey> predsFromPreviousSparkOper)
+            throws IOException {
+        RDD<Tuple> nextRDD = null;
+        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan, physicalOperator);
+        LinkedHashSet<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
+        addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
+        if (predecessorsOfCurrentPhysicalOp != null) {
+            for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
+                physicalToRDD(sparkOperator, plan, predecessor, predsFromPreviousSparkOper);
+                operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
+            }
+
+        } else {
+            if (predsFromPreviousSparkOper != null
+                    && predsFromPreviousSparkOper.size() > 0) {
+                for (OperatorKey predFromPreviousSparkOper : predsFromPreviousSparkOper) {
+                    operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
+                }
+            }
+        }
+
+        if (physicalOperator instanceof POSplit) {
+            List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
+            for (PhysicalPlan successorPlan : successorPlans) {
+                List<PhysicalOperator> leavesOfSuccessPlan = successorPlan.getLeaves();
+                if (leavesOfSuccessPlan.size() != 1) {
+                    throw new RuntimeException("the size of the leaves of successorPlan should be 1");
+                }
+                PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
+                physicalToRDD(sparkOperator, successorPlan, leafOfSuccessPlan, operatorKeysOfAllPreds);
+            }
+        } else {
+            RDDConverter converter = convertMap.get(physicalOperator.getClass());
+            if (converter == null) {
+                throw new IllegalArgumentException(
+                        "Pig on Spark does not support Physical Operator: " + physicalOperator);
+            }
+
+            LOG.info("Converting operator "
+                    + physicalOperator.getClass().getSimpleName() + " "
+                    + physicalOperator);
+            List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds);
+
+            if (converter instanceof FRJoinConverter) {
+                setReplicatedInputs(physicalOperator, (FRJoinConverter) converter);
+            }
+
+            if (sparkOperator.isSkewedJoin() && converter instanceof SkewedJoinConverter) {
+                SkewedJoinConverter skewedJoinConverter = (SkewedJoinConverter) converter;
+                skewedJoinConverter.setSkewedJoinPartitionFile(sparkOperator.getSkewedJoinPartitionFile());
+            }
+            adjustRuntimeParallelismForSkewedJoin(physicalOperator, sparkOperator, allPredRDDs);
+            nextRDD = converter.convert(allPredRDDs, physicalOperator);
+
+            if (nextRDD == null) {
+                throw new IllegalArgumentException(
+                        "RDD should not be null after PhysicalOperator: "
+                                + physicalOperator);
+            }
+
+            physicalOpRdds.put(physicalOperator.getOperatorKey(), nextRDD);
+        }
+    }
+
+    private void setReplicatedInputs(PhysicalOperator physicalOperator, FRJoinConverter converter) {
+        Set<String> replicatedInputs = new HashSet<>();
+        for (PhysicalOperator operator : physicalOperator.getInputs()) {
+            if (operator instanceof POBroadcastSpark) {
+                replicatedInputs.add(((POBroadcastSpark) operator).getBroadcastedVariableName());
+            }
+        }
+        converter.setReplicatedInputs(replicatedInputs);
+    }
+
+    private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
+        List preds = null;
+        if (!(op instanceof POJoinGroupSpark)) {
+            preds = plan.getPredecessors(op);
+            if (preds != null && preds.size() > 1 && !(op instanceof POSkewedJoin)) {
+                Collections.sort(preds);
+            }
+        } else {
+            //For POJoinGroupSpark, we could not use plan.getPredecessors(op)+ sort to get
+            //the predecessors with correct order, more detail see JoinOptimizerSpark#restructSparkOp
+            preds = ((POJoinGroupSpark) op).getPredecessors();
+        }
+        return preds;
+    }
+
+    //get all rdds of predecessors sorted by the OperatorKey
+    private List<RDD<Tuple>> sortPredecessorRDDs(LinkedHashSet <OperatorKey> operatorKeysOfAllPreds) {
+        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) {
+            predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
+        }
+        return predecessorRDDs;
+    }
+
+    // Handles the special case of operators with multiple predecessors when multiquery is enabled:
+    // fetch the predecessors of the given physicalOp that live in a previous SparkOp (see PIG-4675)
+    private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors) {
+        // the relationship is stored in sparkOperator.getMultiQueryOptimizeConnectionItem()
+        List<OperatorKey> predOperatorKeys = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
+        if (predOperatorKeys != null) {
+            for (OperatorKey predOperator : predOperatorKeys) {
+                LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s", predOperator, physicalOperator.getOperatorKey()));
+                operatorKeysOfPredecessors.add(predOperator);
+            }
+        }
+    }
+
+    /**
+     * In Spark, currently only async actions return job id. There is no async
+     * equivalent of actions like saveAsNewAPIHadoopFile()
+     * <p/>
+     * The only other way to get a job id is to register a "job group ID" with
+     * the spark context and request all job ids corresponding to that job group
+     * via getJobIdsForGroup.
+     * <p/>
+     * However getJobIdsForGroup does not guarantee the order of the elements in
+     * its result.
+     * <p/>
+     * This method simply returns the previously unseen job ids.
+     *
+     * @param seenJobIDs job ids in the job group that are already seen
+     * @return Spark job ids not seen before
+     */
+    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+        Set<Integer> groupjobIDs = new HashSet<Integer>(
+                Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+                        .getJobIdsForGroup(jobGroupID))));
+        groupjobIDs.removeAll(seenJobIDs);
+        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
+        seenJobIDs.addAll(unseenJobIDs);
+        return unseenJobIDs;
+    }
+
+
+    /**
+     * if the parallelism of skewed join is NOT specified by user in the script when sampling,
+     * set a default parallelism for sampling
+     *
+     * @param physicalOperator
+     * @param sparkOperator
+     * @param allPredRDDs
+     * @throws VisitorException
+     */
+    private void adjustRuntimeParallelismForSkewedJoin(PhysicalOperator physicalOperator,
+                                                       SparkOperator sparkOperator,
+                                                       List<RDD<Tuple>> allPredRDDs) throws VisitorException {
+        // We need to calculate the final number of reducers of the next job (skew-join)
+        // adjust parallelism of ConstantExpression
+        if (sparkOperator.isSampler() && sparkPlan.getSuccessors(sparkOperator) != null
+                && physicalOperator instanceof POPoissonSampleSpark) {
+            // set the runtime #reducer of the next job as the #partition
+
+            int defaultParallelism = SparkPigContext.get().getParallelism(allPredRDDs, physicalOperator);
+
+            ParallelConstantVisitor visitor =
+                    new ParallelConstantVisitor(sparkOperator.physicalPlan, defaultParallelism);
+            visitor.visit();
+        }
+    }
+
+    /**
+     * here, we don't reuse MR/Tez's ParallelConstantVisitor
+     * To automatically adjust reducer parallelism for skewed join, we only adjust the
+     * ConstantExpression operator after the POPoissonSampleSpark operator
+     */
+    private static class ParallelConstantVisitor extends PhyPlanVisitor {
+
+        private int rp;
+        private boolean replaced = false;
+        private boolean isAfterSampleOperator = false;
+
+        public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.rp = rp;
+        }
+
+        @Override
+        public void visitConstant(ConstantExpression cnst) throws VisitorException {
+            if (isAfterSampleOperator && cnst.getRequestedParallelism() == -1) {
+                Object obj = cnst.getValue();
+                if (obj instanceof Integer) {
+                    if (replaced) {
+                        // sample job should have only one ConstantExpression
+                        throw new VisitorException("Invalid reduce plan: more " +
+                                "than one ConstantExpression found in sampling job");
+                    }
+                    cnst.setValue(rp);
+                    cnst.setRequestedParallelism(rp);
+                    replaced = true;
+                }
+            }
+        }
+
+        @Override
+        public void visitPoissonSample(POPoissonSample po) {
+            isAfterSampleOperator = true;
+        }
+    }
+}
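
The getJobIDs() javadoc above explains the only reliable way this builder can map stores to Spark job ids: register a job group, ask the status tracker for all ids in that group, and keep a running set of ids already seen. The standalone snippet below replays just that bookkeeping with plain collections; the class and method names are illustrative, not part of the commit.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Illustration of the "previously unseen job IDs" bookkeeping performed against
    // SparkContext.statusTracker().getJobIdsForGroup(): the group query returns all
    // ids for the group in no particular order, so only the new ones are returned.
    public class UnseenJobIdsSketch {
        static List<Integer> unseen(Set<Integer> seenJobIDs, int[] idsForGroup) {
            Set<Integer> group = new HashSet<>();
            for (int id : idsForGroup) {
                group.add(id);
            }
            group.removeAll(seenJobIDs);               // drop ids already handled
            List<Integer> fresh = new ArrayList<>(group);
            seenJobIDs.addAll(fresh);                  // remember them for next call
            return fresh;
        }

        public static void main(String[] args) {
            Set<Integer> seen = new HashSet<>(Arrays.asList(1, 2));
            System.out.println(unseen(seen, new int[]{1, 2, 3, 4}));  // e.g. [3, 4]
        }
    }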

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java Mon May 29 15:00:39 2017
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerBlockUpdated;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JobMetricsListener implements SparkListener {
+
+    private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+    private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+    private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+    private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+    private final Set<Integer> finishedJobIds = Sets.newHashSet();
+
+    @Override
+    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+//        Uncomment this (and remove the code in onTaskEnd) once PIG-5157 is fixed. It is better to update taskMetrics of a stage when the stage completes;
+//        if we update taskMetrics in onTaskEnd(), it consumes lot of memory.
+//        int stageId = stageCompleted.stageInfo().stageId();
+//        int stageAttemptId = stageCompleted.stageInfo().attemptId();
+//        String stageIdentifier = stageId + "_" + stageAttemptId;
+//        Integer jobId = stageIdToJobId.get(stageId);
+//        if (jobId == null) {
+//            LOG.warn("Cannot find job id for stage[" + stageId + "].");
+//        } else {
+//            Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+//            if (jobMetrics == null) {
+//                jobMetrics = Maps.newHashMap();
+//                allJobMetrics.put(jobId, jobMetrics);
+//            }
+//            List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+//            if (stageMetrics == null) {
+//                stageMetrics = Lists.newLinkedList();
+//                jobMetrics.put(stageIdentifier, stageMetrics);
+//            }
+//            // uncomment until we fix PIG-5157. after we upgrade to spark2.0 StageInfo().taskMetrics() api is available
+//            // stageMetrics.add(stageCompleted.stageInfo().taskMetrics());
+//        }
+    }
+
+    @Override
+    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+    }
+
+    @Override
+    public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+    }
+
+    @Override
+    public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+    }
+
+    @Override
+    public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
+
+    }
+
+    @Override
+    public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+
+    }
+
+    @Override
+    public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated){
+
+    }
+
+    @Override
+    public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+        int stageId = taskEnd.stageId();
+        int stageAttemptId = taskEnd.stageAttemptId();
+        String stageIdentifier = stageId + "_" + stageAttemptId;
+        Integer jobId = stageIdToJobId.get(stageId);
+        if (jobId == null) {
+            LOG.warn("Cannot find job id for stage[" + stageId + "].");
+        } else {
+            Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+            if (jobMetrics == null) {
+                jobMetrics = Maps.newHashMap();
+                allJobMetrics.put(jobId, jobMetrics);
+            }
+            List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+            if (stageMetrics == null) {
+                stageMetrics = Lists.newLinkedList();
+                jobMetrics.put(stageIdentifier, stageMetrics);
+            }
+            stageMetrics.add(taskEnd.taskMetrics());
+        }
+    }
+
+    @Override
+    public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+        int jobId = jobStart.jobId();
+        int size = jobStart.stageIds().size();
+        int[] intStageIds = new int[size];
+        for (int i = 0; i < size; i++) {
+            Integer stageId = (Integer) jobStart.stageIds().apply(i);
+            intStageIds[i] = stageId;
+            stageIdToJobId.put(stageId, jobId);
+        }
+        jobIdToStageId.put(jobId, intStageIds);
+    }
+
+    @Override
+    public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+        finishedJobIds.add(jobEnd.jobId());
+        notify();
+    }
+
+    @Override
+    public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+    }
+
+    @Override
+    public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+    }
+
+    @Override
+    public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+    }
+
+    @Override
+    public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+    }
+
+    @Override
+    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+    }
+
+    @Override
+    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+    }
+
+    @Override
+    public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+    }
+
+
+    public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+        return allJobMetrics.get(jobId);
+    }
+
+    public synchronized boolean waitForJobToEnd(int jobId) throws InterruptedException {
+        if (finishedJobIds.contains(jobId)) {
+            finishedJobIds.remove(jobId);
+            return true;
+        }
+
+        wait();
+        return false;
+    }
+
+    public synchronized void cleanup(int jobId) {
+        allJobMetrics.remove(jobId);
+        jobIdToStageId.remove(jobId);
+        Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Integer> entry = iterator.next();
+            if (entry.getValue() == jobId) {
+                iterator.remove();
+            }
+        }
+    }
+
+    public synchronized void reset() {
+        stageIdToJobId.clear();
+        jobIdToStageId.clear();
+        allJobMetrics.clear();
+        finishedJobIds.clear();
+    }
+}
\ No newline at end of file
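
waitForJobToEnd() above returns true only when onJobEnd() has already recorded the job; otherwise it blocks in wait() and returns false once some job finishes, so a caller has to poll in a loop (presumably this is how SparkStatsUtil.waitForJobAddStats drives it). A minimal sketch of that intended usage; the caller class is hypothetical.

    import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;

    // Hypothetical caller, for illustration only.
    public class WaitForJobSketch {
        static void blockUntilFinished(JobMetricsListener listener, int jobId)
                throws InterruptedException {
            // false is returned on wake-ups caused by other jobs finishing, so loop.
            while (!listener.waitForJobToEnd(jobId)) {
                // check again
            }
            listener.getJobMetric(jobId);   // task metrics for the finished job
            listener.cleanup(jobId);        // drop per-job state once consumed
        }
    }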

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java Mon May 29 15:00:39 2017
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Pulled class from Hive on Spark
+ */
+public class KryoSerializer {
+    private static final Log LOG = LogFactory.getLog(KryoSerializer.class);
+
+    public static byte[] serializeJobConf(JobConf jobConf) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            jobConf.write(new DataOutputStream(out));
+        } catch (IOException e) {
+            LOG.error("Error serializing job configuration", e);
+            return null;
+        } finally {
+            try {
+                out.close();
+            } catch (IOException e) {
+                LOG.error("Error closing output stream", e);
+            }
+        }
+
+        return out.toByteArray();
+
+    }
+
+    public static JobConf deserializeJobConf(byte[] buffer) {
+        JobConf conf = new JobConf();
+        try {
+            conf.readFields(new DataInputStream(new ByteArrayInputStream(buffer)));
+        } catch (IOException e) {
+            LOG.error("Error de-serializing job configuration");
+            return null;
+        }
+        return conf;
+    }
+
+}
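
Despite the class name (the javadoc notes it was pulled from Hive on Spark), the two helpers above simply use Hadoop Writable serialization to move a JobConf as a byte array, returning null on I/O failure. A small round-trip sketch, with a hypothetical property key:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;

    // Illustration only: serialize a JobConf for shipping and rebuild it on the other side.
    public class JobConfRoundTripSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            conf.set("pig.example.key", "value");               // hypothetical key

            byte[] wire = KryoSerializer.serializeJobConf(conf);
            if (wire != null) {                                 // null means serialization failed
                JobConf copy = KryoSerializer.deserializeJobConf(wire);
                System.out.println(copy.get("pig.example.key")); // prints "value"
            }
        }
    }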

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java Mon May 29 15:00:39 2017
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+
+/**
+ * Spark Partitioner that wraps a custom partitioner that implements
+ * org.apache.hadoop.mapreduce.Partitioner interface.
+ *
+ * Since Spark's shuffle API takes a different partitioner class
+ * (@see org.apache.spark.Partitioner) compared to MapReduce, we need to
+ * wrap custom partitioners written for MapReduce inside this Spark Partitioner.
+ *
+ * MR Custom partitioners are expected to implement getPartition() with
+ * specific arguments:
+ *   public int getPartition(PigNullableWritable key, Writable value, int numPartitions)
+ * For an example of such a partitioner,
+ * @see org.apache.pig.test.utils.SimpleCustomPartitioner
+ */
+public class MapReducePartitionerWrapper extends Partitioner {
+    private static final Log LOG = LogFactory.getLog(MapReducePartitionerWrapper.class);
+
+    private int numPartitions;
+    private String partitionerName;
+    // MR's Partitioner interface is not serializable.
+    // And since it is not serializable, it cannot be initialized in the constructor
+    // (in Spark's DAG scheduler thread in the Spark Driver).
+    // To work around this, it will be lazily initialized inside the map task
+    // (Executor thread) the first time getPartition() gets called.
+    transient private org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>
+            mapredPartitioner = null;
+    transient private Method getPartitionMethod = null;
+
+    public MapReducePartitionerWrapper(String partitionerName,
+                      int numPartitions) {
+        if (partitionerName == null) {
+            throw new RuntimeException("MapReduce Partitioner cannot be null.");
+        }
+
+        this.partitionerName = partitionerName;
+        this.numPartitions = numPartitions;
+    }
+
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    public int getPartition(final Object key) {
+        try {
+
+            PigNullableWritable writeableKey = new PigNullableWritable() {
+                public Object getValueAsPigType() {
+                    if (key instanceof IndexedKey) {
+                        IndexedKey indexedKey = (IndexedKey) key;
+                        this.setIndex(indexedKey.getIndex());
+                        return indexedKey.getKey();
+                    } else {
+                        return key;
+                    }
+                }
+            };
+
+
+            // Lazy initialization
+            // Synchronized because multiple (map) tasks in the same Spark Executor
+            // may call getPartition, attempting to initialize at the same time.
+            if (mapredPartitioner == null) {
+                synchronized (this) {
+                    // check again for race condition
+                    if (mapredPartitioner == null) {
+                        Class<?> mapredPartitionerClass =
+                                PigContext.resolveClassName(partitionerName);
+                        Configuration conf = new Configuration();
+                        mapredPartitioner = (org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>)
+                                ReflectionUtils.newInstance(mapredPartitionerClass, conf);
+                        getPartitionMethod = mapredPartitionerClass.getMethod(
+                                "getPartition",
+                                PigNullableWritable.class,
+                                org.apache.hadoop.io.Writable.class,
+                                int.class);
+                    }
+                }
+            }
+
+            // MR Partitioner getPartition takes a value argument as well, but
+            // Spark's Partitioner only accepts the key argument.
+            // In practice, MR Partitioners ignore the value. However, it's
+            // possible that some don't.
+            // TODO: We could handle this case by packaging the value inside the
+            // key (conditioned on some config option, since this will balloon
+            // memory usage). PIG-4575.
+            int partition = (Integer) getPartitionMethod.invoke(mapredPartitioner,
+                    writeableKey, null, numPartitions);
+
+            return partition;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
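
The javadoc above describes the contract: an MR-style partitioner, named by class, is instantiated lazily on the executor and invoked with a null value, since Spark's Partitioner only sees the key. A usage sketch follows; the pair RDD and the eight-partition figure are arbitrary, and SimpleCustomPartitioner is the example class the javadoc itself points at.

    import org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper;
    import org.apache.pig.data.Tuple;
    import org.apache.spark.api.java.JavaPairRDD;

    // Illustration only: repartition a key/Tuple pair RDD through an MR-style partitioner.
    public class PartitionerWrapperUsageSketch {
        static JavaPairRDD<Object, Tuple> repartition(JavaPairRDD<Object, Tuple> pairs) {
            MapReducePartitionerWrapper wrapper = new MapReducePartitionerWrapper(
                    "org.apache.pig.test.utils.SimpleCustomPartitioner",  // example from the javadoc above
                    8 /* arbitrary partition count */);
            return pairs.partitionBy(wrapper);
        }
    }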

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java Mon May 29 15:00:39 2017
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * SparkEngineConf solves the initialization problem of PigContext.properties.get("udf.import.list"),
+ * UDFContext#udfConfs and UDFContext#clientSysProps in Spark mode. These variables cannot be
+ * serialized directly because they are ThreadLocal variables. In MR mode, they are serialized into the
+ * JobConfiguration in JobControlCompiler#getJob and deserialized from the JobConfiguration in
+ * PigGenericMapBase#setup. But there is no setup() hook in Spark like in MR, so these variables would
+ * not be correctly initialized before Spark programs use them.
+ * The solution used here is:
+ * these variables are saved in SparkEngineConf#writeObject and then restored
+ * in SparkEngineConf#readObject in the Spark executor thread.
+ */
+public class SparkEngineConf implements Serializable {
+
+    private static final Log LOG = LogFactory.getLog(SparkEngineConf.class);
+    private static String SPARK_UDF_IMPORT_LIST = "pig.spark.udf.import.list";
+    private static String SPARK_UDFCONTEXT_UDFCONFS = "pig.spark.udfcontext.udfConfs";
+    private static String SPARK_UDFCONTEXT_CLIENTSYSPROPS = "pig.spark.udfcontext.clientSysProps";
+
+    private Properties properties = new Properties();
+
+    public SparkEngineConf() {
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        ArrayList<String> udfImportList = (ArrayList<String>) in.readObject();
+        PigContext.setPackageImportList(udfImportList);
+        String udfConfsStr = (String) in.readObject();
+        String clientSysPropsStr = (String) in.readObject();
+        UDFContext.getUDFContext().deserializeForSpark(udfConfsStr, clientSysPropsStr);
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        ArrayList<String> udfImportList = Lists.newArrayList(Splitter.on(",").split(properties.getProperty(SPARK_UDF_IMPORT_LIST)));
+        out.writeObject(udfImportList);
+        //2 threads call SparkEngineConf#writeObject
+        //In main thread: SparkLauncher#initialize->SparkUtil#newJobConf
+        //                ->ObjectSerializer#serialize-> SparkEngineConf#writeObject
+        //In dag-scheduler-event-loop thread: DAGScheduler.submitMissingTasks->JavaSerializationStream.writeObject
+        //
+        //In main thread,UDFContext#getUDFContext is not empty, we store UDFContext#udfConfs and UDFContext#clientSysProps
+        //into properties and serialize them.
+        //In dag-scheduler-event-loop thread, UDFContext#getUDFContext is empty, we get value of UDFContext#udfConfs and UDFContext#clientSysProps
+        //from properties and serialize them.
+        if (!UDFContext.getUDFContext().isUDFConfEmpty()) {
+            //In SparkUtil#newJobConf(), sparkEngineConf is serialized in job configuration and will call
+            //SparkEngineConf#writeObject(at this time UDFContext#udfConfs and UDFContext#clientSysProps is not null)
+            //later spark will call JavaSerializationStream.writeObject to serialize all objects when submit spark
+            //jobs(at that time, UDFContext#udfConfs and UDFContext#clientSysProps is null so we need to save their
+            //value in SparkEngineConf#properties after these two variables are correctly initialized in
+            //SparkUtil#newJobConf, More detailed see PIG-4920
+            String udfConfsStr = UDFContext.getUDFContext().serialize();
+            String clientSysPropsStr = ObjectSerializer.serialize(UDFContext.getUDFContext().getClientSystemProps());
+            this.properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, udfConfsStr);
+            this.properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, clientSysPropsStr);
+            out.writeObject(udfConfsStr);
+            out.writeObject(clientSysPropsStr);
+        } else {
+            out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS));
+            out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS));
+        }
+    }
+
+    public void setSparkUdfImportListStr(String sparkUdfImportListStr) {
+        this.properties.setProperty(SPARK_UDF_IMPORT_LIST, sparkUdfImportListStr);
+    }
+}
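
writeObject() above assumes the "udf.import.list" value has already been copied into the conf's properties via setSparkUdfImportListStr(); otherwise the Splitter call would see a null property. Below is a driver-side sketch of that setup step, assuming only that the caller has a PigContext at hand; the launcher-side wiring itself is outside this file.

    import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
    import org.apache.pig.impl.PigContext;

    // Illustration only: prepare a SparkEngineConf on the driver before it is serialized.
    public class SparkEngineConfSetupSketch {
        static SparkEngineConf build(PigContext pigContext) {
            SparkEngineConf engineConf = new SparkEngineConf();
            String importList = pigContext.getProperties().getProperty("udf.import.list");
            if (importList != null) {
                // writeObject() splits this comma-separated list and ships it to executors,
                // where readObject() re-installs it via PigContext.setPackageImportList().
                engineConf.setSparkUdfImportListStr(importList);
            }
            return engineConf;
        }
    }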

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java Mon May 29 15:00:39 2017
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.impl.PigContext;
+
+public class SparkExecType implements ExecType {
+
+    private static final long serialVersionUID = 1L;
+    private static final String mode = "SPARK";
+
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "")
+                .toUpperCase();
+        if (execTypeSpecified.equals(mode))
+            return true;
+        return false;
+    }
+
+    @Override
+    public ExecutionEngine getExecutionEngine(PigContext pigContext) {
+        return new SparkExecutionEngine(pigContext);
+    }
+
+    @Override
+    public Class<? extends ExecutionEngine> getExecutionEngineClass() {
+        return SparkExecutionEngine.class;
+    }
+
+    @Override
+    public boolean isLocal() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "SPARK";
+    }
+
+    public String toString() {
+        return name();
+    }
+}
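
accepts() above simply upper-cases the "exectype" property and compares it to "SPARK", so the match is case-insensitive. A tiny self-contained check:

    import java.util.Properties;

    import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;

    public class SparkExecTypeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("exectype", "spark");
            System.out.println(new SparkExecType().accepts(props));   // true
            props.setProperty("exectype", "mapreduce");
            System.out.println(new SparkExecType().accepts(props));   // false
        }
    }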

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Mon May 29 15:00:39 2017
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.util.UUID;
+
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.spark.streaming.SparkExecutableManager;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkScriptState;
+
+public class SparkExecutionEngine extends HExecutionEngine {
+
+    public SparkExecutionEngine(PigContext pigContext) {
+        super(pigContext);
+        this.launcher = new SparkLauncher();
+    }
+
+    @Override
+    public ScriptState instantiateScriptState() {
+        SparkScriptState ss = new SparkScriptState(UUID.randomUUID().toString());
+        ss.setPigContext(pigContext);
+        return ss;
+    }
+
+    @Override
+    public ExecutableManager getExecutableManager() {
+        return new SparkExecutableManager();
+    }
+
+    @Override
+    public PigStats instantiatePigStats() {
+        return new SparkPigStats();
+    }
+}

