Author: xuefu
Date: Wed Feb 15 04:28:47 2017
New Revision: 1783062

URL: http://svn.apache.org/viewvc?rev=1783062&view=rev
Log:
PIG-5044: Create SparkCompiler#getSamplingJob in spark mode (Liyun via Xuefu)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Wed Feb 15 04:28:47 2017
@@ -74,6 +74,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -367,4 +368,9 @@ public class PhyPlanVisitor extends Plan
 
     public void visitBroadcastSpark(POBroadcastSpark poBroadcastSpark) {
     }
+
+       public void visitPoissonSampleSpark(
+                       POPoissonSampleSpark poPoissonSampleSpark) {
+               // TODO Auto-generated method stub
+       }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Wed Feb 15 04:28:47 2017
@@ -87,18 +87,22 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.PoissonSampleConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.SparkSampleSortConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
@@ -212,6 +216,8 @@ public class SparkLauncher extends Launc
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
         convertMap.put(POPreCombinerLocalRearrange.class, new 
LocalRearrangeConverter());
         convertMap.put(POBroadcastSpark.class, new 
BroadcastConverter(sparkContext));
+        convertMap.put(POSampleSortSpark.class, new 
SparkSampleSortConverter());
+        convertMap.put(POPoissonSampleSpark.class, new 
PoissonSampleConverter());
 
         uploadResources(sparkplan);
         new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, 
jobMetricsListener, jobGroupID, jobConf, pigContext).visit();

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1783062&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
 Wed Feb 15 04:28:47 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class PoissonSampleConverter implements RDDConverter<Tuple, Tuple, 
