Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +import scala.runtime.AbstractFunction1; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings({ "serial" }) +public class PackageConverter implements RDDConverter<Tuple, Tuple, POPackage> { + private static final Log LOG = LogFactory.getLog(PackageConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POPackage physicalOperator) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); + RDD<Tuple> rdd = predecessors.get(0); + // package will generate the group from the result of the local + // rearrange + return rdd.map(new PackageFunction(physicalOperator), + SparkUtil.getManifest(Tuple.class)); + } + + private static class PackageFunction extends + AbstractFunction1<Tuple, Tuple> implements Serializable { + + private final POPackage physicalOperator; + + public PackageFunction(POPackage physicalOperator) { + this.physicalOperator = physicalOperator; + } + + @Override + public Tuple apply(final Tuple t) { + // (key, Seq<Tuple>:{(index, key, value without key)}) + if (LOG.isDebugEnabled()) + LOG.debug("PackageFunction in " + t); + Result result; + try { + PigNullableWritable key = new PigNullableWritable() { 
+ + public Object getValueAsPigType() { + try { + Object keyTuple = t.get(0); + return keyTuple; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + }; + final Iterator<Tuple> bagIterator = (Iterator<Tuple>) t.get(1); + Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() { + public boolean hasNext() { + return bagIterator.hasNext(); + } + + public NullableTuple next() { + try { + // we want the value and index only + Tuple next = bagIterator.next(); + NullableTuple nullableTuple = new NullableTuple( + (Tuple) next.get(1)); + nullableTuple.setIndex(((Number) next.get(0)) + .byteValue()); + if (LOG.isDebugEnabled()) + LOG.debug("Setting index to " + next.get(0) + + " for tuple " + (Tuple)next.get(1)); + return nullableTuple; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + physicalOperator.setInputs(null); + physicalOperator.attachInput(key, iterator); + result = physicalOperator.getNextTuple(); + } catch (ExecException e) { + throw new RuntimeException( + "Couldn't do Package on tuple: " + t, e); + } + + if (result == null) { + throw new RuntimeException( + "Null response found for Package on tuple: " + t); + } + Tuple out; + switch (result.returnStatus) { + case POStatus.STATUS_OK: + // (key, {(value)...}) + if (LOG.isDebugEnabled()) + LOG.debug("PackageFunction out " + result.result); + out = (Tuple) result.result; + break; + case POStatus.STATUS_NULL: + out = null; + break; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + physicalOperator + " : " + result + " " + + result.returnStatus); + } + if (LOG.isDebugEnabled()) + LOG.debug("PackageFunction out " + out); + return out; + } + } + +}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; + +/** + * Utility class that handles secondary key for sorting. 
+ */ +class PigSecondaryKeyComparatorSpark implements Comparator, Serializable { + private static final Log LOG = LogFactory.getLog(PigSecondaryKeyComparatorSpark.class); + private static final long serialVersionUID = 1L; + + private static boolean[] secondarySortOrder; + + public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) { + secondarySortOrder = pSecondarySortOrder; + } + + //IndexedKeyPartitioner will put the tuple with same mainKey together, in PigSecondaryKeyComparatorSpark#compare + // (Object o1, Object o2) + //we only compare the secondaryKey + @Override + public int compare(Object o1, Object o2) { + Tuple t1 = (Tuple) o1; + Tuple t2 = (Tuple) o2; + try { + if ((t1.size() < 3) || (t2.size() < 3)) { + throw new RuntimeException("tuple size must bigger than 3, tuple[0] stands for index, tuple[1]" + + "stands for the compound key, tuple[3] stands for the value"); + } + Tuple compoundKey1 = (Tuple) t1.get(1); + Tuple compoundKey2 = (Tuple) t2.get(1); + if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) { + throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," + + "compoundKey[1] stands for secondaryKey"); + } + Object secondaryKey1 = compoundKey1.get(1); + Object secondaryKey2 = compoundKey2.get(1); + int res = compareKeys(secondaryKey1, secondaryKey2, secondarySortOrder); + if (LOG.isDebugEnabled()) { + LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res); + } + return res; + } catch (ExecException e) { + throw new RuntimeException("Fail to get the compoundKey", e); + } + } + + //compare the mainKey and secondaryKey + public int compareCompoundKey(Tuple compoundKey1, Tuple compoundKey2){ + try { + if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) { + throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," + + "compoundKey[1] stands for secondaryKey"); + } + Object mainKey1 = compoundKey1.get(0); + Object mainKey2 = compoundKey2.get(0); 
+ int res = compareKeys(mainKey1,mainKey2, null); + if ( res !=0 ){ + return res; + } else { + Object secondaryKey1 = compoundKey1.get(1); + Object secondaryKey2 = compoundKey2.get(1); + res = compareKeys(secondaryKey1, secondaryKey2, secondarySortOrder); + if (LOG.isDebugEnabled()) { + LOG.debug("compoundKey1:" + compoundKey1 + "compoundKey2:" + compoundKey2 + " res:" + res); + } + return res; + } + } catch (ExecException e) { + throw new RuntimeException("Fail to get the compoundKey", e); + } + } + + private int compareKeys(Object o1, Object o2, boolean[] asc) { + int rc = 0; + if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) { + // objects are Tuples, we may need to apply sort order inside them + Tuple t1 = (Tuple) o1; + Tuple t2 = (Tuple) o2; + int sz1 = t1.size(); + int sz2 = t2.size(); + if (sz2 < sz1) { + return 1; + } else if (sz2 > sz1) { + return -1; + } else { + for (int i = 0; i < sz1; i++) { + try { + rc = DataType.compare(t1.get(i), t2.get(i)); + if (rc != 0 && asc != null && asc.length > 1 && !asc[i]) + rc *= -1; + if ((t1.get(i) == null) || (t2.get(i) == null)) { + if (LOG.isDebugEnabled()) { + LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i)); + } + } + if (rc != 0) break; + } catch (ExecException e) { + throw new RuntimeException("Unable to compare tuples", e); + } + } + } + } else { + // objects are NOT Tuples, delegate to DataType.compare() + rc = DataType.compare(o1, o2); + } + // apply sort order for keys that are not tuples or for whole tuples + if (asc != null && asc.length == 1 && !asc[0]) + rc *= -1; + return rc; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1796639&view=auto ============================================================================== --- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class PoissonSampleConverter implements RDDConverter<Tuple, Tuple, POPoissonSampleSpark> { + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POPoissonSampleSpark po) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, po, 1); + RDD<Tuple> rdd = predecessors.get(0); + PoissionSampleFunction poissionSampleFunction = new PoissionSampleFunction(po); + return rdd.toJavaRDD().mapPartitions(poissionSampleFunction, false).rdd(); + } + + private static class PoissionSampleFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> { + + private final POPoissonSampleSpark po; + + public PoissionSampleFunction(POPoissonSampleSpark po) { + this.po = po; + } + + @Override + public Iterable<Tuple> call(final Iterator<Tuple> tuples) { + + return new Iterable<Tuple>() { + + public Iterator<Tuple> iterator() { + return new OutputConsumerIterator(tuples) { + + @Override + protected void attach(Tuple tuple) { + po.setInputs(null); + po.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + return po.getNextTuple(); + } + + @Override + protected void endOfInput() { + po.setEndOfInput(true); + } + }; + } + }; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.util.List; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; + +import org.apache.spark.rdd.RDD; + +/** + * Given an RDD and a PhysicalOperater, and implementation of this class can + * convert the RDD to another RDD. 
+ */ +public interface RDDConverter<IN, OUT, T extends PhysicalOperator> { + RDD<OUT> convert(List<RDD<IN>> rdd, T physicalOperator) throws IOException; +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import scala.Tuple2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; + +public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> { + + private static final Log LOG = LogFactory.getLog(RankConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank) + throws IOException { + int parallelism = SparkPigContext.get().getParallelism(predecessors, poRank); + SparkUtil.assertPredecessorSize(predecessors, poRank, 1); + RDD<Tuple> rdd = predecessors.get(0); + JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD() + .mapToPair(new ToPairRdd()); + JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd + .groupByKey(parallelism); + JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex + .mapToPair(new IndexCounters()); + JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex + .sortByKey(true, parallelism); + Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap(); + JavaRDD<Tuple> finalRdd = rdd.toJavaRDD() + .map(new RankFunction(new HashMap<Integer, Long>(counts))); + return finalRdd.rdd(); + } + + @SuppressWarnings("serial") + 
private static class ToPairRdd implements + PairFunction<Tuple, Integer, Long>, Serializable { + + @Override + public Tuple2<Integer, Long> call(Tuple t) { + try { + Integer key = (Integer) t.get(0); + Long value = (Long) t.get(1); + return new Tuple2<Integer, Long>(key, value); + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + @SuppressWarnings("serial") + private static class IndexCounters implements + PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, + Serializable { + @Override + public Tuple2<Integer, Long> call(Tuple2<Integer, + Iterable<Long>> input) { + long lastVaue = 0L; + + for (Long t : input._2()) { + lastVaue = (t > lastVaue) ? t : lastVaue; + } + + return new Tuple2<Integer, Long>(input._1(), lastVaue); + } + } + + @SuppressWarnings("serial") + private static class RankFunction implements Function<Tuple, Tuple>, + Serializable { + private final HashMap<Integer, Long> counts; + + private RankFunction(HashMap<Integer, Long> counts) { + this.counts = counts; + } + + @Override + public Tuple call(Tuple input) throws Exception { + Tuple output = TupleFactory.getInstance() + .newTuple(input.getAll().size() - 2); + + for (int i = 1; i < input.getAll().size() - 2; i ++) { + output.set(i, input.get(i+2)); + } + + long offset = calculateOffset((Integer) input.get(0)); + output.set(0, offset + (Long)input.get(2)); + return output; + } + + private long calculateOffset(Integer index) { + long offset = 0; + + if (index > 0) { + for (int i = 0; i < index; i++) { + if (counts.containsKey(i)) { + offset += counts.get(i); + } + } + } + return offset; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1796639&view=auto ============================================================================== --- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import scala.Tuple2; +import scala.runtime.AbstractFunction1; +import scala.runtime.AbstractFunction2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DefaultBagFactory; +import org.apache.pig.data.DefaultTuple; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.rdd.PairRDDFunctions; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings({"serial"}) +public class ReduceByConverter implements RDDConverter<Tuple, Tuple, POReduceBySpark> { + private static final Log LOG = LogFactory.getLog(ReduceByConverter.class); + + private static final TupleFactory tf = TupleFactory.getInstance(); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, op, 1); + int parallelism = SparkPigContext.get().getParallelism(predecessors, op); + + RDD<Tuple> rdd = predecessors.get(0); + RDD<Tuple2<IndexedKey, Tuple>> rddPair + = rdd.map(new 
LocalRearrangeFunction(op.getLROp(), op.isUseSecondaryKey(), op.getSecondarySortOrder()) + , SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()); + if (op.isUseSecondaryKey()) { + return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPKGOp()); + } else { + PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions + = new PairRDDFunctions<>(rddPair, + SparkUtil.getManifest(IndexedKey.class), + SparkUtil.getManifest(Tuple.class), null); + + RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey( + SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism), + new MergeValuesFunction(op)); + LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism); + + return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class)); + } + } + + private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort( + RDD<Tuple> rdd, POReduceBySpark op, int parallelism) { + + RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(), + SparkUtil.<Tuple, Object>getTuple2Manifest()); + + JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair, + SparkUtil.getManifest(Tuple.class), + SparkUtil.getManifest(Object.class)); + + //first sort the tuple by secondary key if enable useSecondaryKey sort + JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions( + new HashPartitioner(parallelism), + new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder())); + JavaRDD<Tuple> jrdd = sorted.keys(); + JavaRDD<Tuple2<IndexedKey, Tuple>> jrddPair = jrdd.map(new ToKeyValueFunction(op)); + return jrddPair; + } + + private static class ToKeyNullValueFunction extends + AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements + Serializable { + + @Override + public Tuple2<Tuple, Object> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyNullValueFunction in " + t); + } + + Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null); + if 
(LOG.isDebugEnabled()) { + LOG.debug("ToKeyNullValueFunction out " + out); + } + + return out; + } + } + + /** + * Converts incoming locally rearranged tuple, which is of the form + * (index, key, value) into Tuple2<key, Tuple(key, value)> + */ + private static class ToKeyValueFunction implements + Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable { + + private POReduceBySpark poReduce = null; + + public ToKeyValueFunction(POReduceBySpark poReduce) { + this.poReduce = poReduce; + } + + @Override + public Tuple2<IndexedKey, Tuple> call(Tuple t) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyValueFunction in " + t); + } + + Object key; + if ((poReduce != null) && (poReduce.isUseSecondaryKey())) { + key = ((Tuple) t.get(1)).get(0); + } else { + key = t.get(1); + } + + Tuple tupleWithKey = tf.newTuple(); + tupleWithKey.append(key); + tupleWithKey.append(t.get(2)); + + Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte) t.get(0), key), tupleWithKey); + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyValueFunction out " + out); + } + + return out; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Given two input tuples, this function outputs the resultant tuple. + * Additionally, it packages the input tuples to ensure the Algebraic Functions can work on them. + */ + private static final class MergeValuesFunction extends AbstractFunction2<Tuple, Tuple, Tuple> + implements Serializable { + private final POReduceBySpark poReduce; + + public MergeValuesFunction(POReduceBySpark poReduce) { + this.poReduce = poReduce; + } + + @Override + public Tuple apply(Tuple v1, Tuple v2) { + LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2); + Tuple result = tf.newTuple(2); + DataBag bag = DefaultBagFactory.getInstance().newDefaultBag(); + Tuple t = new DefaultTuple(); + try { + // Package the input tuples so they can be processed by Algebraic functions. 
+ Object key = v1.get(0); + if (key == null) { + key = ""; + } else { + result.set(0, key); + } + bag.add((Tuple) v1.get(1)); + bag.add((Tuple) v2.get(1)); + t.append(key); + t.append(bag); + + poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true}); + Tuple packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result; + + // Perform the operation + LOG.debug("MergeValuesFunction packagedTuple : " + t); + poReduce.attachInput(packagedTuple); + Result r = poReduce.getNext(poReduce.getResultType()); + + // Ensure output is consistent with the output of KeyValueFunction + // If we return r.result, the result will be something like this: + // (ABC,(2),(3)) - A tuple with key followed by values. + // But, we want the result to look like this: + // (ABC,((2),(3))) - A tuple with key and a value tuple (containing values). + // Hence, the construction of a new value tuple + + Tuple valueTuple = tf.newTuple(); + for (Object o : ((Tuple) r.result).getAll()) { + if (!o.equals(key)) { + valueTuple.append(o); + } + } + result.set(1,valueTuple); + LOG.debug("MergeValuesFunction out : " + result); + return result; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + /** + * This function transforms the Tuple to ensure it is packaged as per requirements of the Operator's packager. 
+ */ + private static final class ToTupleFunction extends AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple> + implements Serializable { + + private final POReduceBySpark poReduce; + + public ToTupleFunction(POReduceBySpark poReduce) { + this.poReduce = poReduce; + } + + @Override + public Tuple apply(Tuple2<IndexedKey, Tuple> v1) { + LOG.debug("ToTupleFunction in : " + v1); + DataBag bag = DefaultBagFactory.getInstance().newDefaultBag(); + Tuple t = new DefaultTuple(); + Tuple packagedTuple = null; + try { + Object key = v1._2().get(0); + bag.add((Tuple) v1._2().get(1)); + t.append(key); + t.append(bag); + poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true}); + packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result; + } catch (ExecException e) { + throw new RuntimeException(e); + } + LOG.debug("ToTupleFunction out : " + packagedTuple); + return packagedTuple; + } + } + + /** + * Converts incoming locally rearranged tuple, which is of the form + * (index, key, value) into Tuple2<key, Tuple(key, value)> + */ + private static class LocalRearrangeFunction extends + AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable { + + private final POLocalRearrange lra; + + private boolean useSecondaryKey; + private boolean[] secondarySortOrder; + + public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) { + if( useSecondaryKey ) { + this.useSecondaryKey = useSecondaryKey; + this.secondarySortOrder = secondarySortOrder; + } + this.lra = lra; + } + + @Override + public Tuple2<IndexedKey, Tuple> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction in " + t); + } + Result result; + try { + lra.setInputs(null); + lra.attachInput(t); + result = lra.getNextTuple(); + + if (result == null) { + throw new RuntimeException( + "Null response found for LocalRearange on tuple: " + + t); + } + + switch (result.returnStatus) { 
+ case POStatus.STATUS_OK: + // (index, key, Tuple(key, value)) + Tuple resultTuple = (Tuple) result.result; + Object key = resultTuple.get(1); + IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key); + if( useSecondaryKey) { + indexedKey.setUseSecondaryKey(useSecondaryKey); + indexedKey.setSecondarySortOrder(secondarySortOrder); + } + Tuple outValue = TupleFactory.getInstance().newTuple(); + outValue.append(key); + outValue.append(resultTuple.get(2)); + Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey, + outValue); + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction out " + out); + } + return out; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + lra + " : " + result); + } + } catch (ExecException e) { + throw new RuntimeException( + "Couldn't do LocalRearange on tuple: " + t, e); + } + } + + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Mon May 29 15:00:39 2017 @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Objects; + +import scala.Tuple2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +/** + * Provide utility functions which is used by ReducedByConverter and JoinGroupSparkConverter. 
+ */ +public class SecondaryKeySortUtil { + private static final Log LOG = LogFactory + .getLog(SecondaryKeySortUtil.class); + + public static RDD<Tuple> handleSecondarySort( + RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) { + JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class), + SparkUtil.getManifest(Tuple.class)); + + int partitionNums = pairRDD.partitions().size(); + //repartition to group tuples with same indexedkey to same partition + JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions( + new IndexedKeyPartitioner(partitionNums)); + //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...)) + return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd(); + } + + //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...)) + //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result + private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>, + Serializable { + private POPackage pkgOp; + + public AccumulateByKey(POPackage pkgOp) { + this.pkgOp = pkgOp; + } + + @Override + public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception { + return new Iterable<Tuple>() { + Object curKey = null; + ArrayList curValues = new ArrayList(); + boolean initialized = false; + + @Override + public Iterator<Tuple> iterator() { + return new Iterator<Tuple>() { + + @Override + public boolean hasNext() { + return it.hasNext() || curKey != null; + } + + @Override + public Tuple next() { + while (it.hasNext()) { + Tuple2<IndexedKey, Tuple> t = it.next(); + //key changes, restruct the last tuple by curKey, curValues and return + Object tMainKey = null; + try { + tMainKey = ((Tuple) (t._1()).getKey()).get(0); + + //If the key has changed and we've seen at least 1 already + if (initialized && + ((curKey == null && tMainKey != null) || + (curKey != 
null && !curKey.equals(tMainKey)))){ + Tuple result = restructTuple(curKey, new ArrayList(curValues)); + curValues.clear(); + curKey = tMainKey; + curValues.add(t._2()); + return result; + } + curKey = tMainKey; + //if key does not change, just append the value to the same key + curValues.add(t._2()); + initialized = true; + + } catch (ExecException e) { + throw new RuntimeException("AccumulateByKey throw exception: ", e); + } + } + if (!initialized) { + throw new RuntimeException("No tuples seen"); + } + + //if we get here, this should be the last record + Tuple res = restructTuple(curKey, curValues); + curKey = null; + return res; + } + + + @Override + public void remove() { + // Not implemented. + // throw Unsupported Method Invocation Exception. + throw new UnsupportedOperationException(); + } + }; + } + }; + } + + private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) { + try { + Tuple retVal = null; + PigNullableWritable retKey = new PigNullableWritable() { + + public Object getValueAsPigType() { + return curKey; + } + }; + + //Here restruct a tupleIterator, later POPackage#tupIter will use it. 
+ final Iterator<Tuple> tupleItearator = curValues.iterator(); + Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() { + public boolean hasNext() { + return tupleItearator.hasNext(); + } + + public NullableTuple next() { + Tuple t = tupleItearator.next(); + return new NullableTuple(t); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + pkgOp.setInputs(null); + pkgOp.attachInput(retKey, iterator); + Result res = pkgOp.getNextTuple(); + if (res.returnStatus == POStatus.STATUS_OK) { + retVal = (Tuple) res.result; + } + if (LOG.isDebugEnabled()) { + LOG.debug("AccumulateByKey out: " + retVal); + } + return retVal; + } catch (ExecException e) { + throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e); + } + } + } + + //Group tuples with same IndexKey into same partition + private static class IndexedKeyPartitioner extends Partitioner { + private int partition; + + public IndexedKeyPartitioner(int partition) { + this.partition = partition; + } + + @Override + public int getPartition(Object obj) { + IndexedKey indexedKey = (IndexedKey) obj; + Tuple key = (Tuple) indexedKey.getKey(); + + int hashCode = 0; + try { + hashCode = Objects.hashCode(key.get(0)); + } catch (ExecException e) { + throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e); + } + return Math.abs(hashCode) % partition; + } + + @Override + public int numPartitions() { + return partition; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (added) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.data.DataBag; +import org.apache.pig.impl.builtin.PartitionSkewedKeys; +import org.apache.pig.impl.util.Pair; +import org.apache.spark.Partitioner; +import org.apache.spark.broadcast.Broadcast; +import scala.Tuple2; +import scala.runtime.AbstractFunction1; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.util.MultiMap; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +public class SkewedJoinConverter implements + RDDConverter<Tuple, Tuple, POSkewedJoin>, Serializable { + + private static Log log = LogFactory.getLog(SkewedJoinConverter.class); + + private POLocalRearrange[] LRs; + private POSkewedJoin poSkewedJoin; + + private String skewedJoinPartitionFile; + + public void setSkewedJoinPartitionFile(String partitionFile) { + skewedJoinPartitionFile = partitionFile; + } + + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POSkewedJoin poSkewedJoin) throws IOException { + + SparkUtil.assertPredecessorSize(predecessors, poSkewedJoin, 2); + LRs = new POLocalRearrange[2]; + this.poSkewedJoin = poSkewedJoin; + + createJoinPlans(poSkewedJoin.getJoinPlans()); + + // extract the two RDDs + RDD<Tuple> rdd1 = predecessors.get(0); + RDD<Tuple> rdd2 = predecessors.get(1); + + Broadcast<List<Tuple>> keyDist = SparkPigContext.get().getBroadcastedVars().get(skewedJoinPartitionFile); + + // if no keyDist, we need defaultParallelism + Integer defaultParallelism = SparkPigContext.get().getParallelism(predecessors, poSkewedJoin); + + // with partition id + SkewPartitionIndexKeyFunction skewFun 
= new SkewPartitionIndexKeyFunction(this, keyDist, defaultParallelism); + RDD<Tuple2<PartitionIndexedKey, Tuple>> skewIdxKeyRDD = rdd1.map(skewFun, + SparkUtil.<PartitionIndexedKey, Tuple>getTuple2Manifest()); + + // Tuple2 RDD to Pair RDD + JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>( + skewIdxKeyRDD, SparkUtil.getManifest(PartitionIndexedKey.class), + SparkUtil.getManifest(Tuple.class)); + + // with partition id + StreamPartitionIndexKeyFunction streamFun = new StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism); + JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(streamFun); + + // Tuple2 RDD to Pair RDD + JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>( + streamIdxKeyJavaRDD.rdd(), SparkUtil.getManifest(PartitionIndexedKey.class), + SparkUtil.getManifest(Tuple.class)); + + JavaRDD<Tuple> result = doJoin(skewIndexedJavaPairRDD, + streamIndexedJavaPairRDD, + buildPartitioner(keyDist, defaultParallelism), + keyDist); + + // return type is RDD<Tuple>, so take it from JavaRDD<Tuple> + return result.rdd(); + } + + private void createJoinPlans(MultiMap<PhysicalOperator, PhysicalPlan> inpPlans) throws PlanException { + + int i = -1; + for (PhysicalOperator inpPhyOp : inpPlans.keySet()) { + ++i; + POLocalRearrange lr = new POLocalRearrange(genKey()); + try { + lr.setIndex(i); + } catch (ExecException e) { + throw new PlanException(e.getMessage(), e.getErrorCode(), e.getErrorSource(), e); + } + lr.setResultType(DataType.TUPLE); + lr.setKeyType(DataType.TUPLE);//keyTypes.get(i).size() > 1 ? 
DataType.TUPLE : keyTypes.get(i).get(0)); + lr.setPlans(inpPlans.get(inpPhyOp)); + LRs[i] = lr; + } + } + + private OperatorKey genKey() { + return new OperatorKey(poSkewedJoin.getOperatorKey().scope, NodeIdGenerator.getGenerator().getNextNodeId(poSkewedJoin.getOperatorKey().scope)); + } + + /** + * @param <L> be generic because it can be Optional<Tuple> or Tuple + * @param <R> be generic because it can be Optional<Tuple> or Tuple + */ + private static class ToValueFunction<L, R> implements + FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable { + + private boolean[] innerFlags; + private int[] schemaSize; + + private final Broadcast<List<Tuple>> keyDist; + + transient private boolean initialized = false; + transient protected Map<Tuple, Pair<Integer, Integer>> reducerMap; + + public ToValueFunction(boolean[] innerFlags, int[] schemaSize, Broadcast<List<Tuple>> keyDist) { + this.innerFlags = innerFlags; + this.schemaSize = schemaSize; + this.keyDist = keyDist; + } + + private class Tuple2TransformIterable implements Iterable<Tuple> { + + Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> in; + + Tuple2TransformIterable( + Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) { + in = input; + } + + public Iterator<Tuple> iterator() { + return new IteratorTransform<Tuple2<PartitionIndexedKey, Tuple2<L, R>>, Tuple>( + in) { + @Override + protected Tuple transform( + Tuple2<PartitionIndexedKey, Tuple2<L, R>> next) { + try { + + L left = next._2._1; + R right = next._2._2; + + TupleFactory tf = TupleFactory.getInstance(); + Tuple result = tf.newTuple(); + + Tuple leftTuple = tf.newTuple(); + if (!innerFlags[0]) { + // left should be Optional<Tuple> + Optional<Tuple> leftOption = (Optional<Tuple>) left; + if (!leftOption.isPresent()) { + // Add an empty left record for RIGHT OUTER JOIN. 
+ // Notice: if it is a skewed, only join the first reduce key + if (isFirstReduceKey(next._1)) { + for (int i = 0; i < schemaSize[0]; i++) { + leftTuple.append(null); + } + } else { + return this.next(); + } + } else { + leftTuple = leftOption.get(); + } + } else { + leftTuple = (Tuple) left; + } + for (int i = 0; i < leftTuple.size(); i++) { + result.append(leftTuple.get(i)); + } + + Tuple rightTuple = tf.newTuple(); + if (!innerFlags[1]) { + // right should be Optional<Tuple> + Optional<Tuple> rightOption = (Optional<Tuple>) right; + if (!rightOption.isPresent()) { + for (int i = 0; i < schemaSize[1]; i++) { + rightTuple.append(null); + } + } else { + rightTuple = rightOption.get(); + } + } else { + rightTuple = (Tuple) right; + } + for (int i = 0; i < rightTuple.size(); i++) { + result.append(rightTuple.get(i)); + } + + if (log.isDebugEnabled()) { + log.debug("MJC: Result = " + result.toDelimitedString(" ")); + } + + return result; + } catch (Exception e) { + log.warn(e); + } + return null; + } + }; + } + } + + @Override + public Iterable<Tuple> call( + Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) { + return new Tuple2TransformIterable(input); + } + + private boolean isFirstReduceKey(PartitionIndexedKey pKey) { + // non-skewed key + if (pKey.getPartitionId() == -1) { + return true; + } + + if (!initialized) { + Integer[] reducers = new Integer[1]; + reducerMap = loadKeyDistribution(keyDist, reducers); + initialized = true; + } + + Pair<Integer, Integer> indexes = reducerMap.get(pKey.getKey()); + if (indexes != null && pKey.getPartitionId() != indexes.first) { + // return false only when the key is skewed + // and it is not the first reduce key. + return false; + } + + return true; + } + } + + /** + * Utility function. + * 1. Get parallelism + * 2. 
build a key distribution map from the broadcasted key distribution file + * + * @param keyDist + * @param totalReducers + * @return + */ + private static Map<Tuple, Pair<Integer, Integer>> loadKeyDistribution(Broadcast<List<Tuple>> keyDist, + Integer[] totalReducers) { + Map<Tuple, Pair<Integer, Integer>> reducerMap = new HashMap<>(); + totalReducers[0] = -1; // set a default value + + if (keyDist == null || keyDist.value() == null || keyDist.value().size() == 0) { + // this could happen if sampling is empty + log.warn("Empty dist file: "); + return reducerMap; + } + + try { + final TupleFactory tf = TupleFactory.getInstance(); + + Tuple t = keyDist.value().get(0); + + Map<String, Object> distMap = (Map<String, Object>) t.get(0); + DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST); + + totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS)); + + Iterator<Tuple> it = partitionList.iterator(); + while (it.hasNext()) { + Tuple idxTuple = it.next(); + Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1); + Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2); + // Used to replace the maxIndex with the number of reducers + if (maxIndex < minIndex) { + maxIndex = totalReducers[0] + maxIndex; + } + + // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store + // it in the reducer map + Tuple keyTuple = tf.newTuple(); + for (int i = 0; i < idxTuple.size() - 2; i++) { + keyTuple.append(idxTuple.get(i)); + } + + // number of reducers + Integer cnt = maxIndex - minIndex; + reducerMap.put(keyTuple, new Pair(minIndex, cnt)); + } + + } catch (ExecException e) { + log.warn(e.getMessage()); + } + + return reducerMap; + } + + private static class PartitionIndexedKey extends IndexedKey { + // for user defined partitioner + int partitionId; + + public PartitionIndexedKey(byte index, Object key) { + super(index, key); + partitionId = -1; + } + + public PartitionIndexedKey(byte 
index, Object key, int pid) { + super(index, key); + partitionId = pid; + } + + public int getPartitionId() { + return partitionId; + } + + private void setPartitionId(int pid) { + partitionId = pid; + } + + @Override + public String toString() { + return "PartitionIndexedKey{" + + "index=" + getIndex() + + ", partitionId=" + getPartitionId() + + ", key=" + getKey() + + '}'; + } + } + + /** + * append a Partition id to the records from skewed table. + * so that the SkewedJoinPartitioner can send skewed records to different reducer + * <p> + * see: https://wiki.apache.org/pig/PigSkewedJoinSpec + */ + private static class SkewPartitionIndexKeyFunction extends + AbstractFunction1<Tuple, Tuple2<PartitionIndexedKey, Tuple>> implements + Serializable { + + private final SkewedJoinConverter poSkewedJoin; + + private final Broadcast<List<Tuple>> keyDist; + private final Integer defaultParallelism; + + transient private boolean initialized = false; + transient protected Map<Tuple, Pair<Integer, Integer>> reducerMap; + transient private Integer parallelism = -1; + transient private Map<Tuple, Integer> currentIndexMap; + + public SkewPartitionIndexKeyFunction(SkewedJoinConverter poSkewedJoin, + Broadcast<List<Tuple>> keyDist, + Integer defaultParallelism) { + this.poSkewedJoin = poSkewedJoin; + this.keyDist = keyDist; + this.defaultParallelism = defaultParallelism; + } + + @Override + public Tuple2<PartitionIndexedKey, Tuple> apply(Tuple tuple) { + // attach tuple to LocalRearrange + poSkewedJoin.LRs[0].attachInput(tuple); + + try { + Result lrOut = poSkewedJoin.LRs[0].getNextTuple(); + + // If tuple is (AA, 5) and key index is $1, then it lrOut is 0 5 + // (AA), so get(1) returns key + Byte index = (Byte) ((Tuple) lrOut.result).get(0); + Object key = ((Tuple) lrOut.result).get(1); + + Tuple keyTuple = (Tuple) key; + int partitionId = getPartitionId(keyTuple); + PartitionIndexedKey pIndexKey = new PartitionIndexedKey(index, keyTuple, partitionId); + + // make a (key, value) 
pair + Tuple2<PartitionIndexedKey, Tuple> tuple_KeyValue = new Tuple2<PartitionIndexedKey, Tuple>( + pIndexKey, + tuple); + + return tuple_KeyValue; + } catch (Exception e) { + System.out.print(e); + return null; + } + } + + private Integer getPartitionId(Tuple keyTuple) { + if (!initialized) { + Integer[] reducers = new Integer[1]; + reducerMap = loadKeyDistribution(keyDist, reducers); + parallelism = reducers[0]; + + if (parallelism <= 0) { + parallelism = defaultParallelism; + } + + currentIndexMap = Maps.newHashMap(); + + initialized = true; + } + + // for partition table, compute the index based on the sampler output + Pair<Integer, Integer> indexes; + Integer curIndex = -1; + + indexes = reducerMap.get(keyTuple); + + // if the reducerMap does not contain the key return -1 so that the + // partitioner will do the default hash based partitioning + if (indexes == null) { + return -1; + } + + if (currentIndexMap.containsKey(keyTuple)) { + curIndex = currentIndexMap.get(keyTuple); + } + + if (curIndex >= (indexes.first + indexes.second) || curIndex == -1) { + curIndex = indexes.first; + } else { + curIndex++; + } + + // set it in the map + currentIndexMap.put(keyTuple, curIndex); + return (curIndex % parallelism); + } + + } + + /** + * POPartitionRearrange is not used in spark mode now, + * Here, use flatMap and CopyStreamWithPidFunction to copy the + * stream records to the multiple reducers + * <p> + * see: https://wiki.apache.org/pig/PigSkewedJoinSpec + */ + private static class StreamPartitionIndexKeyFunction implements FlatMapFunction<Tuple, Tuple2<PartitionIndexedKey, Tuple>> { + + private SkewedJoinConverter poSkewedJoin; + private final Broadcast<List<Tuple>> keyDist; + private final Integer defaultParallelism; + + private transient boolean initialized = false; + protected transient Map<Tuple, Pair<Integer, Integer>> reducerMap; + private transient Integer parallelism; + + public StreamPartitionIndexKeyFunction(SkewedJoinConverter poSkewedJoin, + 
Broadcast<List<Tuple>> keyDist, + Integer defaultParallelism) { + this.poSkewedJoin = poSkewedJoin; + this.keyDist = keyDist; + this.defaultParallelism = defaultParallelism; + } + + public Iterable<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception { + if (!initialized) { + Integer[] reducers = new Integer[1]; + reducerMap = loadKeyDistribution(keyDist, reducers); + parallelism = reducers[0]; + if (parallelism <= 0) { + parallelism = defaultParallelism; + } + initialized = true; + } + + // streamed table + poSkewedJoin.LRs[1].attachInput(tuple); + Result lrOut = poSkewedJoin.LRs[1].getNextTuple(); + + Byte index = (Byte) ((Tuple) lrOut.result).get(0); + Tuple key = (Tuple) ((Tuple) lrOut.result).get(1); + + ArrayList<Tuple2<PartitionIndexedKey, Tuple>> l = new ArrayList(); + Pair<Integer, Integer> indexes = reducerMap.get(key); + + // For non skewed keys, we set the partition index to be -1 + // so that the partitioner will do the default hash based partitioning + if (indexes == null) { + indexes = new Pair<>(-1, 0); + } + + for (Integer reducerIdx = indexes.first, cnt = 0; cnt <= indexes.second; reducerIdx++, cnt++) { + if (reducerIdx >= parallelism) { + reducerIdx = 0; + } + + // set the partition index + int partitionId = reducerIdx.intValue(); + PartitionIndexedKey pIndexKey = new PartitionIndexedKey(index, key, partitionId); + + l.add(new Tuple2(pIndexKey, tuple)); + } + + return l; + } + } + + /** + * user defined spark partitioner for skewed join + */ + private static class SkewedJoinPartitioner extends Partitioner { + private int numPartitions; + + public SkewedJoinPartitioner(int parallelism) { + numPartitions = parallelism; + } + + @Override + public int numPartitions() { + return numPartitions; + } + + @Override + public int getPartition(Object IdxKey) { + if (IdxKey instanceof PartitionIndexedKey) { + int partitionId = ((PartitionIndexedKey) IdxKey).getPartitionId(); + if (partitionId >= 0) { + return partitionId; + } + } + + //else: 
by default using hashcode + Tuple key = (Tuple) ((PartitionIndexedKey) IdxKey).getKey(); + + + int code = key.hashCode() % numPartitions; + if (code >= 0) { + return code; + } else { + return code + numPartitions; + } + } + } + + /** + * use parallelism from keyDist or the default parallelism to + * create user defined partitioner + * + * @param keyDist + * @param defaultParallelism + * @return + */ + private SkewedJoinPartitioner buildPartitioner(Broadcast<List<Tuple>> keyDist, Integer defaultParallelism) { + Integer parallelism = -1; + Integer[] reducers = new Integer[1]; + loadKeyDistribution(keyDist, reducers); + parallelism = reducers[0]; + if (parallelism <= 0) { + parallelism = defaultParallelism; + } + + return new SkewedJoinPartitioner(parallelism); + } + + /** + * do all kinds of Join (inner, left outer, right outer, full outer) + * + * @param skewIndexedJavaPairRDD + * @param streamIndexedJavaPairRDD + * @param partitioner + * @return + */ + private JavaRDD<Tuple> doJoin( + JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD, + JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD, + SkewedJoinPartitioner partitioner, + Broadcast<List<Tuple>> keyDist) { + + boolean[] innerFlags = poSkewedJoin.getInnerFlags(); + int[] schemaSize = {0, 0}; + for (int i = 0; i < 2; i++) { + if (poSkewedJoin.getSchema(i) != null) { + schemaSize[i] = poSkewedJoin.getSchema(i).size(); + } + } + + ToValueFunction toValueFun = new ToValueFunction(innerFlags, schemaSize, keyDist); + + if (innerFlags[0] && innerFlags[1]) { + // inner join + JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD. + join(streamIndexedJavaPairRDD, partitioner); + + return resultKeyValue.mapPartitions(toValueFun); + } else if (innerFlags[0] && !innerFlags[1]) { + // left outer join + JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD. 
+ leftOuterJoin(streamIndexedJavaPairRDD, partitioner); + + return resultKeyValue.mapPartitions(toValueFun); + } else if (!innerFlags[0] && innerFlags[1]) { + // right outer join + JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> resultKeyValue = skewIndexedJavaPairRDD. + rightOuterJoin(streamIndexedJavaPairRDD, partitioner); + + return resultKeyValue.mapPartitions(toValueFun); + } else { + // full outer join + JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD. + fullOuterJoin(streamIndexedJavaPairRDD, partitioner); + + return resultKeyValue.mapPartitions(toValueFun); + } + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +import scala.Tuple2; +import scala.runtime.AbstractFunction1; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings("serial") +public class SortConverter implements RDDConverter<Tuple, Tuple, POSort> { + private static final Log LOG = LogFactory.getLog(SortConverter.class); + + private static final FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction(); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSort sortOperator) + throws IOException { + SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1); + RDD<Tuple> rdd = predecessors.get(0); + int parallelism = SparkPigContext.get().getParallelism(predecessors, sortOperator); + RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(), + SparkUtil.<Tuple, Object> getTuple2Manifest()); + + JavaPairRDD<Tuple, Object> r = new JavaPairRDD<Tuple, Object>(rddPair, + SparkUtil.getManifest(Tuple.class), + SparkUtil.getManifest(Object.class)); + + JavaPairRDD<Tuple, Object> sorted = r.sortByKey( + sortOperator.getMComparator(), true, parallelism); + JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION); + + return 
mapped.rdd(); + } + + private static class ToValueFunction implements + FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable { + + private class Tuple2TransformIterable implements Iterable<Tuple> { + + Iterator<Tuple2<Tuple, Object>> in; + + Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) { + in = input; + } + + public Iterator<Tuple> iterator() { + return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) { + @Override + protected Tuple transform(Tuple2<Tuple, Object> next) { + return next._1(); + } + }; + } + } + + @Override + public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) { + return new Tuple2TransformIterable(input); + } + } + + private static class ToKeyValueFunction extends + AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements + Serializable { + + @Override + public Tuple2<Tuple, Object> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sort ToKeyValueFunction in " + t); + } + Tuple key = t; + Object value = null; + // (key, value) + Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value); + if (LOG.isDebugEnabled()) { + LOG.debug("Sort ToKeyValueFunction out " + out); + } + return out; + } + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.rdd.RDD;
+/**
+ * Sorts the sample data and converts it to the single-tuple format
+ * (all,{(sampleEle1),(sampleEle2),...}).
+ */
+@SuppressWarnings("serial")
+public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple, POSampleSortSpark> {
+    private static final Log LOG = LogFactory.getLog(SparkSampleSortConverter.class);
+    private static final TupleFactory tf = TupleFactory.getInstance();
+    private static final BagFactory bf = DefaultBagFactory.getInstance();
+
+    /**
+     * Sorts the single predecessor RDD and gathers all of its tuples into one
+     * (all,{(sampleEle1),(sampleEle2),...}) tuple; the resulting RDD is empty
+     * when there are no samples. {@code predecessors} must hold exactly one RDD.
+     */
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSampleSortSpark sortOperator)
+            throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
+                SparkUtil.<Tuple, Object> getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> r = new JavaPairRDD<Tuple, Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+        // sort sample data
+        JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
+        // convert every element in sample data from element to (all, element) format
+        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new AggregateFunction());
+        // use groupByKey to gather all values into (all,{(sampleEle1),(sampleEle2),...});
+        // groupByKey does not keep the value order, so ToValueFunction sorts the bag again
+        JavaRDD<Tuple> groupByKey = mapped.groupByKey().map(new ToValueFunction());
+        return groupByKey.rdd();
+    }
+
+    /**
+     * Tags every sample with the constant key "all" so that groupByKey gathers
+     * the whole sample into one group.
+     * input: Tuple2(sample, null); output: Tuple2("all", sample)
+     */
+    private static class AggregateFunction implements
+            PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>, String, Tuple>, Serializable {
+
+        private static class Tuple2TransformIterable implements Iterable<Tuple2<String, Tuple>> {
+
+            Iterator<Tuple2<Tuple, Object>> in;
+
+            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+                in = input;
+            }
+
+            @Override
+            public Iterator<Tuple2<String, Tuple>> iterator() {
+                return new IteratorTransform<Tuple2<Tuple, Object>, Tuple2<String, Tuple>>(in) {
+                    @Override
+                    protected Tuple2<String, Tuple> transform(Tuple2<Tuple, Object> next) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("AggregateFunction in:" + next._1());
+                        }
+                        return new Tuple2<String, Tuple>("all", next._1());
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
+            return new Tuple2TransformIterable(input);
+        }
+    }
+
+    /** Converts an ("all", samples) group into the tuple (all,{samples}). */
+    private static class ToValueFunction implements Function<Tuple2<String, Iterable<Tuple>>, Tuple> {
+        @Override
+        public Tuple call(Tuple2<String, Iterable<Tuple>> next) throws Exception {
+            Tuple res = tf.newTuple();
+            res.append(next._1());
+            // groupByKey may deliver the values in any order; a sorted bag with a null
+            // comparator sorts on the tuples' natural ordering, as sortByKey(true) does
+            DataBag bag = bf.newSortedBag(null);
+            for (Tuple sample : next._2()) {
+                bag.add(sample);
+            }
+            res.append(bag);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToValueFunction1 out:" + res);
+            }
+            return res;
+        }
+    }
+
+    /** Turns each sample tuple into a (sample, null) pair so it can be sorted by key. */
+    private static class ToKeyValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sort ToKeyValueFunction in " + t);
+            }
+            // (key, value): the sample itself is the sort key; there is no value
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sort ToKeyValueFunction out " + out);
+            }
+            return out;
+        }
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Spark converter for {@link POSplit}. No RDD transformation is applied: the
+ * single predecessor RDD is returned unchanged.
+ */
+public class SplitConverter implements RDDConverter<Tuple, Tuple, POSplit> {
+
+    /**
+     * Passes the sole input RDD through.
+     *
+     * @param predecessors input RDDs; exactly one is expected
+     * @param poSplit the split operator being converted
+     * @return the predecessor RDD, unchanged
+     */
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSplit poSplit) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poSplit, 1);
+        final RDD<Tuple> onlyInput = predecessors.get(0);
+        return onlyInput;
+    }
+}
