Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java Wed Apr 12 02:20:20 2017
@@ -1,137 +1,137 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.rdd.RDD;
-
-public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> {
-
-       private static final Log LOG = LogFactory.getLog(CounterConverter.class);
-       
-       @Override
-       public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, 
-                       POCounter poCounter) throws IOException {
-               SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
-        RDD<Tuple> rdd = predecessors.get(0);
-        CounterConverterFunction f = new CounterConverterFunction(poCounter);
-        JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
-//        jRdd = jRdd.cache();
-        return jRdd.rdd();
-       }
-       
-       @SuppressWarnings("serial")
-       private static class CounterConverterFunction implements 
-               Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
-
-               private final POCounter poCounter;
-               private long localCount = 1L;
-               private long sparkCount = 0L;
-               
-               private CounterConverterFunction(POCounter poCounter) {
-                       this.poCounter = poCounter;
-               }
-               
-               @Override
-               public Iterator<Tuple> call(Integer index, final 
-                               Iterator<Tuple> input) {
-               Tuple inp = null;
-               Tuple output = null;
-               long sizeBag = 0L;
-
-               List<Tuple> listOutput = new ArrayList<Tuple>();
-               
-               try {
-                       while (input.hasNext()) {
-                                       inp = input.next();
-                                       output = TupleFactory.getInstance()
-                                                       .newTuple(inp.getAll().size() + 3);
-                                       
-                                       for (int i = 0; i < inp.getAll().size(); i++) {
-                                               output.set(i + 3, inp.get(i));
-                                       }
-                                       
-                                       if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
-                                               output.set(2, getLocalCounter());
-                                               incrementSparkCounter();
-                                               incrementLocalCounter();
-                                       } else if (!poCounter.isDenseRank()) {
-                                               int positionBag = inp.getAll().size()-1;
-                                               if (inp.getType(positionBag) == DataType.BAG) {
-                                       sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
-                                                       inp.get(positionBag)).size();
-                                   }
-                                               
-                                               output.set(2, getLocalCounter());
-                               
-                                               addToSparkCounter(sizeBag);
-                               addToLocalCounter(sizeBag);
-                                       }
-                                       
-                                       output.set(0, index);
-                                       output.set(1, getSparkCounter());
-                                       listOutput.add(output);
-                               }
-               } catch(ExecException e) {
-                       throw new RuntimeException(e);
-               }
-                       
-                                       
-                       return listOutput.iterator();
-               }
-               
-               private long getLocalCounter() {
-                       return localCount;
-               }
-               
-               private long incrementLocalCounter() {
-                       return localCount++;
-               }
-               
-               private long addToLocalCounter(long amount) {
-                       return localCount += amount;
-               }
-               
-               private long getSparkCounter() {
-                       return sparkCount;
-               }
-               
-               private long incrementSparkCounter() {
-                       return sparkCount++;
-               }
-               
-               private long addToSparkCounter(long amount) {
-                       return sparkCount += amount;
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.rdd.RDD;
+
+public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> {
+
+    private static final Log LOG = LogFactory.getLog(CounterConverter.class);
+    
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, 
+            POCounter poCounter) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        CounterConverterFunction f = new CounterConverterFunction(poCounter);
+        JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
+//        jRdd = jRdd.cache();
+        return jRdd.rdd();
+    }
+    
+    @SuppressWarnings("serial")
+    private static class CounterConverterFunction implements 
+        Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
+
+        private final POCounter poCounter;
+        private long localCount = 1L;
+        private long sparkCount = 0L;
+        
+        private CounterConverterFunction(POCounter poCounter) {
+            this.poCounter = poCounter;
+        }
+        
+        @Override
+        public Iterator<Tuple> call(Integer index, final 
+                Iterator<Tuple> input) {
+            Tuple inp = null;
+            Tuple output = null;
+            long sizeBag = 0L;
+
+            List<Tuple> listOutput = new ArrayList<Tuple>();
+            
+            try {
+                while (input.hasNext()) {
+                    inp = input.next();
+                    output = TupleFactory.getInstance()
+                            .newTuple(inp.getAll().size() + 3);
+                    
+                    for (int i = 0; i < inp.getAll().size(); i++) {
+                        output.set(i + 3, inp.get(i));
+                    }
+                    
+                    if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
+                        output.set(2, getLocalCounter());
+                        incrementSparkCounter();
+                        incrementLocalCounter();
+                    } else if (!poCounter.isDenseRank()) {
+                        int positionBag = inp.getAll().size()-1;
+                        if (inp.getType(positionBag) == DataType.BAG) {
+                            sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
+                                    inp.get(positionBag)).size();
+                        }
+                        
+                        output.set(2, getLocalCounter());
+                        
+                        addToSparkCounter(sizeBag);
+                        addToLocalCounter(sizeBag);
+                    }
+                    
+                    output.set(0, index);
+                    output.set(1, getSparkCounter());
+                    listOutput.add(output);
+                }
+            } catch(ExecException e) {
+                throw new RuntimeException(e);
+            }
+            
+                    
+            return listOutput.iterator();
+        }
+        
+        private long getLocalCounter() {
+            return localCount;
+        }
+        
+        private long incrementLocalCounter() {
+            return localCount++;
+        }
+        
+        private long addToLocalCounter(long amount) {
+            return localCount += amount;
+        }
+        
+        private long getSparkCounter() {
+            return sparkCount;
+        }
+        
+        private long incrementSparkCounter() {
+            return sparkCount++;
+        }
+        
+        private long addToSparkCounter(long amount) {
+            return sparkCount += amount;
+        }
+    }
+}

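The CounterConverter change above is a whitespace-only reindent (tabs to four spaces); the logic is unchanged. What the file does: it implements Pig's RANK bookkeeping on Spark by tagging every tuple with its partition index and a per-partition running count through mapPartitionsWithIndex. A minimal standalone sketch of that technique follows (plain Spark Java API; illustrative class and data, not the Pig code itself):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    // Number the elements of each partition while remembering the partition
    // index, so a global rank can be reconstructed afterwards (as
    // CounterConverterFunction does with slots 0..2 of the output tuple).
    public class PartitionCounterSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("counter-sketch").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> data = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

            JavaRDD<String> numbered = data.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer index, Iterator<String> input) {
                        List<String> out = new ArrayList<>();
                        long localCount = 1L;
                        while (input.hasNext()) {
                            // (partition, position-in-partition, value)
                            out.add(index + "," + localCount++ + "," + input.next());
                        }
                        return out.iterator();
                    }
                }, true);

            numbered.collect().forEach(System.out::println);
            sc.close();
        }
    }
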
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Wed Apr 12 02:20:20 2017
@@ -21,18 +21,19 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.AbstractFunction2;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.PairRDDFunctions;
 import org.apache.spark.rdd.RDD;
 
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+
 @SuppressWarnings({ "serial" })
 public class DistinctConverter implements RDDConverter<Tuple, Tuple, PODistinct> {
     private static final Log LOG = LogFactory.getLog(DistinctConverter.class);
@@ -51,7 +52,7 @@ public class DistinctConverter implement
           = new PairRDDFunctions<Tuple, Object>(keyValRDD,
                 SparkUtil.getManifest(Tuple.class),
                 SparkUtil.getManifest(Object.class), null);
-        int parallelism = SparkUtil.getParallelism(predecessors, op);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, op);
         return pairRDDFunctions.reduceByKey(
                 SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
                 new MergeValuesFunction())
@@ -66,15 +67,9 @@ public class DistinctConverter implement
             Serializable {
         @Override
         public Tuple2<Tuple, Object> apply(Tuple t) {
-            if (LOG.isDebugEnabled())
-                LOG.debug("DistinctConverter.ToKeyValueFunction in " + t);
-
             Tuple key = t;
             Object value = null;
             Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("DistinctConverter.ToKeyValueFunction out " + out);
             return out;
         }
     }

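The DistinctConverter hunks reorder the scala imports and route parallelism through SparkPigContext; the DISTINCT technique itself is unchanged: map each tuple to a (tuple, null) pair, reduceByKey so duplicates collapse, then keep only the keys. A standalone sketch under those assumptions (illustrative names; the fixed parallelism of 4 stands in for the value SparkPigContext would supply):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    // DISTINCT as key/value plumbing: the record becomes the key, a dummy
    // null becomes the value, and reduceByKey leaves one pair per key.
    public class DistinctSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("distinct-sketch").setMaster("local[2]"));

            JavaRDD<String> data = sc.parallelize(Arrays.asList("x", "y", "x", "z"));

            JavaPairRDD<String, Object> keyed =
                    data.mapToPair(t -> new Tuple2<String, Object>(t, null));

            JavaRDD<String> distinct = keyed
                    .reduceByKey((a, b) -> a, 4)
                    .keys();

            distinct.collect().forEach(System.out::println);
            sc.close();
        }
    }
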
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Wed Apr 12 02:20:20 2017
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -59,7 +60,7 @@ public class FRJoinConverter implements
         Map<String, List<Tuple>> replicatedInputMap = new HashMap<>();
 
         for (String replicatedInput : replicatedInputs) {
-            replicatedInputMap.put(replicatedInput, SparkUtil.getBroadcastedVars().get(replicatedInput).value());
+            replicatedInputMap.put(replicatedInput, SparkPigContext.get().getBroadcastedVars().get(replicatedInput).value());
         }
 
         poFRJoin.attachInputs(replicatedInputMap);

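FRJoinConverter now fetches the replicated (broadcast) join inputs from SparkPigContext instead of SparkUtil, the same move made for parallelism elsewhere in this commit. SparkPigContext itself is not part of this diff, so the following is only a hypothetical reconstruction of the shape its call sites imply (a process-wide singleton holding broadcast variables and a parallelism default); the real class will differ in detail:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.pig.data.Tuple;
    import org.apache.spark.broadcast.Broadcast;

    // Hypothetical sketch inferred from call sites such as
    // SparkPigContext.get().getBroadcastedVars().get(name).value() and
    // SparkPigContext.get().getParallelism(predecessors, op).
    public final class SparkPigContextSketch {
        private static final SparkPigContextSketch INSTANCE = new SparkPigContextSketch();

        private final Map<String, Broadcast<List<Tuple>>> broadcastedVars =
                new ConcurrentHashMap<>();
        private int defaultParallelism = 1;

        private SparkPigContextSketch() {
        }

        public static SparkPigContextSketch get() {
            return INSTANCE;
        }

        public Map<String, Broadcast<List<Tuple>>> getBroadcastedVars() {
            return broadcastedVars;
        }

        // The real method also inspects the predecessor RDDs and the operator;
        // this sketch only returns a configured default.
        public int getParallelism() {
            return defaultParallelism;
        }
    }
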
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Wed Apr 12 02:20:20 2017
@@ -23,15 +23,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.runtime.AbstractFunction1;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.data.Tuple;
@@ -44,6 +39,12 @@ import org.apache.spark.api.java.functio
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
 
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
 @SuppressWarnings({ "serial" })
 public class GlobalRearrangeConverter implements
         RDDConverter<Tuple, Tuple, POGlobalRearrangeSpark> {
@@ -57,7 +58,7 @@ public class GlobalRearrangeConverter im
                               POGlobalRearrangeSpark op) throws IOException {
         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
                 op, 0);
-        int parallelism = SparkUtil.getParallelism(predecessors,
+        int parallelism = SparkPigContext.get().getParallelism(predecessors,
                 op);
 
 //         TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
@@ -322,7 +323,6 @@ public class GlobalRearrangeConverter im
                             try {
                                 Tuple tuple = tf.newTuple(3);
                                 tuple.set(0, index);
-                                tuple.set(1, key);
                                 tuple.set(2, next);
                                 return tuple;
                             } catch (ExecException e) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Apr 12 02:20:20 2017
@@ -23,18 +23,14 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.backend.executionengine.ExecException;
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
@@ -44,6 +40,11 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
 import scala.runtime.AbstractFunction1;
 
 public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
@@ -54,10 +55,10 @@ public class JoinGroupSparkConverter imp
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
                 op, 0);
-        List<POLocalRearrange> lraOps = op.getLraOps();
-        POGlobalRearrangeSpark glaOp = op.getGlaOp();
+        List<POLocalRearrange> lraOps = op.getLROps();
+        POGlobalRearrangeSpark glaOp = op.getGROp();
         POPackage pkgOp = op.getPkgOp();
-        int parallelism = SparkUtil.getParallelism(predecessors, glaOp);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, glaOp);
         List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
         boolean useSecondaryKey = glaOp.isUseSecondaryKey();
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Apr 12 02:20:20 2017
@@ -119,7 +119,7 @@ public class LoadConverter implements RD
         //create SparkCounter and set it for ToTupleFunction
         boolean disableCounter = jobConf.getBoolean("pig.disable.counter", false);
         if (!op.isTmpLoad() && !disableCounter) {
-            String counterName = SparkStatsUtil.getLoadSparkCounterName(op);
+            String counterName = SparkStatsUtil.getCounterName(op);
             SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
             if (counterReporter.getCounters() != null) {
                 counterReporter.getCounters().createCounter(

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Apr 12 02:20:20 2017
@@ -94,8 +94,7 @@ public class PackageConverter implements
                                     .byteValue());
                             if (LOG.isDebugEnabled())
                                 LOG.debug("Setting index to " + next.get(0) +
-                                    " for tuple " + (Tuple)next.get(2) + " with key " +
-                                    next.get(1));
+                                    " for tuple " + (Tuple)next.get(1));
                             return nullableTuple;
                         } catch (ExecException e) {
                             throw new RuntimeException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Wed Apr 12 02:20:20 2017
@@ -1,134 +1,135 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import scala.Tuple2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.rdd.RDD;
-
-public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> {
-
-       private static final Log LOG = LogFactory.getLog(RankConverter.class);
-       
-       @Override
-       public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
-                       throws IOException {
-        int parallelism = SparkUtil.getParallelism(predecessors, poRank);
-        SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
-        RDD<Tuple> rdd = predecessors.get(0);
-        JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
-                .mapToPair(new ToPairRdd());
-        JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
-                .groupByKey(parallelism);
-        JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
-                .mapToPair(new IndexCounters());
-        JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
-                .sortByKey(true, parallelism);
-        Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
-        JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
-                .map(new RankFunction(new HashMap<Integer, Long>(counts)));
-        return finalRdd.rdd();
-    }
-
-       @SuppressWarnings("serial")
-       private static class ToPairRdd implements 
-               PairFunction<Tuple, Integer, Long>, Serializable {
-
-        @Override
-        public Tuple2<Integer, Long> call(Tuple t) {
-            try {
-                Integer key = (Integer) t.get(0);
-                Long value = (Long) t.get(1);
-                return new Tuple2<Integer, Long>(key, value);
-            } catch (ExecException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-       
-       @SuppressWarnings("serial")
-       private static class IndexCounters implements 
-               PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, 
-               Serializable {
-               @Override
-               public Tuple2<Integer, Long> call(Tuple2<Integer, 
-                               Iterable<Long>> input) {
-                       long lastVaue = 0L;
-                       
-                       for (Long t : input._2()) {
-                               lastVaue = (t > lastVaue) ? t : lastVaue;
-                       }
-
-                       return new Tuple2<Integer, Long>(input._1(), lastVaue);
-               }
-    }
-       
-       @SuppressWarnings("serial")
-       private static class RankFunction implements Function<Tuple, Tuple>, 
-                       Serializable {
-               private final HashMap<Integer, Long> counts;
-               
-               private RankFunction(HashMap<Integer, Long> counts) {
-                       this.counts = counts;
-               }
-               
-               @Override
-               public Tuple call(Tuple input) throws Exception {
-                       Tuple output = TupleFactory.getInstance()
-                                       .newTuple(input.getAll().size() - 2);
-                       
-                       for (int i = 1; i < input.getAll().size() - 2; i ++) {
-                               output.set(i, input.get(i+2));
-                       }
-                       
-                       long offset = calculateOffset((Integer) input.get(0));
-                       output.set(0, offset + (Long)input.get(2));
-                       return output;
-               }
-               
-               private long calculateOffset(Integer index) {
-                       long offset = 0;
-                       
-                       if (index > 0) {
-                               for (int i = 0; i < index; i++) {
-                                       if (counts.containsKey(i)) {
-                                               offset += counts.get(i);
-                                       }
-                               }
-                       }
-                       return offset;
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+
+public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> {
+
+    private static final Log LOG = LogFactory.getLog(RankConverter.class);
+    
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
+            throws IOException {
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, poRank);
+        SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+                .mapToPair(new ToPairRdd());
+        JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+                .groupByKey(parallelism);
+        JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+                .mapToPair(new IndexCounters());
+        JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+                .sortByKey(true, parallelism);
+        Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+        JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+                .map(new RankFunction(new HashMap<Integer, Long>(counts)));
+        return finalRdd.rdd();
+    }
+
+    @SuppressWarnings("serial")
+    private static class ToPairRdd implements 
+        PairFunction<Tuple, Integer, Long>, Serializable {
+
+        @Override
+        public Tuple2<Integer, Long> call(Tuple t) {
+            try {
+                Integer key = (Integer) t.get(0);
+                Long value = (Long) t.get(1);
+                return new Tuple2<Integer, Long>(key, value);
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    @SuppressWarnings("serial")
+    private static class IndexCounters implements 
+        PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, 
+        Serializable {
+        @Override
+        public Tuple2<Integer, Long> call(Tuple2<Integer, 
+                Iterable<Long>> input) {
+            long lastVaue = 0L;
+            
+            for (Long t : input._2()) {
+                lastVaue = (t > lastVaue) ? t : lastVaue;
+            }
+
+            return new Tuple2<Integer, Long>(input._1(), lastVaue);
+        }
+    }
+    
+    @SuppressWarnings("serial")
+    private static class RankFunction implements Function<Tuple, Tuple>, 
+            Serializable {
+        private final HashMap<Integer, Long> counts;
+        
+        private RankFunction(HashMap<Integer, Long> counts) {
+            this.counts = counts;
+        }
+        
+        @Override
+        public Tuple call(Tuple input) throws Exception {
+            Tuple output = TupleFactory.getInstance()
+                    .newTuple(input.getAll().size() - 2);
+            
+            for (int i = 1; i < input.getAll().size() - 2; i ++) {
+                output.set(i, input.get(i+2));
+            }
+            
+            long offset = calculateOffset((Integer) input.get(0));
+            output.set(0, offset + (Long)input.get(2));
+            return output;
+        }
+        
+        private long calculateOffset(Integer index) {
+            long offset = 0;
+            
+            if (index > 0) {
+                for (int i = 0; i < index; i++) {
+                    if (counts.containsKey(i)) {
+                        offset += counts.get(i);
+                    }
+                }
+            }
+            return offset;
+        }
+    }
+}

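For the RankConverter rewrite above (a reindentation plus the SparkPigContext parallelism change), the ranking technique is worth spelling out: each partition's highest local counter is collected to the driver with collectAsMap(), and a tuple's global rank is its local counter plus the counts of all earlier partitions. A self-contained sketch of that offset arithmetic, mirroring RankFunction.calculateOffset:

    import java.util.HashMap;
    import java.util.Map;

    // Global rank = per-partition counter + total count of earlier partitions.
    public class RankOffsetSketch {
        static long calculateOffset(Map<Integer, Long> counts, int index) {
            long offset = 0;
            for (int i = 0; i < index; i++) {
                Long c = counts.get(i);
                if (c != null) {
                    offset += c;
                }
            }
            return offset;
        }

        public static void main(String[] args) {
            // per-partition counts, as collectAsMap() would return them
            Map<Integer, Long> counts = new HashMap<>();
            counts.put(0, 3L);  // partition 0 holds 3 rows
            counts.put(1, 2L);  // partition 1 holds 2 rows

            // first row of partition 1 has local counter 1, so its rank is 4
            System.out.println(calculateOffset(counts, 1) + 1L);
        }
    }
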
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Wed Apr 12 02:20:20 2017
@@ -31,6 +31,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.data.DataBag;
@@ -54,14 +55,14 @@ public class ReduceByConverter implement
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, op, 1);
-        int parallelism = SparkUtil.getParallelism(predecessors, op);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, op);
 
         RDD<Tuple> rdd = predecessors.get(0);
         RDD<Tuple2<IndexedKey, Tuple>> rddPair
-                = rdd.map(new LocalRearrangeFunction(op.getLgr(), op.isUseSecondaryKey(), op.getSecondarySortOrder())
+                = rdd.map(new LocalRearrangeFunction(op.getLROp(), op.isUseSecondaryKey(), op.getSecondarySortOrder())
                 , SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
         if (op.isUseSecondaryKey()) {
-            return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPkg());
+            return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPKGOp());
         } else {
             PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
                     = new PairRDDFunctions<>(rddPair,
@@ -189,8 +190,8 @@ public class ReduceByConverter implement
                 t.append(key);
                 t.append(bag);
 
-                poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
-                Tuple packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
+                poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                Tuple packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result;
 
                 // Perform the operation
                 LOG.debug("MergeValuesFunction packagedTuple : " + t);
@@ -242,8 +243,8 @@ public class ReduceByConverter implement
                 bag.add((Tuple) v1._2().get(1));
                 t.append(key);
                 t.append(bag);
-                poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
-                packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
+                poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Wed Apr 12 02:20:20 2017
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.util.Pair;
@@ -84,10 +85,10 @@ public class SkewedJoinConverter impleme
         RDD<Tuple> rdd1 = predecessors.get(0);
         RDD<Tuple> rdd2 = predecessors.get(1);
 
-        Broadcast<List<Tuple>> keyDist = SparkUtil.getBroadcastedVars().get(skewedJoinPartitionFile);
+        Broadcast<List<Tuple>> keyDist = SparkPigContext.get().getBroadcastedVars().get(skewedJoinPartitionFile);
 
         // if no keyDist,  we need  defaultParallelism
-        Integer defaultParallelism = SparkUtil.getParallelism(predecessors, poSkewedJoin);
+        Integer defaultParallelism = SparkPigContext.get().getParallelism(predecessors, poSkewedJoin);
 
         // with partition id
         SkewPartitionIndexKeyFunction skewFun = new SkewPartitionIndexKeyFunction(this, keyDist, defaultParallelism);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Wed Apr 12 02:20:20 2017
@@ -28,6 +28,7 @@ import scala.runtime.AbstractFunction1;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -46,7 +47,7 @@ public class SortConverter implements RD
             throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        int parallelism = SparkUtil.getParallelism(predecessors, sortOperator);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, sortOperator);
         RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
                 SparkUtil.<Tuple, Object> getTuple2Manifest());
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Apr 12 02:20:20 2017
@@ -71,7 +71,7 @@ public class StoreConverter implements
         RDD<Tuple> rdd = predecessors.get(0);
 
         SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
-                SparkStatsUtil.getStoreSparkCounterName(op));
+                SparkStatsUtil.getCounterName(op));
 
         // convert back to KV pairs
         JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
@@ -166,7 +166,7 @@ public class StoreConverter implements
         if (!op.isTmpStore() && !disableCounter) {
             ftf.setDisableCounter(disableCounter);
             ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
-            ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op));
+            ftf.setCounterName(SparkStatsUtil.getCounterName(op));
             SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
             ftf.setSparkCounters(counterReporter.getCounters());
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Apr 12 02:20:20 2017
@@ -31,50 +31,50 @@ import org.apache.spark.api.java.functio
 import org.apache.spark.rdd.RDD;
 
 public class StreamConverter implements
-               RDDConverter<Tuple, Tuple, POStream> {
+        RDDConverter<Tuple, Tuple, POStream> {
 
-       @Override
-       public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-                       POStream poStream) throws IOException {
-               SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
-               RDD<Tuple> rdd = predecessors.get(0);
-               StreamFunction streamFunction = new StreamFunction(poStream);
-               return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
-       }
-
-       private static class StreamFunction implements
-                       FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
-               private POStream poStream;
-
-               private StreamFunction(POStream poStream) {
-                       this.poStream = poStream;
-               }
-
-               public Iterable<Tuple> call(final Iterator<Tuple> input) {
-                       return new Iterable<Tuple>() {
-                               @Override
-                               public Iterator<Tuple> iterator() {
-                                       return new OutputConsumerIterator(input) {
-
-                                               @Override
-                                               protected void attach(Tuple tuple) {
-                                                       poStream.setInputs(null);
-                                                       poStream.attachInput(tuple);
-                                               }
-
-                                               @Override
-                                               protected Result getNextResult() throws ExecException {
-                                                       Result result = poStream.getNextTuple();
-                                                       return result;
-                                               }
-
-                                               @Override
-                                               protected void endOfInput() {
-                                                       poStream.setFetchable(true);
-                                               }
-                                       };
-                               }
-                       };
-               }
-       }
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POStream poStream) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        StreamFunction streamFunction = new StreamFunction(poStream);
+        return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+    }
+
+    private static class StreamFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+        private POStream poStream;
+
+        private StreamFunction(POStream poStream) {
+            this.poStream = poStream;
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poStream.setInputs(null);
+                            poStream.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            Result result = poStream.getNextTuple();
+                            return result;
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poStream.setFetchable(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
 }

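StreamConverter (reindented above, logic unchanged) returns an Iterable whose iterator pulls tuples lazily from the partition, attaching each one to POStream through Pig's OutputConsumerIterator rather than materializing the partition. A generic sketch of that lazy-wrapping pattern, with a trivial transform standing in for the POStream attach/getNextResult cycle:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Wrap an input iterator in an Iterable that transforms elements one at
    // a time, so nothing is buffered beyond the element in flight.
    public class LazyIterableSketch {
        static Iterable<String> upperCased(final Iterator<String> input) {
            return new Iterable<String>() {
                @Override
                public Iterator<String> iterator() {
                    return new Iterator<String>() {
                        @Override
                        public boolean hasNext() {
                            return input.hasNext();
                        }

                        @Override
                        public String next() {
                            if (!input.hasNext()) {
                                throw new NoSuchElementException();
                            }
                            // stands in for attach()/getNextResult() on POStream
                            return input.next().toUpperCase();
                        }
                    };
                }
            };
        }

        public static void main(String[] args) {
            for (String s : upperCased(Arrays.asList("a", "b").iterator())) {
                System.out.println(s);
            }
        }
    }
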
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Wed Apr 12 02:20:20 2017
@@ -43,11 +43,11 @@ public class POJoinGroupSpark extends Ph
         this.pkgOp = pkgOp;
     }
 
-    public List<POLocalRearrange> getLraOps() {
+    public List<POLocalRearrange> getLROps() {
         return lraOps;
     }
 
-    public POGlobalRearrangeSpark getGlaOp() {
+    public POGlobalRearrangeSpark getGROp() {
         return glaOp;
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Wed Apr 12 02:20:20 2017
@@ -32,11 +32,10 @@ import org.apache.pig.impl.plan.VisitorE
 
 public class POPoissonSampleSpark extends POPoissonSample {
     private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class);
-    private static final long serialVersionUID = 1L;
-
-
+    //TODO verify can be removed?
+    //private static final long serialVersionUID = 1L;
     // Only for Spark
-    private boolean endOfInput = false;
+    private transient boolean endOfInput = false;
 
     public boolean isEndOfInput() {
         return endOfInput;
@@ -50,12 +49,6 @@ public class POPoissonSampleSpark extend
         super(k, rp, sr, hp, tm);
     }
 
-
-    @Override
-    public void visit(PhyPlanVisitor v) throws VisitorException {
-        v.visitPoissonSampleSpark(this);
-    }
-
     @Override
     public Result getNextTuple() throws ExecException {
         if (!initialized) {
@@ -115,10 +108,12 @@ public class POPoissonSampleSpark extend
         Result pickedSample = newSample;
         updateSkipInterval((Tuple) pickedSample.result);
 
-        LOG.debug("pickedSample:");
-        if(pickedSample.result!=null){
-            for(int i=0;i<((Tuple) pickedSample.result).size();i++) {
-                LOG.debug("the "+i+" ele:"+((Tuple) pickedSample.result).get(i));
+        if( LOG.isDebugEnabled()) {
+            LOG.debug("pickedSample:");
+            if (pickedSample.result != null) {
+                for (int i = 0; i < ((Tuple) pickedSample.result).size(); i++) {
+                    LOG.debug("the " + i + " ele:" + ((Tuple) pickedSample.result).get(i));
+                }
             }
         }
         return pickedSample;

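The POPoissonSampleSpark hunk above wraps the per-field debug loop in LOG.isDebugEnabled(), so the sampled tuple is only iterated and stringified when debug logging is actually on. The guard pattern in isolation (commons-logging, illustrative class name):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    // Skip argument construction (concatenation, field iteration) entirely
    // unless debug output is enabled.
    public class GuardedDebugSketch {
        private static final Log LOG = LogFactory.getLog(GuardedDebugSketch.class);

        static void logFields(Object[] fields) {
            if (LOG.isDebugEnabled()) {
                for (int i = 0; i < fields.length; i++) {
                    LOG.debug("the " + i + " ele:" + fields[i]);
                }
            }
        }

        public static void main(String[] args) {
            logFields(new Object[] {"a", 1, 2.5});
        }
    }
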
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Wed Apr 12 02:20:20 2017
@@ -43,7 +43,7 @@ public class POReduceBySpark extends POF
         this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations());
     }
 
-    public POPackage getPkg() {
+    public POPackage getPKGOp() {
         return pkg;
     }
 
@@ -98,7 +98,7 @@ public class POReduceBySpark extends POF
         this.customPartitioner = customPartitioner;
     }
 
-    public POLocalRearrange getLgr() {
+    public POLocalRearrange getLROp() {
         return lr;
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Wed Apr 12 02:20:20 2017
@@ -17,6 +17,10 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
 
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -24,6 +28,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
+import java.util.List;
+
 /**
  * A visitor to optimize plans that determines if a vertex plan can run in
  * accumulative mode.
@@ -31,13 +37,24 @@ import org.apache.pig.impl.plan.VisitorE
 public class AccumulatorOptimizer extends SparkOpPlanVisitor {
 
     public AccumulatorOptimizer(SparkOperPlan plan) {
-               super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
     }
 
     @Override
     public void visitSparkOp(SparkOperator sparkOperator) throws
-                       VisitorException {
-        AccumulatorOptimizerUtil.addAccumulatorSpark(sparkOperator
-                .physicalPlan);
+            VisitorException {
+        PhysicalPlan plan = sparkOperator.physicalPlan;
+        List<PhysicalOperator> pos = plan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            return;
+        }
+
+        List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(plan,
+                POGlobalRearrange.class);
+
+        for (POGlobalRearrange glr : glrs) {
+            List<PhysicalOperator> successors = plan.getSuccessors(glr);
+            AccumulatorOptimizerUtil.addAccumulator(plan, successors);
+        }
     }
 }
\ No newline at end of file
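Rather than handing the whole physical plan to the Spark-specific
addAccumulatorSpark helper, the optimizer now walks each POGlobalRearrange and
passes that operator's successors to the shared
AccumulatorOptimizerUtil.addAccumulator. The utility's body is not part of this
diff; the following is only a hedged sketch of what it plausibly does, assuming
it mirrors the MapReduce backend (the supportsAccumulator check is a
hypothetical placeholder, not code from this commit):

    // Sketch only: enable accumulative mode on a qualifying POForEach that
    // sits downstream of the global rearrange.
    static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> successors) {
        if (successors == null) {
            return;
        }
        for (PhysicalOperator po : successors) {
            // In the Spark plan the POGlobalRearrange is typically followed
            // by a POPackage, whose own successor may be the POForEach that
            // can run in accumulative mode.
            if (po instanceof POPackage) {
                List<PhysicalOperator> next = plan.getSuccessors(po);
                if (next != null && next.size() == 1
                        && next.get(0) instanceof POForEach
                        && supportsAccumulator((POForEach) next.get(0))) {
                    next.get(0).setAccumulative();
                }
            }
        }
    }
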

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
 Wed Apr 12 02:20:20 2017
@@ -325,7 +325,7 @@ public class CombinerOptimizer extends S
 
     // Update the ReduceBy Operator with the packaging used by Local rearrange.
     private void updatePackager(POReduceBySpark reduceOperator, 
POLocalRearrange lrearrange) throws OptimizerException {
-        Packager pkgr = reduceOperator.getPkg().getPkgr();
+        Packager pkgr = reduceOperator.getPKGOp().getPkgr();
         // annotate the package with information from the LORearrange
         // update the keyInfo information if already present in the POPackage
         Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = 
pkgr.getKeyInfo();

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
 Wed Apr 12 02:20:20 2017
@@ -72,7 +72,7 @@ public class JoinGroupOptimizerSpark ext
                 try {
                     restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
                 } catch (PlanException e) {
-                    throw new 
RuntimeException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
+                    throw new 
VisitorException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
                 }
             }
         }
@@ -90,10 +90,7 @@ public class JoinGroupOptimizerSpark ext
             PhysicalPlan currentPlan = this.mCurrentWalker.getPlan(); // If 
there is a POSplit, we need to traverse POSplit.getPlans(), so use 
mCurrentWalker.getPlan()
             if( currentPlan != null) {
                 plansWithJoinAndGroup.add(currentPlan);
-            }else{
-                LOG.info("GlobalRearrangeDiscover#currentPlan is null");
             }
-
         }
 
         public List<PhysicalPlan> getPlansWithJoinAndGroup() {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
 Wed Apr 12 02:20:20 2017
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemoverUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -30,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -61,29 +61,11 @@ public class NoopFilterRemover extends S
                     if (value instanceof Boolean) {
                         Boolean filterValue = (Boolean) value;
                         if (filterValue) {
-                            removeFilter(filter, sparkOp.physicalPlan);
+                            NoopFilterRemoverUtil.removeFilter(filter, 
sparkOp.physicalPlan);
                         }
                     }
                 }
             }
         }
     }
-
-    private void removeFilter(POFilter filter, PhysicalPlan plan) {
-        if (plan.size() > 1) {
-            try {
-                List<PhysicalOperator> fInputs = filter.getInputs();
-                List<PhysicalOperator> sucs = plan.getSuccessors(filter);
-
-                plan.removeAndReconnect(filter);
-                if (sucs != null && sucs.size() != 0) {
-                    for (PhysicalOperator suc : sucs) {
-                        suc.setInputs(fInputs);
-                    }
-                }
-            } catch (PlanException pe) {
-                log.info("Couldn't remove a filter in optimizer: " + 
pe.getMessage());
-            }
-        }
-    }
 }
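
The private removeFilter above is deleted; judging from the import added at the
top of this file, the logic now lives in the shared mapReduceLayer
NoopFilterRemoverUtil so the MapReduce and Spark removers no longer duplicate
it. Assuming the move is essentially verbatim, the shared method would look
like this sketch (the static modifier and the LOG handle are assumptions):

    public static void removeFilter(POFilter filter, PhysicalPlan plan) {
        if (plan.size() > 1) {
            try {
                List<PhysicalOperator> fInputs = filter.getInputs();
                List<PhysicalOperator> sucs = plan.getSuccessors(filter);

                // Splice the no-op filter out of the plan, then rewire each
                // former successor to read from the filter's inputs.
                plan.removeAndReconnect(filter);
                if (sucs != null && !sucs.isEmpty()) {
                    for (PhysicalOperator suc : sucs) {
                        suc.setInputs(fInputs);
                    }
                }
            } catch (PlanException pe) {
                LOG.info("Couldn't remove a filter in optimizer: " + pe.getMessage());
            }
        }
    }
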

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
 Wed Apr 12 02:20:20 2017
@@ -64,13 +64,13 @@ public class SecondaryKeyOptimizerSpark
         List<POLocalRearrange> rearranges = 
PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, 
POLocalRearrange.class);
         if (rearranges.isEmpty()) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("No POLocalRearranges found in the 
sparkOperator.Secondary key optimization is no need");
+                LOG.debug("No POLocalRearranges found in the spark operator" + 
sparkOperator.getOperatorKey() + ". Skipping secondary key optimization.");
             }
             return;
         }
 
         /**
-         * When every POLocalRearrange is encounted in the 
sparkOperator.physicalPlan,
+         * Whenever a POLocalRearrange is encountered in the 
sparkOperator.physicalPlan,
           * the sub-physicalplan between the previousLR(or root) and currentLR 
is considered the mapPlan(like what
           * we call in mapreduce) and the sub-physicalplan between the 
POGlobalRearrange(the successor of currentLR) and
           * nextLR(or leaf) is considered the reducePlan(like what we call in 
mapreduce).  After the mapPlan and reducePlan are obtained,
@@ -109,9 +109,8 @@ public class SecondaryKeyOptimizerSpark
                 // The POLocalRearrange is sub-plan of a POSplit
                 mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, 
currentLR.getOperatorKey());
             }
-
-            SecondaryKeyOptimizerUtil.setIsSparkMode(true);
-            SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = 
SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan);
+            SparkSecondaryKeyOptimizerUtil sparkSecondaryKeyOptUtil = new 
SparkSecondaryKeyOptimizerUtil();
+            SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = 
sparkSecondaryKeyOptUtil.applySecondaryKeySort(mapPlan, reducePlan);
             if (info != null) {
                 numSortRemoved += info.getNumSortRemoved();
                 numDistinctChanged += info.getNumDistinctChanged();

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java?rev=1791060&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
 Wed Apr 12 02:20:20 2017
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+
+import java.util.List;
+
+public class SparkSecondaryKeyOptimizerUtil extends SecondaryKeyOptimizerUtil {
+    private static Log log = 
LogFactory.getLog(SparkSecondaryKeyOptimizerUtil.class);
+
+    @Override
+    protected PhysicalOperator getCurrentNode(PhysicalOperator root, 
PhysicalPlan reducePlan) {
+        PhysicalOperator currentNode = null;
+
+        if (!(root instanceof POGlobalRearrange)) {
+            log.debug("Expected reduce root to be a POGlobalRearrange, skip 
secondary key optimizing");
+            currentNode = null;
+        } else {
+            List<PhysicalOperator> globalRearrangeSuccs = reducePlan
+                    .getSuccessors(root);
+            if (globalRearrangeSuccs != null && globalRearrangeSuccs.size() == 1) {
+                currentNode = globalRearrangeSuccs.get(0);
+            } else {
+                log.debug("Expected successor of a POGlobalRearrange is 
POPackage, skip secondary key optimizing");
+                currentNode = null;
+            }
+        }
+
+        return currentNode;
+    }
+}
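
Combined with the SecondaryKeyOptimizerSpark hunk above, this new class drops
the static setIsSparkMode(true) switch in favor of a template method: the
shared SecondaryKeyOptimizerUtil drives applySecondaryKeySort and delegates
locating the first reduce-side operator to the getCurrentNode override. A
hedged usage sketch, assuming mapPlan and reducePlan have already been split
out as in the caller shown earlier:

    // The Spark subclass supplies the reduce-root handling; the base
    // class performs the actual secondary-key rewrite.
    SecondaryKeyOptimizerUtil util = new SparkSecondaryKeyOptimizerUtil();
    SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info =
            util.applySecondaryKeySort(mapPlan, reducePlan);
    if (info != null) {
        numSortRemoved += info.getNumSortRemoved();
        numDistinctChanged += info.getNumDistinctChanged();
    }
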

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
 Wed Apr 12 02:20:20 2017
@@ -41,8 +41,8 @@ public class DotSparkPrinter extends Dot
         DotSparkPrinter.InnerOperator,
         DotSparkPrinter.InnerPlan> {
 
-    static int counter = 0;
-    boolean isVerboseNesting = true;
+    private static int counter = 0;
+    private boolean isVerboseNesting = true;
 
     public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) {
         this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),