POPoissonSampleSpark> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POPoissonSampleSpark po) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, po, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        PoissionSampleFunction poissionSampleFunction = new 
PoissionSampleFunction(po);
+        return rdd.toJavaRDD().mapPartitions(poissionSampleFunction, 
false).rdd();
+    }
+
+    private static class PoissionSampleFunction implements 
FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+        private final POPoissonSampleSpark po;
+
+        public PoissionSampleFunction(POPoissonSampleSpark po) {
+            this.po = po;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+
+            return new Iterable<Tuple>() {
+
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(tuples) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            po.setInputs(null);
+                            po.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return po.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            po.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1783062&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
 Wed Feb 15 04:28:47 2017
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.rdd.RDD;
+  /*
+   Sort the sample data and convert it to the format 
(all,{(sampleEle1),(sampleEle2),...}).
+
+   */
+@SuppressWarnings("serial")
+public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple, 
POSampleSortSpark> {
+    private static final Log LOG = 
LogFactory.getLog(SparkSampleSortConverter.class);
+    private static TupleFactory tf = TupleFactory.getInstance();
+    private static BagFactory bf = DefaultBagFactory.getInstance();
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSampleSortSpark 
sortOperator)
+            throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
+                SparkUtil.<Tuple, Object> getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> r = new JavaPairRDD<Tuple, Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+         //sort sample data
+        JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
+         // convert every element of the sorted sample data to the (all, element) 
format
+        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new 
AggregateFunction());
+        // use groupByKey to aggregate all values (the format will be 
(all,{(sampleEle1),(sampleEle2),...}))
+        JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new 
ToValueFunction());
+        return  groupByKey.rdd();
+    }
+
+
+    private static class MergeFunction implements 
org.apache.spark.api.java.function.Function2<Tuple, Tuple, Tuple>
+            , Serializable {
+
+        @Override
+        public Tuple call(Tuple v1, Tuple v2) {
+                Tuple res = tf.newTuple();
+                res.append(v1);
+                res.append(v2);
+                LOG.info("MergeFunction out:"+res);
+                return res;
+        }
+    }
+
+    // input: Tuple2<Tuple,Object>
+    // output: Tuple2("all", Tuple)
+    private static class AggregateFunction implements
+            PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>, 
String,Tuple>, Serializable {
+
+        private class Tuple2TransformIterable implements 
Iterable<Tuple2<String,Tuple>> {
+
+            Iterator<Tuple2<Tuple, Object>> in;
+
+            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+                in = input;
+            }
+
+            public Iterator<Tuple2<String,Tuple>> iterator() {
+                return new IteratorTransform<Tuple2<Tuple, Object>, 
Tuple2<String,Tuple>>(in) {
+                    @Override
+                    protected Tuple2<String,Tuple> transform(Tuple2<Tuple, 
Object> next) {
+                        LOG.info("AggregateFunction in:"+ next._1()) ;
+                        return new Tuple2<String,Tuple>("all",next._1());
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, 
Object>> input) throws Exception {
+            return new Tuple2TransformIterable(input);
+        }
+
+    }
+
+    private static class ToValueFunction implements Function<Tuple2<String, 
Iterable<Tuple>>, Tuple> {
+        @Override
+        public Tuple call(Tuple2<String, Iterable<Tuple>> next) throws 
Exception {
+            Tuple res = tf.newTuple();
+            res.append(next._1());
+            Iterator<Tuple> iter = next._2().iterator();
+            DataBag bag = bf.newDefaultBag();
+            while(iter.hasNext()) {
+                bag.add(iter.next());
+            }
+            res.append(bag);
+            LOG.info("ToValueFunction1 out:" + res);
+            return res;
+        }
+    }
+
+    private static class ToKeyValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Sort ToKeyValueFunction in " + t);
+            }
+            Tuple key = t;
+            Object value = null;
+            // (key, value)
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Sort ToKeyValueFunction out " + out);
+            }
+            return out;
+        }
+    }
+}

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1783062&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
 Wed Feb 15 04:28:47 2017
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POPoissonSampleSpark extends PhysicalOperator {
+    private static final Log LOG = 
LogFactory.getLog(POPoissonSampleSpark.class);
+    private static final long serialVersionUID = 1L;
+
+    // 17 is not a magic number. It can be obtained by using a poisson
+    // cumulative distribution function with the mean set to 10 (empirically,
+    // minimum number of samples) and the confidence set to 95%
+    public static final int DEFAULT_SAMPLE_RATE = 17;
+
+    private int sampleRate = 0;
+
+    private float heapPerc = 0f;
+
+    private Long totalMemory;
+
+    private transient boolean initialized;
+
+    // num of rows sampled so far
+    private transient int numRowsSampled;
+
+    // average size of tuple in memory, for tuples sampled
+    private transient long avgTupleMemSz;
+
+    // current row number
+    private transient long rowNum;
+
+    // number of tuples to skip after each sample
+    private transient long skipInterval;
+
+    // number of tuples which have been skipped.
+    private transient long numSkipped = 0;
+
+    // bytes in input to skip after every sample.
+    // divide this by avgTupleMemSize to get skipInterval
+    private transient long memToSkipPerSample;
+
+    // has the special row with row number information been returned
+    private transient boolean numRowSplTupleReturned;
+
+    // new Sample result
+    private transient Result newSample;
+
+    // Only for Spark
+    private boolean endOfInput = false;
+
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+
+    public void setEndOfInput(boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
+    public POPoissonSampleSpark(OperatorKey k, int rp, int sr, float hp, long 
tm) {
+        super(k, rp, null);
+        sampleRate = sr;
+        heapPerc = hp;
+        if (tm != -1) {
+            totalMemory = tm;
+        }
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitPoissonSampleSpark(this);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        if (!initialized) {
+            numRowsSampled = 0;
+            avgTupleMemSz = 0;
+            rowNum = 0;
+            skipInterval = -1;
+            if (totalMemory == null) {
+                // Initialize in backend to get memory of task
+                totalMemory = Runtime.getRuntime().maxMemory();
+            }
+            long availRedMem = (long) (totalMemory * heapPerc);
+            memToSkipPerSample = availRedMem/sampleRate;
+            initialized = true;
+        }
+        if (numRowSplTupleReturned) {
+            // row num special row has been returned after all inputs
+            // were read, nothing more to read
+            return RESULT_EOP;
+        }
+
+        Result res;
+        res = processInput();
+
+        // at end of input, return the last sampled record with the row-count marker appended
+        if (this.isEndOfInput() && newSample != null) {
+            return createNumRowTuple((Tuple)newSample.result);
+        }
+
+        // just return to read next record from input
+        if (res.returnStatus == POStatus.STATUS_NULL) {
+            return new Result(POStatus.STATUS_NULL, null);
+        } else if (res.returnStatus == POStatus.STATUS_EOP
+                    || res.returnStatus == POStatus.STATUS_ERR) {
+            return res;
+        }
+
+        // got an 'OK' record
+        rowNum++;
+
+        if (numSkipped < skipInterval) {
+            numSkipped++;
+
+            // skip this tuple, and continue to read from input
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // pick this record as sampled
+        newSample = res;
+        numSkipped = 0;
+        Result pickedSample = newSample;
+        updateSkipInterval((Tuple) pickedSample.result);
+
+        LOG.debug("pickedSample:");
+        if(pickedSample.result!=null){
+            for(int i=0;i<((Tuple) pickedSample.result).size();i++) {
+                LOG.debug("the "+i+" ele:"+((Tuple) 
pickedSample.result).get(i));
+            }
+        }
+        return pickedSample;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PoissonSample - " + mKey.toString();
+    }
+
+    /**
+     * Update the average tuple size based on the newly sampled tuple t
+     * and recalculate skipInterval
+     *
+     * @param t - tuple
+     */
+    private void updateSkipInterval(Tuple t) {
+        avgTupleMemSz =
+                ((avgTupleMemSz * numRowsSampled) + t.getMemorySize()) / 
(numRowsSampled + 1);
+        skipInterval = memToSkipPerSample / avgTupleMemSz;
+
+        // skip fewer rows the first few times, to reduce the
+        // probability of the first tuples' size (if much smaller than the rest)
+        // resulting in very few samples being taken. Sampling a little extra
+        // is OK
+        if (numRowsSampled < 5) {
+            skipInterval = skipInterval / (10 - numRowsSampled);
+        }
+
+        ++numRowsSampled;
+    }
+
+    /**
+     * @param sample - sample tuple
+     * @return - Tuple appended with special marker string column, num-rows 
column
+     * @throws ExecException
+     */
+    private Result createNumRowTuple(Tuple sample) throws ExecException {
+        int sz = (sample == null) ? 0 : sample.size();
+        Tuple t = mTupleFactory.newTuple(sz + 2);
+
+        if (sample != null) {
+            for (int i = 0; i < sample.size(); i++) {
+                t.set(i, sample.get(i));
+            }
+        }
+
+        t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
+        t.set(sz + 1, rowNum);
+        numRowSplTupleReturned = true;
+        return new Result(POStatus.STATUS_OK, t);
+    }
+}

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java?rev=1783062&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
 Wed Feb 15 04:28:47 2017
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POSampleSortSpark extends POSort {
+
+    public POSampleSortSpark(POSort sort){
+        super(sort.getOperatorKey(), sort.getRequestedParallelism(), null, 
sort.getSortPlans(), sort.getMAscCols(), sort
+                .getMSortFunc());
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POSparkSort" + "["
+                + DataType.findTypeName(resultType) + "]" + "("
+                + (super.getMSortFunc() != null ? 
super.getMSortFunc().getFuncSpec() : "") + ")"
+                + " - " + mKey.toString();
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+}

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
 Wed Feb 15 04:28:47 2017
@@ -26,27 +26,34 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -61,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -68,14 +76,17 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -85,7 +96,9 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 
@@ -98,6 +111,7 @@ public class SparkCompiler extends PhyPl
     private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
 
     private PigContext pigContext;
+    private Properties pigProperties;
 
        // The physicalPlan that is being compiled
        private PhysicalPlan physicalPlan;
@@ -691,6 +705,50 @@ public class SparkCompiler extends PhyPl
         }
     }
 
+//    /**
+//     * currently use regular join to replace skewedJoin
+//     * Skewed join currently works with two-table inner join.
+//     * More info about pig SkewedJoin, See 
https://wiki.apache.org/pig/PigSkewedJoinSpec
+//     *
+//     * @param op
+//     * @throws VisitorException
+//     */
+//    @Override
+//    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+//        try {
+//                     Random r = new Random();
+//                     String pigKeyDistFile = "pig.keyDistFile" + r.nextInt();
+//            // firstly, build sample job
+//            SparkOperator sampleSparkOp = getSkewedJoinSampleJob(op);
+//
+//                     buildBroadcastForSkewedJoin(sampleSparkOp, 
pigKeyDistFile);
+//
+//                     sampleSparkOp.markSampler();
+//                     sparkPlan.add(sampleSparkOp);
+//
+//                     // secondly, build the join job.
+//                     addToPlan(op);
+//                     curSparkOp.setSkewedJoinPartitionFile(pigKeyDistFile);
+//
+//                     // do sampling job before join job
+//                     sparkPlan.connect(sampleSparkOp, curSparkOp);
+//
+//                     phyToSparkOpMap.put(op, curSparkOp);
+//        } catch (Exception e) {
+//            int errCode = 2034;
+//            String msg = "Error compiling operator " +
+//                    op.getClass().getSimpleName();
+//            throw new SparkCompilerException(msg, errCode, PigException.BUG, 
e);
+//        }
+//    }
+
+/*    private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp, 
String pigKeyDistFile) throws PlanException {
+
+        POBroadcastSpark poBroadcast = new POBroadcastSpark(new 
OperatorKey(scope, nig.getNextNodeId(scope)));
+        poBroadcast.setBroadcastedVariableName(pigKeyDistFile);
+        sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast);
+    }*/
+
     @Override
     public void visitFRJoin(POFRJoin op) throws VisitorException {
                try {
@@ -1141,4 +1199,359 @@ public class SparkCompiler extends PhyPl
         return new 
FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
+
+    private static class FindKeyTypeVisitor extends PhyPlanVisitor {
+
+        byte keyType = DataType.UNKNOWN;
+
+        FindKeyTypeVisitor(PhysicalPlan plan) {
+            super(plan,
+                    new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitProject(POProject p) throws VisitorException {
+            keyType = p.getResultType();
+        }
+    }
+
+
+    /**
+     * build a POPoissonSampleSpark operator for SkewedJoin's sampling job
+     */
+       private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp)
+                       throws PlanException {
+               Configuration conf = 
ConfigurationUtil.toConfiguration(pigProperties);
+               int sampleRate = conf.getInt(
+                               
PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE,
+                               POPoissonSampleSpark.DEFAULT_SAMPLE_RATE);
+               float heapPerc = conf.getFloat(
+                               PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
+                               PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+               long totalMemory = conf.getLong(
+                               PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1);
+
+               POPoissonSampleSpark poSample = new POPoissonSampleSpark(
+                               new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1,
+                               sampleRate, heapPerc, totalMemory);
+
+               sampleSparkOp.physicalPlan.addAsLeaf(poSample);
+       }
+
+    private SparkOperator getSortJob(
+            POSort sort,
+            SparkOperator quantJob,
+            FileSpec lFile,
+            FileSpec quantFile,
+            int rp, Pair<POProject, Byte>[] fields) throws PlanException {
+        SparkOperator sparkOper = startNew(lFile, quantJob);
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+        byte keyType = DataType.UNKNOWN;
+        if (fields == null) {
+            // This is project *
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+            prj.setStar(true);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.TUPLE);
+            ep.add(prj);
+            eps1.add(ep);
+        } else {
+            /*
+            for (int i : fields) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,
+                    nig.getNextNodeId(scope)));
+                prj.setColumn(i);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.BYTEARRAY);
+                ep.add(prj);
+                eps1.add(ep);
+            }
+            */
+            // Attach the sort plans to the local rearrange to get the
+            // projection.
+            eps1.addAll(sort.getSortPlans());
+
+            // Visit the first sort plan to figure out our key type.  We only
+            // have to visit the first because if we have more than one plan,
+            // then the key type will be tuple.
+            try {
+                FindKeyTypeVisitor fktv =
+                        new FindKeyTypeVisitor(sort.getSortPlans().get(0));
+                fktv.visit();
+                keyType = fktv.keyType;
+            } catch (VisitorException ve) {
+                int errCode = 2035;
+                String msg = "Internal error. Could not compute key type of 
sort operator.";
+                throw new PlanException(msg, errCode, PigException.BUG, ve);
+            }
+        }
+
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on newly created 
POLocalRearrange.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
+        lr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE :
+                keyType);
+        lr.setPlans(eps1);
+        lr.setResultType(DataType.TUPLE);
+        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+        sparkOper.physicalPlan.addAsLeaf(lr);
+
+        sparkOper.setGlobalSort(true);
+        pigContext.getProperties().setProperty("pig.reduce.keytype", 
Byte.toString(lr.getKeyType()));
+        sparkOper.requestedParallelism = rp;
+        sparkOper.physicalPlan.addAsLeaf(sort);
+
+        long limit = sort.getLimit();
+        if (limit != -1) {
+            POLimit pLimit2 = new POLimit(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+            pLimit2.setLimit(limit);
+            sparkOper.physicalPlan.addAsLeaf(pLimit2);
+            sparkOper.markLimitAfterSort();
+        }
+
+        return sparkOper;
+    }
+
+    /**
+     * Create a sampling job to collect statistics by sampling an input file. 
The sequence of operations is as
+     * follows:
+     * <li>Transform input sample tuples into another tuple.</li>
+     * <li>Add an extra field &quot;all&quot; into the tuple </li>
+     * <li>Package all tuples into one bag </li>
+     * <li>Add constant field for number of reducers. </li>
+     * <li>Sorting the bag </li>
+     * <li>Invoke UDF with the number of reducers and the sorted bag.</li>
+     * <li>Data generated by UDF is stored into a file.</li>
+     *
+     * @param sort           the POSort operator used to sort the bag
+     * @param sampleOperator current sampling job
+     * @param rp             configured parallelism
+     * @param udfClassName   the class name of UDF
+     * @param udfArgs        the arguments of UDF
+     * @return the given sampleOperator, with the sampling pipeline appended and marked as a sampler
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    @SuppressWarnings("deprecation")
+    private SparkOperator getSamplingJob(POSort sort, SparkOperator 
sampleOperator, List<PhysicalPlan>
+            transformPlans,
+                                         int rp,
+                                         String udfClassName, String[] 
udfArgs) throws PlanException,
+            VisitorException, ExecException {
+        addSampleOperatorForSkewedJoin(sampleOperator);
+        List<Boolean> flat1 = new ArrayList<Boolean>();
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+        // if transform plans are not specified, project the columns of 
sorting keys
+        if (transformPlans == null) {
+            Pair<POProject, Byte>[] sortProjs = null;
+            try {
+                sortProjs = getSortCols(sort.getSortPlans());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            // Set up the projections of the key columns
+            if (sortProjs == null) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,
+                        nig.getNextNodeId(scope)));
+                prj.setStar(true);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.TUPLE);
+                ep.add(prj);
+                eps1.add(ep);
+                flat1.add(false);
+            } else {
+                for (Pair<POProject, Byte> sortProj : sortProjs) {
+                    // Check for proj being null, null is used by getSortCols 
for a non POProject
+                    // operator. Since Order by does not allow expression 
operators,
+                    //it should never be set to null
+                    if (sortProj == null) {
+                        int errCode = 2174;
+                        String msg = "Internal exception. Could not create a 
sampler job";
+                        throw new SparkCompilerException(msg, errCode, 
PigException.BUG);
+                    }
+                    PhysicalPlan ep = new PhysicalPlan();
+                    POProject prj;
+                    try {
+                        prj = sortProj.first.clone();
+                    } catch (CloneNotSupportedException e) {
+                        //should not get here
+                        throw new AssertionError(
+                                "Error cloning project caught exception" + e
+                        );
+                    }
+                    ep.add(prj);
+                    eps1.add(ep);
+                    flat1.add(false);
+                }
+            }
+        } else {
+            for (int i = 0; i < transformPlans.size(); i++) {
+                eps1.add(transformPlans.get(i));
+                flat1.add(i == transformPlans.size() - 1 ? true : false);
+            }
+        }
+        // This foreach will pick the sort key columns from the 
POPoissonSampleSpark output
+        POForEach nfe1 = new POForEach(new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1, eps1, flat1);
+        sampleOperator.physicalPlan.addAsLeaf(nfe1);
+
+        //sort the sample
+        POSampleSortSpark poSparkSampleSort = new POSampleSortSpark(sort);
+        sampleOperator.physicalPlan.addAsLeaf(poSparkSampleSort);
+
+        // for the foreach
+        PhysicalPlan fe2Plan = new PhysicalPlan();
+        POProject topPrj = new POProject(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+        topPrj.setColumn(1);
+        topPrj.setResultType(DataType.BAG);
+        topPrj.setOverloaded(true);
+        fe2Plan.add(topPrj);
+
+
+        // The plan which will have a constant representing the
+        // degree of parallelism for the final order by map-reduce job
+        // this will either come from a "order by parallel x" in the script
+        // or will be the default number of reducers for the cluster if
+        // "parallel x" is not used in the script
+        PhysicalPlan rpep = new PhysicalPlan();
+        ConstantExpression rpce = new ConstantExpression(new 
OperatorKey(scope, nig.getNextNodeId(scope)));
+        rpce.setRequestedParallelism(rp);
+
+        // We temporarily set it to rp and will adjust it at runtime, because 
the final degree of parallelism
+        // is unknown until we are ready to submit it. See PIG-2779.
+        rpce.setValue(rp);
+
+        rpce.setResultType(DataType.INTEGER);
+        rpep.add(rpce);
+
+        List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
+        genEps.add(rpep);
+        genEps.add(fe2Plan);
+
+        List<Boolean> flattened2 = new ArrayList<Boolean>();
+        flattened2.add(false);
+        flattened2.add(false);
+
+        POForEach nfe2 = new POForEach(new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1, genEps, flattened2);
+        sampleOperator.physicalPlan.addAsLeaf(nfe2);
+
+        // Let's connect the output from the foreach containing
+        // number of quantiles and the sorted bag of samples to
+        // another foreach with the udf named by udfClassName. The input
+        // to that udf is a project(*) which takes the
+        // foreach input and gives it to the udf
+        PhysicalPlan ep4 = new PhysicalPlan();
+        POProject prjStar4 = new POProject(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+        prjStar4.setResultType(DataType.TUPLE);
+        prjStar4.setStar(true);
+        ep4.add(prjStar4);
+
+        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+        ufInps.add(prjStar4);
+
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1, ufInps,
+                new FuncSpec(udfClassName, udfArgs));
+        ep4.add(uf);
+        ep4.connect(prjStar4, uf);
+
+        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+        ep4s.add(ep4);
+        List<Boolean> flattened3 = new ArrayList<Boolean>();
+        flattened3.add(false);
+        POForEach nfe3 = new POForEach(new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+
+        sampleOperator.physicalPlan.addAsLeaf(nfe3);
+
+        sampleOperator.requestedParallelism = 1;
+        sampleOperator.markSampler();
+        return sampleOperator;
+    }
+
+    private Pair<POProject, Byte>[] getSortCols(List<PhysicalPlan> plans) 
throws PlanException, ExecException {
+        if (plans != null) {
+            @SuppressWarnings("unchecked")
+            Pair<POProject, Byte>[] ret = new Pair[plans.size()];
+            int i = -1;
+            for (PhysicalPlan plan : plans) {
+                PhysicalOperator op = plan.getLeaves().get(0);
+                POProject proj;
+                if (op instanceof POProject) {
+                    if (((POProject) op).isStar()) return null;
+                    proj = (POProject) op;
+                } else {
+                    proj = null;
+                }
+                byte type = op.getResultType();
+                ret[++i] = new Pair<POProject, Byte>(proj, type);
+            }
+            return ret;
+        }
+        int errCode = 2026;
+        String msg = "No expression plan found in POSort.";
+        throw new PlanException(msg, errCode, PigException.BUG);
+    }
+
+    /**
+     * Create Sampling job for skewed join.
+     */
+    private SparkOperator getSkewedJoinSampleJob(POSkewedJoin skewedJoin) 
throws PlanException, VisitorException {
+        try {
+            SparkOperator sampleOperator = new SparkOperator(new 
OperatorKey(scope, nig.getNextNodeId(scope)));
+            sampleOperator.physicalPlan = 
compiledInputs[0].physicalPlan.clone();
+            MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = 
skewedJoin.getJoinPlans();
+
+            List<PhysicalOperator> l = 
physicalPlan.getPredecessors(skewedJoin);
+            List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+            List<Boolean> ascCol = new ArrayList<Boolean>();
+            for (int i = 0; i < groups.size(); i++) {
+                ascCol.add(false);
+            }
+
+            POSort sort = new POSort(skewedJoin.getOperatorKey(), 
skewedJoin.getRequestedParallelism(), null, groups,
+                    ascCol, null);
+
+            // set up transform plan to get keys and memory size of input 
tuples
+            // it first adds all the plans to get key columns,
+            List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+            transformPlans.addAll(groups);
+
+            // then it adds a column for memory size
+            POProject prjStar = new POProject(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+            prjStar.setResultType(DataType.TUPLE);
+            prjStar.setStar(true);
+
+            List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+            ufInps.add(prjStar);
+
+            PhysicalPlan ep = new PhysicalPlan();
+            POUserFunc uf = new POUserFunc(new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1, ufInps,
+                    new FuncSpec(GetMemNumRows.class.getName(), (String[]) 
null));
+            uf.setResultType(DataType.TUPLE);
+            ep.add(uf);
+            ep.add(prjStar);
+            ep.connect(prjStar, uf);
+
+            transformPlans.add(ep);
+            // pass configurations to the User Function
+            String per = 
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
+                    
String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
+            String mc = 
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+
+            return getSamplingJob(sort, sampleOperator, transformPlans, 
skewedJoin.getRequestedParallelism(),
+                    PartitionSkewedKeys.class.getName(), new String[]{per, 
mc});
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " +
+                    skewedJoin.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, 
e);
+        }
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
 Wed Feb 15 04:28:47 2017
@@ -84,6 +84,15 @@ public class SparkOperator extends Opera
 
     private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap 
= new MultiMap<OperatorKey, OperatorKey>();
 
+    // Indicates if a UDF comparator is used
+    boolean isUDFComparatorUsed = false;
+
+    //The quantiles file name if globalSort is true
+    private String quantFile;
+
+    //Indicates if this job is an order by job
+    private boolean globalSort = false;
+
     public SparkOperator(OperatorKey k) {
         super(k);
         physicalPlan = new PhysicalPlan();
@@ -277,4 +286,13 @@ public class SparkOperator extends Opera
     public MultiMap<OperatorKey, OperatorKey> 
getMultiQueryOptimizeConnectionItem() {
         return multiQueryOptimizeConnectionMap;
     }
+
+    public void setGlobalSort(boolean globalSort) {
+        this.globalSort = globalSort;
+    }
+
+    public boolean isGlobalSort() {
+        return globalSort;
+    }
+
 }


Reply via email to