Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({ "serial" })
+public class PackageConverter implements RDDConverter<Tuple, Tuple, POPackage> 
{
+    private static final Log LOG = LogFactory.getLog(PackageConverter.class);
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POPackage physicalOperator) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        // package will generate the group from the result of the local
+        // rearrange
+        return rdd.map(new PackageFunction(physicalOperator),
+                SparkUtil.getManifest(Tuple.class));
+    }
+
+    private static class PackageFunction extends
+            AbstractFunction1<Tuple, Tuple> implements Serializable {
+
+        private final POPackage physicalOperator;
+
+        public PackageFunction(POPackage physicalOperator) {
+            this.physicalOperator = physicalOperator;
+        }
+
+        @Override
+        public Tuple apply(final Tuple t) {
+            // (key, Seq<Tuple>:{(index, key, value without key)})
+            if (LOG.isDebugEnabled())
+                LOG.debug("PackageFunction in " + t);
+            Result result;
+            try {
+                PigNullableWritable key = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        try {
+                            Object keyTuple = t.get(0);
+                            return keyTuple;
+                        } catch (ExecException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+                final Iterator<Tuple> bagIterator = (Iterator<Tuple>) t.get(1);
+                Iterator<NullableTuple> iterator = new 
Iterator<NullableTuple>() {
+                    public boolean hasNext() {
+                        return bagIterator.hasNext();
+                    }
+
+                    public NullableTuple next() {
+                        try {
+                            // we want the value and index only
+                            Tuple next = bagIterator.next();
+                            NullableTuple nullableTuple = new NullableTuple(
+                                    (Tuple) next.get(1));
+                            nullableTuple.setIndex(((Number) next.get(0))
+                                    .byteValue());
+                            if (LOG.isDebugEnabled())
+                                LOG.debug("Setting index to " + next.get(0) +
+                                    " for tuple " + (Tuple)next.get(1));
+                            return nullableTuple;
+                        } catch (ExecException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+                physicalOperator.setInputs(null);
+                physicalOperator.attachInput(key, iterator);
+                result = physicalOperator.getNextTuple();
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do Package on tuple: " + t, e);
+            }
+
+            if (result == null) {
+                throw new RuntimeException(
+                        "Null response found for Package on tuple: " + t);
+            }
+            Tuple out;
+            switch (result.returnStatus) {
+            case POStatus.STATUS_OK:
+                // (key, {(value)...})
+                if (LOG.isDebugEnabled())
+                    LOG.debug("PackageFunction out " + result.result);
+                out = (Tuple) result.result;
+                break;
+            case POStatus.STATUS_NULL:
+                out = null;
+                break;
+            default:
+                throw new RuntimeException(
+                        "Unexpected response code from operator "
+                                + physicalOperator + " : " + result + " "
+                                + result.returnStatus);
+            }
+            if (LOG.isDebugEnabled())
+                LOG.debug("PackageFunction out " + out);
+            return out;
+        }
+    }
+
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Utility class that handles secondary key for sorting.
+ */
+class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
+    private static final Log LOG = 
LogFactory.getLog(PigSecondaryKeyComparatorSpark.class);
+    private static final long serialVersionUID = 1L;
+
+    private static boolean[] secondarySortOrder;
+
+    public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
+        secondarySortOrder = pSecondarySortOrder;
+    }
+
+    //IndexedKeyPartitioner will put the tuple with same mainKey together, in 
PigSecondaryKeyComparatorSpark#compare
+    // (Object o1, Object o2)
+    //we only compare the secondaryKey
+    @Override
+    public int compare(Object o1, Object o2) {
+        Tuple t1 = (Tuple) o1;
+        Tuple t2 = (Tuple) o2;
+        try {
+            if ((t1.size() < 3) || (t2.size() < 3)) {
+                throw new RuntimeException("tuple size must bigger than 3, 
tuple[0] stands for index, tuple[1]" +
+                        "stands for the compound key, tuple[3] stands for the 
value");
+            }
+            Tuple compoundKey1 = (Tuple) t1.get(1);
+            Tuple compoundKey2 = (Tuple) t2.get(1);
+            if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
+                throw new RuntimeException("compoundKey size must bigger than, 
compoundKey[0] stands for firstKey," +
+                        "compoundKey[1] stands for secondaryKey");
+            }
+            Object secondaryKey1 = compoundKey1.get(1);
+            Object secondaryKey2 = compoundKey2.get(1);
+            int res = compareKeys(secondaryKey1, secondaryKey2, 
secondarySortOrder);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
+            }
+            return res;
+        } catch (ExecException e) {
+            throw new RuntimeException("Fail to get the compoundKey", e);
+        }
+    }
+
+    //compare the mainKey and secondaryKey
+    public int compareCompoundKey(Tuple compoundKey1, Tuple compoundKey2){
+        try {
+            if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
+                throw new RuntimeException("compoundKey size must bigger than, 
compoundKey[0] stands for firstKey," +
+                        "compoundKey[1] stands for secondaryKey");
+            }
+            Object mainKey1 = compoundKey1.get(0);
+            Object mainKey2 = compoundKey2.get(0);
+            int res = compareKeys(mainKey1,mainKey2, null);
+            if ( res !=0 ){
+                return res;
+            } else {
+                Object secondaryKey1 = compoundKey1.get(1);
+                Object secondaryKey2 = compoundKey2.get(1);
+                res = compareKeys(secondaryKey1, secondaryKey2, 
secondarySortOrder);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("compoundKey1:" + compoundKey1 + "compoundKey2:" 
+ compoundKey2 + " res:" + res);
+                }
+                return res;
+            }
+        } catch (ExecException e) {
+            throw new RuntimeException("Fail to get the compoundKey", e);
+        }
+    }
+
+    private int compareKeys(Object o1, Object o2, boolean[] asc) {
+        int rc = 0;
+        if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof 
Tuple) {
+            // objects are Tuples, we may need to apply sort order inside them
+            Tuple t1 = (Tuple) o1;
+            Tuple t2 = (Tuple) o2;
+            int sz1 = t1.size();
+            int sz2 = t2.size();
+            if (sz2 < sz1) {
+                return 1;
+            } else if (sz2 > sz1) {
+                return -1;
+            } else {
+                for (int i = 0; i < sz1; i++) {
+                    try {
+                        rc = DataType.compare(t1.get(i), t2.get(i));
+                        if (rc != 0 && asc != null && asc.length > 1 && 
!asc[i])
+                            rc *= -1;
+                        if ((t1.get(i) == null) || (t2.get(i) == null)) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("t1.get(i) is:" + t1.get(i) + " 
t2.get(i) is:" + t2.get(i));
+                            }
+                        }
+                        if (rc != 0) break;
+                    } catch (ExecException e) {
+                        throw new RuntimeException("Unable to compare tuples", 
e);
+                    }
+                }
+            }
+        } else {
+            // objects are NOT Tuples, delegate to DataType.compare()
+            rc = DataType.compare(o1, o2);
+        }
+        // apply sort order for keys that are not tuples or for whole tuples
+        if (asc != null && asc.length == 1 && !asc[0])
+            rc *= -1;
+        return rc;
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class PoissonSampleConverter implements RDDConverter<Tuple, Tuple, 
POPoissonSampleSpark> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POPoissonSampleSpark po) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, po, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        PoissionSampleFunction poissionSampleFunction = new 
PoissionSampleFunction(po);
+        return rdd.toJavaRDD().mapPartitions(poissionSampleFunction, 
false).rdd();
+    }
+
+    private static class PoissionSampleFunction implements 
FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+        private final POPoissonSampleSpark po;
+
+        public PoissionSampleFunction(POPoissonSampleSpark po) {
+            this.po = po;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+
+            return new Iterable<Tuple>() {
+
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(tuples) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            po.setInputs(null);
+                            po.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return po.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            po.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.List;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Given an RDD and a PhysicalOperater, and implementation of this class can
+ * convert the RDD to another RDD.
+ */
+public interface RDDConverter<IN, OUT, T extends PhysicalOperator> {
+    RDD<OUT> convert(List<RDD<IN>> rdd, T physicalOperator) throws IOException;
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+
+public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> {
+
+    private static final Log LOG = LogFactory.getLog(RankConverter.class);
+    
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
+            throws IOException {
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, 
poRank);
+        SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+                .mapToPair(new ToPairRdd());
+        JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+                .groupByKey(parallelism);
+        JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+                .mapToPair(new IndexCounters());
+        JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+                .sortByKey(true, parallelism);
+        Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+        JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+                .map(new RankFunction(new HashMap<Integer, Long>(counts)));
+        return finalRdd.rdd();
+    }
+
+    @SuppressWarnings("serial")
+    private static class ToPairRdd implements 
+        PairFunction<Tuple, Integer, Long>, Serializable {
+
+        @Override
+        public Tuple2<Integer, Long> call(Tuple t) {
+            try {
+                Integer key = (Integer) t.get(0);
+                Long value = (Long) t.get(1);
+                return new Tuple2<Integer, Long>(key, value);
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    @SuppressWarnings("serial")
+    private static class IndexCounters implements 
+        PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, 
+        Serializable {
+        @Override
+        public Tuple2<Integer, Long> call(Tuple2<Integer, 
+                Iterable<Long>> input) {
+            long lastVaue = 0L;
+            
+            for (Long t : input._2()) {
+                lastVaue = (t > lastVaue) ? t : lastVaue;
+            }
+
+            return new Tuple2<Integer, Long>(input._1(), lastVaue);
+        }
+    }
+    
+    @SuppressWarnings("serial")
+    private static class RankFunction implements Function<Tuple, Tuple>, 
+            Serializable {
+        private final HashMap<Integer, Long> counts;
+        
+        private RankFunction(HashMap<Integer, Long> counts) {
+            this.counts = counts;
+        }
+        
+        @Override
+        public Tuple call(Tuple input) throws Exception {
+            Tuple output = TupleFactory.getInstance()
+                    .newTuple(input.getAll().size() - 2);
+            
+            for (int i = 1; i < input.getAll().size() - 2; i ++) {
+                output.set(i, input.get(i+2));
+            }
+            
+            long offset = calculateOffset((Integer) input.get(0));
+            output.set(0, offset + (Long)input.get(2));
+            return output;
+        }
+        
+        private long calculateOffset(Integer index) {
+            long offset = 0;
+            
+            if (index > 0) {
+                for (int i = 0; i < index; i++) {
+                    if (counts.containsKey(i)) {
+                        offset += counts.get(i);
+                    }
+                }
+            }
+            return offset;
+        }
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({"serial"})
+public class ReduceByConverter implements RDDConverter<Tuple, Tuple, 
POReduceBySpark> {
+    private static final Log LOG = LogFactory.getLog(ReduceByConverter.class);
+
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark 
op) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, op, 1);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, 
op);
+
+        RDD<Tuple> rdd = predecessors.get(0);
+        RDD<Tuple2<IndexedKey, Tuple>> rddPair
+                = rdd.map(new LocalRearrangeFunction(op.getLROp(), 
op.isUseSecondaryKey(), op.getSecondarySortOrder())
+                , SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
+        if (op.isUseSecondaryKey()) {
+            return SecondaryKeySortUtil.handleSecondarySort(rddPair, 
op.getPKGOp());
+        } else {
+            PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
+                    = new PairRDDFunctions<>(rddPair,
+                    SparkUtil.getManifest(IndexedKey.class),
+                    SparkUtil.getManifest(Tuple.class), null);
+
+            RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = 
pairRDDFunctions.reduceByKey(
+                    SparkUtil.getPartitioner(op.getCustomPartitioner(), 
parallelism),
+                    new MergeValuesFunction(op));
+            LOG.debug("Custom Partitioner and parallelims used : " + 
op.getCustomPartitioner() + ", " + parallelism);
+
+            return tupleRDD.map(new ToTupleFunction(op), 
SparkUtil.getManifest(Tuple.class));
+        }
+    }
+
+    private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
+            RDD<Tuple> rdd, POReduceBySpark op, int parallelism) {
+
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new 
ToKeyNullValueFunction(),
+                SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, 
Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+
+        //first sort the tuple by secondary key if enable useSecondaryKey sort
+        JavaPairRDD<Tuple, Object> sorted = 
pairRDD.repartitionAndSortWithinPartitions(
+                new HashPartitioner(parallelism),
+                new 
PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
+        JavaRDD<Tuple> jrdd = sorted.keys();
+        JavaRDD<Tuple2<IndexedKey, Tuple>> jrddPair = jrdd.map(new 
ToKeyValueFunction(op));
+        return jrddPair;
+    }
+
+    private static class ToKeyNullValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToKeyNullValueFunction in " + t);
+            }
+
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToKeyNullValueFunction out " + out);
+            }
+
+            return out;
+        }
+    }
+
+    /**
+     * Converts incoming locally rearranged tuple, which is of the form
+     * (index, key, value) into Tuple2<key, Tuple(key, value)>
+     */
+    private static class ToKeyValueFunction implements
+            Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
+
+        private POReduceBySpark poReduce = null;
+
+        public ToKeyValueFunction(POReduceBySpark poReduce) {
+            this.poReduce = poReduce;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> call(Tuple t) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction in " + t);
+                }
+
+                Object key;
+                if ((poReduce != null) && (poReduce.isUseSecondaryKey())) {
+                    key = ((Tuple) t.get(1)).get(0);
+                } else {
+                    key = t.get(1);
+                }
+
+                Tuple tupleWithKey = tf.newTuple();
+                tupleWithKey.append(key);
+                tupleWithKey.append(t.get(2));
+
+                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, 
Tuple>(new IndexedKey((Byte) t.get(0), key), tupleWithKey);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction out " + out);
+                }
+
+                return out;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Given two input tuples, this function outputs the resultant tuple.
+     * Additionally, it packages the input tuples to ensure the Algebraic 
Functions can work on them.
+     */
+    private static final class MergeValuesFunction extends 
AbstractFunction2<Tuple, Tuple, Tuple>
+            implements Serializable {
+        private final POReduceBySpark poReduce;
+
+        public MergeValuesFunction(POReduceBySpark poReduce) {
+            this.poReduce = poReduce;
+        }
+
+        @Override
+        public Tuple apply(Tuple v1, Tuple v2) {
+            LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
+            Tuple result = tf.newTuple(2);
+            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+            Tuple t = new DefaultTuple();
+            try {
+                // Package the input tuples so they can be processed by 
Algebraic functions.
+                Object key = v1.get(0);
+                if (key == null) {
+                    key = "";
+                } else {
+                    result.set(0, key);
+                }
+                bag.add((Tuple) v1.get(1));
+                bag.add((Tuple) v2.get(1));
+                t.append(key);
+                t.append(bag);
+
+                poReduce.getPKGOp().getPkgr().attachInput(key, new 
DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                Tuple packagedTuple = (Tuple) 
poReduce.getPKGOp().getPkgr().getNext().result;
+
+                // Perform the operation
+                LOG.debug("MergeValuesFunction packagedTuple : " + t);
+                poReduce.attachInput(packagedTuple);
+                Result r = poReduce.getNext(poReduce.getResultType());
+
+                // Ensure output is consistent with the output of 
KeyValueFunction
+                // If we return r.result, the result will be something like 
this:
+                // (ABC,(2),(3)) - A tuple with key followed by values.
+                // But, we want the result to look like this:
+                // (ABC,((2),(3))) - A tuple with key and a value tuple 
(containing values).
+                // Hence, the construction of a new value tuple
+
+                Tuple valueTuple = tf.newTuple();
+                for (Object o : ((Tuple) r.result).getAll()) {
+                    if (!o.equals(key)) {
+                        valueTuple.append(o);
+                    }
+                }
+                result.set(1,valueTuple);
+                LOG.debug("MergeValuesFunction out : " + result);
+                return result;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * This function transforms the Tuple to ensure it is packaged as per 
requirements of the Operator's packager.
+     */
+    private static final class ToTupleFunction extends 
AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple>
+            implements Serializable {
+
+        private final POReduceBySpark poReduce;
+
+        public ToTupleFunction(POReduceBySpark poReduce) {
+            this.poReduce = poReduce;
+        }
+
+        @Override
+        public Tuple apply(Tuple2<IndexedKey, Tuple> v1) {
+            LOG.debug("ToTupleFunction in : " + v1);
+            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+            Tuple t = new DefaultTuple();
+            Tuple packagedTuple = null;
+            try {
+                Object key = v1._2().get(0);
+                bag.add((Tuple) v1._2().get(1));
+                t.append(key);
+                t.append(bag);
+                poReduce.getPKGOp().getPkgr().attachInput(key, new 
DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                packagedTuple = (Tuple) 
poReduce.getPKGOp().getPkgr().getNext().result;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+            LOG.debug("ToTupleFunction out : " + packagedTuple);
+            return packagedTuple;
+        }
+    }
+
+    /**
+     * Converts incoming locally rearranged tuple, which is of the form
+     * (index, key, value) into Tuple2<key, Tuple(key, value)>
+     */
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements 
Serializable {
+
+        private final POLocalRearrange lra;
+
+        private boolean useSecondaryKey;
+        private boolean[] secondarySortOrder;
+
+        public LocalRearrangeFunction(POLocalRearrange lra, boolean 
useSecondaryKey, boolean[] secondarySortOrder) {
+            if( useSecondaryKey ) {
+                this.useSecondaryKey = useSecondaryKey;
+                this.secondarySortOrder = secondarySortOrder;
+            }
+            this.lra = lra;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (index, key, Tuple(key, value))
+                        Tuple resultTuple = (Tuple) result.result;
+                        Object key = resultTuple.get(1);
+                        IndexedKey indexedKey = new IndexedKey((Byte) 
resultTuple.get(0), key);
+                        if( useSecondaryKey) {
+                            indexedKey.setUseSecondaryKey(useSecondaryKey);
+                            
indexedKey.setSecondarySortOrder(secondarySortOrder);
+                        }
+                        Tuple outValue =  
TupleFactory.getInstance().newTuple();
+                        outValue.append(key);
+                        outValue.append(resultTuple.get(2));
+                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, 
Tuple>(indexedKey,
+                               outValue);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("LocalRearrangeFunction out " + out);
+                        }
+                        return out;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearange on tuple: " + t, e);
+            }
+        }
+
+    }
+
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Objects;
+
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Provide utility functions which is used by ReducedByConverter and 
JoinGroupSparkConverter.
+ */
+public class SecondaryKeySortUtil {
+    private static final Log LOG = LogFactory
+            .getLog(SecondaryKeySortUtil.class);
+
+    public static RDD<Tuple> handleSecondarySort(
+            RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, 
SparkUtil.getManifest(IndexedKey.class),
+                SparkUtil.getManifest(Tuple.class));
+
+        int partitionNums = pairRDD.partitions().size();
+        //repartition to group tuples with same indexedkey to same partition
+        JavaPairRDD<IndexedKey, Tuple> sorted = 
pairRDD.repartitionAndSortWithinPartitions(
+                new IndexedKeyPartitioner(partitionNums));
+        //Package tuples with same indexedkey as the result: 
(key,(val1,val2,val3,...))
+        return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+    }
+
+    //Package tuples with same indexedkey as the result: 
(key,(val1,val2,val3,...))
+    //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the 
result
+    private static class AccumulateByKey implements 
FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
+            Serializable {
+        private POPackage pkgOp;
+
+        public AccumulateByKey(POPackage pkgOp) {
+            this.pkgOp = pkgOp;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> 
it) throws Exception {
+            return new Iterable<Tuple>() {
+                Object curKey = null;
+                ArrayList curValues = new ArrayList();
+                boolean initialized = false;
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new Iterator<Tuple>() {
+
+                        @Override
+                        public boolean hasNext() {
+                            return it.hasNext() || curKey != null;
+                        }
+
+                        @Override
+                        public Tuple next() {
+                            while (it.hasNext()) {
+                                Tuple2<IndexedKey, Tuple> t = it.next();
+                                //key changes, restruct the last tuple by 
curKey, curValues and return
+                                Object tMainKey = null;
+                                try {
+                                    tMainKey = ((Tuple) 
(t._1()).getKey()).get(0);
+
+                                    //If the key has changed and we've seen at 
least 1 already
+                                    if (initialized &&
+                                            ((curKey == null && tMainKey != 
null) ||
+                                                    (curKey != null && 
!curKey.equals(tMainKey)))){
+                                        Tuple result = restructTuple(curKey, 
new ArrayList(curValues));
+                                        curValues.clear();
+                                        curKey = tMainKey;
+                                        curValues.add(t._2());
+                                        return result;
+                                    }
+                                    curKey = tMainKey;
+                                    //if key does not change, just append the 
value to the same key
+                                    curValues.add(t._2());
+                                    initialized = true;
+
+                                } catch (ExecException e) {
+                                    throw new 
RuntimeException("AccumulateByKey throw exception: ", e);
+                                }
+                            }
+                            if (!initialized) {
+                                throw new RuntimeException("No tuples seen");
+                            }
+
+                            //if we get here, this should be the last record
+                            Tuple res = restructTuple(curKey, curValues);
+                            curKey = null;
+                            return res;
+                        }
+
+
+                        @Override
+                        public void remove() {
+                            // Not implemented.
+                            // throw Unsupported Method Invocation Exception.
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            };
+        }
+
+        private Tuple restructTuple(final Object curKey, final 
ArrayList<Tuple> curValues) {
+            try {
+                Tuple retVal = null;
+                PigNullableWritable retKey = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        return curKey;
+                    }
+                };
+
+                //Here restruct a tupleIterator, later POPackage#tupIter will 
use it.
+                final Iterator<Tuple> tupleItearator = curValues.iterator();
+                Iterator<NullableTuple> iterator = new 
Iterator<NullableTuple>() {
+                    public boolean hasNext() {
+                        return tupleItearator.hasNext();
+                    }
+
+                    public NullableTuple next() {
+                        Tuple t = tupleItearator.next();
+                        return new NullableTuple(t);
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(retKey, iterator);
+                Result res = pkgOp.getNextTuple();
+                if (res.returnStatus == POStatus.STATUS_OK) {
+                    retVal = (Tuple) res.result;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("AccumulateByKey out: " + retVal);
+                }
+                return retVal;
+            } catch (ExecException e) {
+                throw new RuntimeException("AccumulateByKey#restructTuple 
throws exception: ", e);
+            }
+        }
+    }
+
+    //Group tuples with same IndexKey into same partition
+    private static class IndexedKeyPartitioner extends Partitioner {
+        private int partition;
+
+        public IndexedKeyPartitioner(int partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public int getPartition(Object obj) {
+            IndexedKey indexedKey = (IndexedKey) obj;
+            Tuple key = (Tuple) indexedKey.getKey();
+
+            int hashCode = 0;
+            try {
+                hashCode = Objects.hashCode(key.get(0));
+            } catch (ExecException e) {
+                throw new 
RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
+            }
+            return Math.abs(hashCode) % partition;
+        }
+
+        @Override
+        public int numPartitions() {
+            return partition;
+        }
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.util.Pair;
+import org.apache.spark.Partitioner;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+public class SkewedJoinConverter implements
+        RDDConverter<Tuple, Tuple, POSkewedJoin>, Serializable {
+
+    private static Log log = LogFactory.getLog(SkewedJoinConverter.class);
+
+    private POLocalRearrange[] LRs;
+    private POSkewedJoin poSkewedJoin;
+
+    private String skewedJoinPartitionFile;
+
+    public void setSkewedJoinPartitionFile(String partitionFile) {
+        skewedJoinPartitionFile = partitionFile;
+    }
+
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POSkewedJoin poSkewedJoin) throws IOException {
+
+        SparkUtil.assertPredecessorSize(predecessors, poSkewedJoin, 2);
+        LRs = new POLocalRearrange[2];
+        this.poSkewedJoin = poSkewedJoin;
+
+        createJoinPlans(poSkewedJoin.getJoinPlans());
+
+        // extract the two RDDs
+        RDD<Tuple> rdd1 = predecessors.get(0);
+        RDD<Tuple> rdd2 = predecessors.get(1);
+
+        Broadcast<List<Tuple>> keyDist = 
SparkPigContext.get().getBroadcastedVars().get(skewedJoinPartitionFile);
+
+        // if no keyDist,  we need  defaultParallelism
+        Integer defaultParallelism = 
SparkPigContext.get().getParallelism(predecessors, poSkewedJoin);
+
+        // with partition id
+        SkewPartitionIndexKeyFunction skewFun = new 
SkewPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
+        RDD<Tuple2<PartitionIndexedKey, Tuple>> skewIdxKeyRDD = 
rdd1.map(skewFun,
+                SparkUtil.<PartitionIndexedKey, Tuple>getTuple2Manifest());
+
+        // Tuple2 RDD to Pair RDD
+        JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD = new 
JavaPairRDD<PartitionIndexedKey, Tuple>(
+                skewIdxKeyRDD, 
SparkUtil.getManifest(PartitionIndexedKey.class),
+                SparkUtil.getManifest(Tuple.class));
+
+        // with partition id
+        StreamPartitionIndexKeyFunction streamFun = new 
StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
+        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = 
rdd2.toJavaRDD().flatMap(streamFun);
+
+        // Tuple2 RDD to Pair RDD
+        JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new 
JavaPairRDD<PartitionIndexedKey, Tuple>(
+                streamIdxKeyJavaRDD.rdd(), 
SparkUtil.getManifest(PartitionIndexedKey.class),
+                SparkUtil.getManifest(Tuple.class));
+
+        JavaRDD<Tuple> result = doJoin(skewIndexedJavaPairRDD,
+                streamIndexedJavaPairRDD,
+                buildPartitioner(keyDist, defaultParallelism),
+                keyDist);
+
+        // return type is RDD<Tuple>, so take it from JavaRDD<Tuple>
+        return result.rdd();
+    }
+
+    private void createJoinPlans(MultiMap<PhysicalOperator, PhysicalPlan> 
inpPlans) throws PlanException {
+
+        int i = -1;
+        for (PhysicalOperator inpPhyOp : inpPlans.keySet()) {
+            ++i;
+            POLocalRearrange lr = new POLocalRearrange(genKey());
+            try {
+                lr.setIndex(i);
+            } catch (ExecException e) {
+                throw new PlanException(e.getMessage(), e.getErrorCode(), 
e.getErrorSource(), e);
+            }
+            lr.setResultType(DataType.TUPLE);
+            lr.setKeyType(DataType.TUPLE);//keyTypes.get(i).size() > 1 ? 
DataType.TUPLE : keyTypes.get(i).get(0));
+            lr.setPlans(inpPlans.get(inpPhyOp));
+            LRs[i] = lr;
+        }
+    }
+
+    private OperatorKey genKey() {
+        return new OperatorKey(poSkewedJoin.getOperatorKey().scope, 
NodeIdGenerator.getGenerator().getNextNodeId(poSkewedJoin.getOperatorKey().scope));
+    }
+
+    /**
+     * @param <L> be generic because it can be Optional<Tuple> or Tuple
+     * @param <R> be generic because it can be Optional<Tuple> or Tuple
+     */
+    private static class ToValueFunction<L, R> implements
+            FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, 
R>>>, Tuple>, Serializable {
+
+        private boolean[] innerFlags;
+        private int[] schemaSize;
+
+        private final Broadcast<List<Tuple>> keyDist;
+
+        transient private boolean initialized = false;
+        transient protected Map<Tuple, Pair<Integer, Integer>> reducerMap;
+
+        public ToValueFunction(boolean[] innerFlags, int[] schemaSize, 
Broadcast<List<Tuple>> keyDist) {
+            this.innerFlags = innerFlags;
+            this.schemaSize = schemaSize;
+            this.keyDist = keyDist;
+        }
+
+        private class Tuple2TransformIterable implements Iterable<Tuple> {
+
+            Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> in;
+
+            Tuple2TransformIterable(
+                    Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) 
{
+                in = input;
+            }
+
+            public Iterator<Tuple> iterator() {
+                return new IteratorTransform<Tuple2<PartitionIndexedKey, 
Tuple2<L, R>>, Tuple>(
+                        in) {
+                    @Override
+                    protected Tuple transform(
+                            Tuple2<PartitionIndexedKey, Tuple2<L, R>> next) {
+                        try {
+
+                            L left = next._2._1;
+                            R right = next._2._2;
+
+                            TupleFactory tf = TupleFactory.getInstance();
+                            Tuple result = tf.newTuple();
+
+                            Tuple leftTuple = tf.newTuple();
+                            if (!innerFlags[0]) {
+                                // left should be Optional<Tuple>
+                                Optional<Tuple> leftOption = (Optional<Tuple>) 
left;
+                                if (!leftOption.isPresent()) {
+                                    // Add an empty left record for RIGHT 
OUTER JOIN.
+                                    // Notice: if it is a skewed, only join 
the first reduce key
+                                    if (isFirstReduceKey(next._1)) {
+                                        for (int i = 0; i < schemaSize[0]; 
i++) {
+                                            leftTuple.append(null);
+                                        }
+                                    } else {
+                                        return this.next();
+                                    }
+                                } else {
+                                    leftTuple = leftOption.get();
+                                }
+                            } else {
+                                leftTuple = (Tuple) left;
+                            }
+                            for (int i = 0; i < leftTuple.size(); i++) {
+                                result.append(leftTuple.get(i));
+                            }
+
+                            Tuple rightTuple = tf.newTuple();
+                            if (!innerFlags[1]) {
+                                // right should be Optional<Tuple>
+                                Optional<Tuple> rightOption = 
(Optional<Tuple>) right;
+                                if (!rightOption.isPresent()) {
+                                    for (int i = 0; i < schemaSize[1]; i++) {
+                                        rightTuple.append(null);
+                                    }
+                                } else {
+                                    rightTuple = rightOption.get();
+                                }
+                            } else {
+                                rightTuple = (Tuple) right;
+                            }
+                            for (int i = 0; i < rightTuple.size(); i++) {
+                                result.append(rightTuple.get(i));
+                            }
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("MJC: Result = " + 
result.toDelimitedString(" "));
+                            }
+
+                            return result;
+                        } catch (Exception e) {
+                            log.warn(e);
+                        }
+                        return null;
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple> call(
+                Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
+            return new Tuple2TransformIterable(input);
+        }
+
+        private boolean isFirstReduceKey(PartitionIndexedKey pKey) {
+            // non-skewed key
+            if (pKey.getPartitionId() == -1) {
+                return true;
+            }
+
+            if (!initialized) {
+                Integer[] reducers = new Integer[1];
+                reducerMap = loadKeyDistribution(keyDist, reducers);
+                initialized = true;
+            }
+
+            Pair<Integer, Integer> indexes = reducerMap.get(pKey.getKey());
+            if (indexes != null && pKey.getPartitionId() != indexes.first) {
+                // return false only when the key is skewed
+                // and it is not the first reduce key.
+                return false;
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Utility function.
+     * 1. Get parallelism
+     * 2. build a key distribution map from the broadcasted key distribution 
file
+     *
+     * @param keyDist
+     * @param totalReducers
+     * @return
+     */
+    private static Map<Tuple, Pair<Integer, Integer>> 
loadKeyDistribution(Broadcast<List<Tuple>> keyDist,
+                                                                          
Integer[] totalReducers) {
+        Map<Tuple, Pair<Integer, Integer>> reducerMap = new HashMap<>();
+        totalReducers[0] = -1; // set a default value
+
+        if (keyDist == null || keyDist.value() == null || 
keyDist.value().size() == 0) {
+            // this could happen if sampling is empty
+            log.warn("Empty dist file: ");
+            return reducerMap;
+        }
+
+        try {
+            final TupleFactory tf = TupleFactory.getInstance();
+
+            Tuple t = keyDist.value().get(0);
+
+            Map<String, Object> distMap = (Map<String, Object>) t.get(0);
+            DataBag partitionList = (DataBag) 
distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+
+            totalReducers[0] = Integer.valueOf("" + 
distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+
+            Iterator<Tuple> it = partitionList.iterator();
+            while (it.hasNext()) {
+                Tuple idxTuple = it.next();
+                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+                // Used to replace the maxIndex with the number of reducers
+                if (maxIndex < minIndex) {
+                    maxIndex = totalReducers[0] + maxIndex;
+                }
+
+                // remove the last 2 fields of the tuple, i.e: minIndex and 
maxIndex and store
+                // it in the reducer map
+                Tuple keyTuple = tf.newTuple();
+                for (int i = 0; i < idxTuple.size() - 2; i++) {
+                    keyTuple.append(idxTuple.get(i));
+                }
+
+                // number of reducers
+                Integer cnt = maxIndex - minIndex;
+                reducerMap.put(keyTuple, new Pair(minIndex, cnt));
+            }
+
+        } catch (ExecException e) {
+            log.warn(e.getMessage());
+        }
+
+        return reducerMap;
+    }
+
+    private static class PartitionIndexedKey extends IndexedKey {
+        // for user defined partitioner
+        int partitionId;
+
+        public PartitionIndexedKey(byte index, Object key) {
+            super(index, key);
+            partitionId = -1;
+        }
+
+        public PartitionIndexedKey(byte index, Object key, int pid) {
+            super(index, key);
+            partitionId = pid;
+        }
+
+        public int getPartitionId() {
+            return partitionId;
+        }
+
+        private void setPartitionId(int pid) {
+            partitionId = pid;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionIndexedKey{" +
+                    "index=" + getIndex() +
+                    ", partitionId=" + getPartitionId() +
+                    ", key=" + getKey() +
+                    '}';
+        }
+    }
+
+    /**
+     * append a Partition id to the records from skewed table.
+     * so that the SkewedJoinPartitioner can send skewed records to different 
reducer
+     * <p>
+     * see: https://wiki.apache.org/pig/PigSkewedJoinSpec
+     */
+    private static class SkewPartitionIndexKeyFunction extends
+            AbstractFunction1<Tuple, Tuple2<PartitionIndexedKey, Tuple>> 
implements
+            Serializable {
+
+        private final SkewedJoinConverter poSkewedJoin;
+
+        private final Broadcast<List<Tuple>> keyDist;
+        private final Integer defaultParallelism;
+
+        transient private boolean initialized = false;
+        transient protected Map<Tuple, Pair<Integer, Integer>> reducerMap;
+        transient private Integer parallelism = -1;
+        transient private Map<Tuple, Integer> currentIndexMap;
+
+        public SkewPartitionIndexKeyFunction(SkewedJoinConverter poSkewedJoin,
+                                             Broadcast<List<Tuple>> keyDist,
+                                             Integer defaultParallelism) {
+            this.poSkewedJoin = poSkewedJoin;
+            this.keyDist = keyDist;
+            this.defaultParallelism = defaultParallelism;
+        }
+
+        @Override
+        public Tuple2<PartitionIndexedKey, Tuple> apply(Tuple tuple) {
+            // attach tuple to LocalRearrange
+            poSkewedJoin.LRs[0].attachInput(tuple);
+
+            try {
+                Result lrOut = poSkewedJoin.LRs[0].getNextTuple();
+
+                // If tuple is (AA, 5) and key index is $1, then it lrOut is 0 
5
+                // (AA), so get(1) returns key
+                Byte index = (Byte) ((Tuple) lrOut.result).get(0);
+                Object key = ((Tuple) lrOut.result).get(1);
+
+                Tuple keyTuple = (Tuple) key;
+                int partitionId = getPartitionId(keyTuple);
+                PartitionIndexedKey pIndexKey = new PartitionIndexedKey(index, 
keyTuple, partitionId);
+
+                // make a (key, value) pair
+                Tuple2<PartitionIndexedKey, Tuple> tuple_KeyValue = new 
Tuple2<PartitionIndexedKey, Tuple>(
+                        pIndexKey,
+                        tuple);
+
+                return tuple_KeyValue;
+            } catch (Exception e) {
+                System.out.print(e);
+                return null;
+            }
+        }
+
+        private Integer getPartitionId(Tuple keyTuple) {
+            if (!initialized) {
+                Integer[] reducers = new Integer[1];
+                reducerMap = loadKeyDistribution(keyDist, reducers);
+                parallelism = reducers[0];
+
+                if (parallelism <= 0) {
+                    parallelism = defaultParallelism;
+                }
+
+                currentIndexMap = Maps.newHashMap();
+
+                initialized = true;
+            }
+
+            // for partition table, compute the index based on the sampler 
output
+            Pair<Integer, Integer> indexes;
+            Integer curIndex = -1;
+
+            indexes = reducerMap.get(keyTuple);
+
+            // if the reducerMap does not contain the key return -1 so that the
+            // partitioner will do the default hash based partitioning
+            if (indexes == null) {
+                return -1;
+            }
+
+            if (currentIndexMap.containsKey(keyTuple)) {
+                curIndex = currentIndexMap.get(keyTuple);
+            }
+
+            if (curIndex >= (indexes.first + indexes.second) || curIndex == 
-1) {
+                curIndex = indexes.first;
+            } else {
+                curIndex++;
+            }
+
+            // set it in the map
+            currentIndexMap.put(keyTuple, curIndex);
+            return (curIndex % parallelism);
+        }
+
+    }
+
+    /**
+     * POPartitionRearrange is not used in spark mode now,
+     * Here, use flatMap and CopyStreamWithPidFunction to copy the
+     * stream records to the multiple reducers
+     * <p>
+     * see: https://wiki.apache.org/pig/PigSkewedJoinSpec
+     */
+    private static class StreamPartitionIndexKeyFunction implements 
FlatMapFunction<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
+
+        private SkewedJoinConverter poSkewedJoin;
+        private final Broadcast<List<Tuple>> keyDist;
+        private final Integer defaultParallelism;
+
+        private transient boolean initialized = false;
+        protected transient Map<Tuple, Pair<Integer, Integer>> reducerMap;
+        private transient Integer parallelism;
+
+        public StreamPartitionIndexKeyFunction(SkewedJoinConverter 
poSkewedJoin,
+                                               Broadcast<List<Tuple>> keyDist,
+                                               Integer defaultParallelism) {
+            this.poSkewedJoin = poSkewedJoin;
+            this.keyDist = keyDist;
+            this.defaultParallelism = defaultParallelism;
+        }
+
+        public Iterable<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) 
throws Exception {
+            if (!initialized) {
+                Integer[] reducers = new Integer[1];
+                reducerMap = loadKeyDistribution(keyDist, reducers);
+                parallelism = reducers[0];
+                if (parallelism <= 0) {
+                    parallelism = defaultParallelism;
+                }
+                initialized = true;
+            }
+
+            // streamed table
+            poSkewedJoin.LRs[1].attachInput(tuple);
+            Result lrOut = poSkewedJoin.LRs[1].getNextTuple();
+
+            Byte index = (Byte) ((Tuple) lrOut.result).get(0);
+            Tuple key = (Tuple) ((Tuple) lrOut.result).get(1);
+
+            ArrayList<Tuple2<PartitionIndexedKey, Tuple>> l = new ArrayList();
+            Pair<Integer, Integer> indexes = reducerMap.get(key);
+
+            // For non skewed keys, we set the partition index to be -1
+            // so that the partitioner will do the default hash based 
partitioning
+            if (indexes == null) {
+                indexes = new Pair<>(-1, 0);
+            }
+
+            for (Integer reducerIdx = indexes.first, cnt = 0; cnt <= 
indexes.second; reducerIdx++, cnt++) {
+                if (reducerIdx >= parallelism) {
+                    reducerIdx = 0;
+                }
+
+                // set the partition index
+                int partitionId = reducerIdx.intValue();
+                PartitionIndexedKey pIndexKey = new PartitionIndexedKey(index, 
key, partitionId);
+
+                l.add(new Tuple2(pIndexKey, tuple));
+            }
+
+            return l;
+        }
+    }
+
+    /**
+     * user defined spark partitioner for skewed join
+     */
+    private static class SkewedJoinPartitioner extends Partitioner {
+        private int numPartitions;
+
+        public SkewedJoinPartitioner(int parallelism) {
+            numPartitions = parallelism;
+        }
+
+        @Override
+        public int numPartitions() {
+            return numPartitions;
+        }
+
+        @Override
+        public int getPartition(Object IdxKey) {
+            if (IdxKey instanceof PartitionIndexedKey) {
+                int partitionId = ((PartitionIndexedKey) 
IdxKey).getPartitionId();
+                if (partitionId >= 0) {
+                    return partitionId;
+                }
+            }
+
+            //else: by default using hashcode
+            Tuple key = (Tuple) ((PartitionIndexedKey) IdxKey).getKey();
+
+
+            int code = key.hashCode() % numPartitions;
+            if (code >= 0) {
+                return code;
+            } else {
+                return code + numPartitions;
+            }
+        }
+    }
+
+    /**
+     * use parallelism from keyDist or the default parallelism to
+     * create user defined partitioner
+     *
+     * @param keyDist
+     * @param defaultParallelism
+     * @return
+     */
+    private SkewedJoinPartitioner buildPartitioner(Broadcast<List<Tuple>> 
keyDist, Integer defaultParallelism) {
+        Integer parallelism = -1;
+        Integer[] reducers = new Integer[1];
+        loadKeyDistribution(keyDist, reducers);
+        parallelism = reducers[0];
+        if (parallelism <= 0) {
+            parallelism = defaultParallelism;
+        }
+
+        return new SkewedJoinPartitioner(parallelism);
+    }
+
+    /**
+     * do all kinds of Join (inner, left outer, right outer, full outer)
+     *
+     * @param skewIndexedJavaPairRDD
+     * @param streamIndexedJavaPairRDD
+     * @param partitioner
+     * @return
+     */
+    private JavaRDD<Tuple> doJoin(
+            JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD,
+            JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD,
+            SkewedJoinPartitioner partitioner,
+            Broadcast<List<Tuple>> keyDist) {
+
+        boolean[] innerFlags = poSkewedJoin.getInnerFlags();
+        int[] schemaSize = {0, 0};
+        for (int i = 0; i < 2; i++) {
+            if (poSkewedJoin.getSchema(i) != null) {
+                schemaSize[i] = poSkewedJoin.getSchema(i).size();
+            }
+        }
+
+        ToValueFunction toValueFun = new ToValueFunction(innerFlags, 
schemaSize, keyDist);
+
+        if (innerFlags[0] && innerFlags[1]) {
+            // inner join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> 
resultKeyValue = skewIndexedJavaPairRDD.
+                    join(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        } else if (innerFlags[0] && !innerFlags[1]) {
+            // left outer join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> 
resultKeyValue = skewIndexedJavaPairRDD.
+                    leftOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        } else if (!innerFlags[0] && innerFlags[1]) {
+            // right outer join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> 
resultKeyValue = skewIndexedJavaPairRDD.
+                    rightOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        } else {
+            // full outer join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, 
Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
+                    fullOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        }
+    }
+
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings("serial")
+public class SortConverter implements RDDConverter<Tuple, Tuple, POSort> {
+    private static final Log LOG = LogFactory.getLog(SortConverter.class);
+
+    private static final FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, 
Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSort 
sortOperator)
+            throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, 
sortOperator);
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
+                SparkUtil.<Tuple, Object> getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> r = new JavaPairRDD<Tuple, Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+
+        JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
+                sortOperator.getMComparator(), true, parallelism);
+        JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
+
+        return mapped.rdd();
+    }
+
+    private static class ToValueFunction implements
+            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, 
Serializable {
+
+        private class Tuple2TransformIterable implements Iterable<Tuple> {
+
+            Iterator<Tuple2<Tuple, Object>> in;
+
+            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+                in = input;
+            }
+
+            public Iterator<Tuple> iterator() {
+                return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) 
{
+                    @Override
+                    protected Tuple transform(Tuple2<Tuple, Object> next) {
+                        return next._1();
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+            return new Tuple2TransformIterable(input);
+        }
+    }
+
+    private static class ToKeyValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sort ToKeyValueFunction in " + t);
+            }
+            Tuple key = t;
+            Object value = null;
+            // (key, value)
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sort ToKeyValueFunction out " + out);
+            }
+            return out;
+        }
+    }
+
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.rdd.RDD;
+  /*
+   sort the sample data and convert the sample data to the format 
(all,{(sampleEle1),(sampleEle2),...})
+
+   */
+@SuppressWarnings("serial")
+public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple, 
POSampleSortSpark> {
+    private static final Log LOG = 
LogFactory.getLog(SparkSampleSortConverter.class);
+    private static TupleFactory tf = TupleFactory.getInstance();
+    private static BagFactory bf = DefaultBagFactory.getInstance();
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSampleSortSpark 
sortOperator)
+            throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
+                SparkUtil.<Tuple, Object> getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> r = new JavaPairRDD<Tuple, Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+         //sort sample data
+        JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
+         //convert every element in sample data from element to (all, element) 
format
+        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new 
AggregateFunction());
+        //use groupByKey to aggregate all values( the format will be 
((all),{(sampleEle1),(sampleEle2),...} )
+        JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new 
ToValueFunction());
+        return  groupByKey.rdd();
+    }
+
+
+    private static class MergeFunction implements 
org.apache.spark.api.java.function.Function2<Tuple, Tuple, Tuple>
+            , Serializable {
+
+        @Override
+        public Tuple call(Tuple v1, Tuple v2) {
+                Tuple res = tf.newTuple();
+                res.append(v1);
+                res.append(v2);
+                LOG.info("MergeFunction out:"+res);
+                return res;
+        }
+    }
+
+    // input: Tuple2<Tuple,Object>
+    // output: Tuple2("all", Tuple)
+    private static class AggregateFunction implements
+            PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>, 
String,Tuple>, Serializable {
+
+        private class Tuple2TransformIterable implements 
Iterable<Tuple2<String,Tuple>> {
+
+            Iterator<Tuple2<Tuple, Object>> in;
+
+            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+                in = input;
+            }
+
+            public Iterator<Tuple2<String,Tuple>> iterator() {
+                return new IteratorTransform<Tuple2<Tuple, Object>, 
Tuple2<String,Tuple>>(in) {
+                    @Override
+                    protected Tuple2<String,Tuple> transform(Tuple2<Tuple, 
Object> next) {
+                        LOG.info("AggregateFunction in:"+ next._1()) ;
+                        return new Tuple2<String,Tuple>("all",next._1());
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, 
Object>> input) throws Exception {
+            return new Tuple2TransformIterable(input);
+        }
+
+    }
+
+    private static class ToValueFunction implements Function<Tuple2<String, 
Iterable<Tuple>>, Tuple> {
+        @Override
+        public Tuple call(Tuple2<String, Iterable<Tuple>> next) throws 
Exception {
+            Tuple res = tf.newTuple();
+            res.append(next._1());
+            Iterator<Tuple> iter = next._2().iterator();
+            DataBag bag = bf.newDefaultBag();
+            while(iter.hasNext()) {
+                bag.add(iter.next());
+            }
+            res.append(bag);
+            LOG.info("ToValueFunction1 out:" + res);
+            return res;
+        }
+    }
+
+    private static class ToKeyValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Sort ToKeyValueFunction in " + t);
+            }
+            Tuple key = t;
+            Object value = null;
+            // (key, value)
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Sort ToKeyValueFunction out " + out);
+            }
+            return out;
+        }
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.List;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.RDD;
+
+public class SplitConverter implements RDDConverter<Tuple, Tuple, POSplit> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSplit poSplit)
+            throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poSplit, 1);
+        return predecessors.get(0);
+    }
+
+}


Reply via email to