Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings("serial") +public class FRJoinConverter implements + RDDConverter<Tuple, Tuple, POFRJoin> { + private static final Log LOG = LogFactory.getLog(FRJoinConverter.class); + + private Set<String> replicatedInputs; + + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POFRJoin poFRJoin) throws IOException { + SparkUtil.assertPredecessorSizeGreaterThan(predecessors, poFRJoin, 1); + RDD<Tuple> rdd = predecessors.get(0); + + attachReplicatedInputs((POFRJoinSpark) poFRJoin); + + FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin); + return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd(); + } + + private void attachReplicatedInputs(POFRJoinSpark poFRJoin) { + Map<String, List<Tuple>> replicatedInputMap = new HashMap<>(); + + for (String replicatedInput : replicatedInputs) { + replicatedInputMap.put(replicatedInput, SparkPigContext.get().getBroadcastedVars().get(replicatedInput).value()); + } + + poFRJoin.attachInputs(replicatedInputMap); + } + + private static class FRJoinFunction implements + FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + + private POFRJoin poFRJoin; + private FRJoinFunction(POFRJoin poFRJoin) { + 
this.poFRJoin = poFRJoin; + } + + @Override + public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception { + + return new Iterable<Tuple>() { + + @Override + public Iterator<Tuple> iterator() { + return new OutputConsumerIterator(input) { + + @Override + protected void attach(Tuple tuple) { + poFRJoin.setInputs(null); + poFRJoin.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + return poFRJoin.getNextTuple(); + } + + @Override + protected void endOfInput() { + } + }; + } + }; + } + + } + + public void setReplicatedInputs(Set<String> replicatedInputs) { + this.replicatedInputs = replicatedInputs; + } +} \ No newline at end of file
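Context, not part of this commit: FRJoinConverter above assumes each replicated (small) input of the fragment-replicate join has already been broadcast and registered under its input name, so that SparkPigContext.get().getBroadcastedVars() can resolve it inside attachReplicatedInputs(). Below is a minimal sketch of that broadcast pattern using only the core Spark Java API; the class and variable names (BroadcastJoinSketch, smallInput) are hypothetical.

    // Sketch only: illustrates the broadcast pattern FRJoinConverter consumes;
    // names like BroadcastJoinSketch and smallInput are hypothetical.
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastJoinSketch {
        public static void main(String[] args) {
            JavaSparkContext jsc = new JavaSparkContext("local", "broadcast-join-sketch");

            // Driver side: materialize the small (replicated) relation once and
            // broadcast it, as the planner is assumed to do before
            // FRJoinConverter calls attachReplicatedInputs().
            List<String> smallInput = Arrays.asList("a", "b", "c");
            final Broadcast<List<String>> replicated = jsc.broadcast(smallInput);

            // Task side: every partition reads the broadcast value locally
            // instead of shuffling the small input, which is what makes the
            // fragment-replicate join map-side only.
            jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(
                    x -> System.out.println(x + " probes " + replicated.value()));

            jsc.stop();
        }
    }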
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.Serializable; +import java.util.List; + +import scala.runtime.AbstractFunction1; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.rdd.RDD; + +/** + * Converter that converts an RDD to a filtered RDD using POFilter + */ +@SuppressWarnings({ "serial" }) +public class FilterConverter implements RDDConverter<Tuple, Tuple, POFilter> { + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POFilter physicalOperator) { + SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); + RDD<Tuple> rdd = predecessors.get(0); + FilterFunction filterFunction = new FilterFunction(physicalOperator); + return rdd.filter(filterFunction); + } + + private static class FilterFunction extends + AbstractFunction1<Tuple, Object> implements Serializable { + + private POFilter poFilter; + + private FilterFunction(POFilter poFilter) { + this.poFilter = poFilter; + } + + @Override + public Boolean apply(Tuple v1) { + Result result; + try { + poFilter.setInputs(null); + poFilter.attachInput(v1); + result = poFilter.getNextTuple(); + } catch (ExecException e) { + throw new RuntimeException("Couldn't filter tuple", e); + } + + if (result == null) { + return false; + } + + switch (result.returnStatus) { + case POStatus.STATUS_OK: + return true; + case POStatus.STATUS_EOP: // TODO: probably also ok for EOS, + // END_OF_BATCH + return false; + default: + throw new RuntimeException( + "Unexpected response code from filter: " + result); + } + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.SchemaTupleBackend; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +/** + * Converter that converts an RDD to another RDD using a POForEach + */ +@SuppressWarnings({"serial" }) +public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> { + + private JobConf jobConf; + + public ForEachConverter(JobConf jobConf) { + this.jobConf = jobConf; + } + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POForEach physicalOperator) { + + byte[] confBytes = KryoSerializer.serializeJobConf(jobConf); + + SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); + RDD<Tuple> rdd = predecessors.get(0); + ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes); + + return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd(); + } + + private static class ForEachFunction implements + FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + + private POForEach poForEach; + private byte[] confBytes; + private transient JobConf jobConf; + + private ForEachFunction(POForEach poForEach, byte[] confBytes) { + this.poForEach = poForEach; + this.confBytes = confBytes; + } + +
public Iterable<Tuple> call(final Iterator<Tuple> input) { + + initialize(); + + // Initialize a reporter as the UDF might want to report progress. + PhysicalOperator.setReporter(new ProgressableReporter()); + PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps(); + if (planLeafOps != null) { + for (PhysicalOperator op : planLeafOps) { + if (op.getClass() == POUserFunc.class) { + POUserFunc udf = (POUserFunc) op; + udf.setFuncInputSchema(); + } + } + } + + + return new Iterable<Tuple>() { + + @Override + public Iterator<Tuple> iterator() { + return new OutputConsumerIterator(input) { + + @Override + protected void attach(Tuple tuple) { + poForEach.setInputs(null); + poForEach.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + return poForEach.getNextTuple(); + } + + @Override + protected void endOfInput() { + } + }; + } + }; + } + + private void initialize() { + if (this.jobConf == null) { + try { + this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes); + PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext")); + SchemaTupleBackend.initialize(jobConf, pc); + } catch (IOException e) { + throw new RuntimeException("Couldn't initialize ForEachConverter", e); + } + } + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.rdd.CoGroupedRDD; +import org.apache.spark.rdd.RDD; + +import scala.Product2; +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.runtime.AbstractFunction1; + +@SuppressWarnings({ "serial" }) +public class GlobalRearrangeConverter implements + RDDConverter<Tuple, Tuple, POGlobalRearrangeSpark> { + private static final Log LOG = LogFactory + .getLog(GlobalRearrangeConverter.class); + + private static final TupleFactory tf = TupleFactory.getInstance(); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POGlobalRearrangeSpark op) throws IOException { + SparkUtil.assertPredecessorSizeGreaterThan(predecessors, + op, 0); + int parallelism = SparkPigContext.get().getParallelism(predecessors, + op); + +// TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input), +// vs using groupBy (like we do in this commented code), vs using +// reduceByKey(). This is a pending task in Pig on Spark Milestone 1 +// Once we figure that out, we can allow custom partitioning for +// secondary sort case as well. +// if (predecessors.size() == 1) { +// // GROUP BY +// JavaPairRDD<Object, Iterable<Tuple>> prdd; +// if (op.isUseSecondaryKey()) { +// prdd = handleSecondarySort(predecessors.get(0), op, parallelism); +// } else { +// JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD(); +// prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism); +// prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(), +// parallelism)); +// } +// JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op)); +// return jrdd2.rdd(); +// +// if (predecessors.size() == 1 && op.isUseSecondaryKey()) { +// return handleSecondarySort(predecessors.get(0), op, parallelism); +// } + List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>(); + + if (predecessors.size() == 1 && op.isUseSecondaryKey()) { + rddPairs.add(handleSecondarySort(predecessors.get(0), op, parallelism).rdd()); + } else { + for (RDD<Tuple> rdd : predecessors) { + JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class)); + JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction()); + rddPairs.add(rddPair.rdd()); + } + } + + // Something's wrong with the type parameters of CoGroupedRDD + // key and value are the same type ??? + CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>( + (Seq<RDD<? 
extends Product2<Object, ?>>>) (Object) (JavaConversions + .asScalaBuffer(rddPairs).toSeq()), + SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism), SparkUtil.getManifest(Object.class)); + + RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd = + (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD; + return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd(); + } + + private JavaRDD<Tuple2<IndexedKey,Tuple>> handleSecondarySort( + RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) { + + RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(), + SparkUtil.<Tuple, Object>getTuple2Manifest()); + + JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair, + SparkUtil.getManifest(Tuple.class), + SparkUtil.getManifest(Object.class)); + + //first sort the tuples by the secondary key when useSecondaryKey sort is enabled + JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions( + new HashPartitioner(parallelism), + new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder())); + JavaRDD<Tuple> jrdd = sorted.keys(); + JavaRDD<Tuple2<IndexedKey,Tuple>> jrddPair = jrdd.map(new ToKeyValueFunction(op)); + return jrddPair; + } + + private static class RemoveValueFunction implements + FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable { + + private class Tuple2TransformIterable implements Iterable<Tuple> { + + Iterator<Tuple2<Tuple, Object>> in; + + Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) { + in = input; + } + + public Iterator<Tuple> iterator() { + return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) { + @Override + protected Tuple transform(Tuple2<Tuple, Object> next) { + return next._1(); + } + }; + } + } + + @Override + public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) { + return new Tuple2TransformIterable(input); + } + } + + private static class ToKeyNullValueFunction extends + AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements + Serializable { + + @Override + public Tuple2<Tuple, Object> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyNullValueFunction in " + t); + } + + Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null); + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyNullValueFunction out " + out); + } + + return out; + } + } + + /** + * Function that extracts keys from locally rearranged tuples. + */ + private static class GetKeyFunction implements Function<Tuple, Object>, Serializable { + public final POGlobalRearrangeSpark glrSpark; + + public GetKeyFunction(POGlobalRearrangeSpark globalRearrangeSpark) { + this.glrSpark = globalRearrangeSpark; + } + + public Object call(Tuple t) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("GetKeyFunction in " + t); + } + + Object key; + if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) { + key = ((Tuple) t.get(1)).get(0); + } else { + key = t.get(1); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("GetKeyFunction out " + key); + } + + return key; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Function that converts elements of PairRDD to regular RDD. + * - Input (PairRDD) contains elements of the form + * Tuple2<key, Iterable<(index, key, value)>>.
+ * - Output (regular RDD) contains elements of the form + * Tuple<(key, iterator to (index, key, value))> + */ + private static class GroupTupleFunction + implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>, + Serializable { + public final POGlobalRearrangeSpark glrSpark; + + public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) { + this.glrSpark = globalRearrangeSpark; + } + + public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("GroupTupleFunction in " + v1); + } + + Tuple tuple = tf.newTuple(2); + tuple.set(0, v1._1()); // key + // Note that v1._2() is (index, key, value) tuple, and + // v1._2().iterator() is iterator to Seq<Tuple> (aka bag of values) + tuple.set(1, v1._2().iterator()); + if (LOG.isDebugEnabled()) { + LOG.debug("GroupTupleFunction out " + tuple); + } + + return tuple; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + + /** + * Converts incoming locally rearranged tuple, which is of the form + * (index, key, value) into Tuple2<key, value> + */ + private static class ToKeyValueFunction implements + Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable { + + private POGlobalRearrangeSpark glrSpark = null; + + public ToKeyValueFunction(POGlobalRearrangeSpark glrSpark) { + this.glrSpark = glrSpark; + } + + public ToKeyValueFunction() { + + } + + @Override + public Tuple2<IndexedKey, Tuple> call(Tuple t) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyValueFunction in " + t); + } + + Object key = null; + if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) { + key = ((Tuple) t.get(1)).get(0); + } else { + key = t.get(1); + } + + Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>( + new IndexedKey((Byte) t.get(0), key), + (Tuple) t.get(2)); + if (LOG.isDebugEnabled()) { + LOG.debug("ToKeyValueFunction out " + out); + } + + return out; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Converts cogroup output where each element is {key, bag[]} represented + * as Tuple2<Object, Seq<Seq<Tuple>>> into tuple of form + * (key, Iterator<(index, key, value)>) + */ + private static class ToGroupKeyValueFunction implements + Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable { + + @Override + public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("ToGroupKeyValueFunction in " + input); + } + + final Object key = input._1().getKey(); + Object obj = input._2(); + // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq + Seq<Tuple>[] bags = (Seq<Tuple>[])obj; + int i = 0; + List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>(); + for (int j = 0; j < bags.length; j ++) { + Seq<Tuple> bag = bags[j]; + Iterator<Tuple> iterator = JavaConversions + .asJavaCollection(bag).iterator(); + final int index = i; + tupleIterators.add(new IteratorTransform<Tuple, Tuple>( + iterator) { + @Override + protected Tuple transform(Tuple next) { + try { + Tuple tuple = tf.newTuple(3); + tuple.set(0, index); + tuple.set(1, next); + return tuple; + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + }); + ++ i; + } + + Tuple out = tf.newTuple(2); + out.set(0, key); + out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator())); + if (LOG.isDebugEnabled()) { + LOG.debug("ToGroupKeyValueFunction out " + out); + } + + return out; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static 
class IteratorUnion<T> implements Iterator<T> { + + private final Iterator<Iterator<T>> iterators; + + private Iterator<T> current; + + public IteratorUnion(Iterator<Iterator<T>> iterators) { + super(); + this.iterators = iterators; + } + + @Override + public boolean hasNext() { + if (current != null && current.hasNext()) { + return true; + } else if (iterators.hasNext()) { + current = iterators.next(); + return hasNext(); + } else { + return false; + } + } + + @Override + public T next() { + return current.next(); + } + + @Override + public void remove() { + current.remove(); + } + + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Mon May 29 15:00:39 2017 @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.Serializable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; + +import org.joda.time.DateTime; + +/** + * IndexedKey records the index and key info. + * This is used as key for JOINs. It addresses the case where key is + * either empty (or is a tuple with one or more empty fields). In this case, + * we must respect the SQL standard as documented in the equals() method. + */ +public class IndexedKey implements Serializable, Comparable { + private static final Log LOG = LogFactory.getLog(IndexedKey.class); + private byte index; + private Object key; + private boolean useSecondaryKey; + private boolean[] secondarySortOrder; + + public IndexedKey(byte index, Object key) { + this.index = index; + this.key = key; + } + + public byte getIndex() { + return index; + } + + public Object getKey() { + return key; + } + + @Override + public String toString() { + return "IndexedKey{" + + "index=" + index + + ", key=" + key + + '}'; + } + + /** + * If key is empty, we'd like to compute equality based on key and index. + * If key is not empty, we'd like to compute equality based on just the key (like we normally do).
+ * There are two possible cases when two tuples are compared: + * 1) Compare tuples of same table (same index) + * 2) Compare tuples of different tables (different index values) + * In 1) + * key1 key2 equal? + * null null Y + * foo null N + * null foo N + * foo foo Y + * (1,1) (1,1) Y + * (1,) (1,) Y + * (1,2) (1,2) Y + * <p/> + * <p/> + * In 2) + * key1 key2 equal? + * null null N + * foo null N + * null foo N + * foo foo Y + * (1,1) (1,1) Y + * (1,) (1,) N + * (1,2) (1,2) Y + * + * @param o + * @return + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndexedKey that = (IndexedKey) o; + if (index == that.index) { + if (key == null && that.key == null) { + return true; + } else if (key == null || that.key == null) { + return false; + } else { + if (key instanceof DateTime) { + //In case of DateTime we use the less strict isEqual method so that + //e.g. 2017.01.01T10:00:00.000Z and 2017.01.01T11:00:00.000+01:00 are equal + return ((DateTime) key).isEqual((DateTime)that.key); + } else { + return key.equals(that.key); + } + } + } else { + if (key == null || that.key == null) { + return false; + } else if (key.equals(that.key) && !containNullfields(key)) { + return true; + } else { + return false; + } + } + } + + private boolean containNullfields(Object key) { + if (key instanceof Tuple) { + for (int i = 0; i < ((Tuple) key).size(); i++) { + try { + if (((Tuple) key).get(i) == null) { + return true; + } + } catch (ExecException e) { + throw new RuntimeException("exception found in " + + "containNullfields", e); + + } + } + } + return false; + + } + + /** + * Calculate hashCode by index and key + * if key is empty, return index value + * if key is not empty, return the key.hashCode() + */ + @Override + public int hashCode() { + int result = 0; + if (key == null) { + result = (int) index; + } else { + if (key instanceof DateTime) { + //In case of DateTime we use a custom hashCode() to avoid chronology taking part in the hash value + DateTime dt = (DateTime)key; + result = (int) (dt.getMillis() ^ dt.getMillis() >>> 32); + } else { + result = key.hashCode(); + } + } + return result; + } + + //firstly compare the index + //secondly compare the key (both first and secondary key) + @Override + public int compareTo(Object o) { + IndexedKey that = (IndexedKey) o; + int res = index - that.getIndex(); + if (res > 0) { + return 1; + } else if (res < 0) { + return -1; + } else { + if (useSecondaryKey) { + Tuple thisCompoundKey = (Tuple) key; + Tuple thatCompoundKey = (Tuple)that.getKey(); + PigSecondaryKeyComparatorSpark comparator = new PigSecondaryKeyComparatorSpark(secondarySortOrder); + return comparator.compareCompoundKey(thisCompoundKey, thatCompoundKey); + } else { + return DataType.compare(key, that.getKey()); + } + } + } + + public void setUseSecondaryKey(boolean useSecondaryKey) { + this.useSecondaryKey = useSecondaryKey; + } + + public void setSecondarySortOrder(boolean[] secondarySortOrder) { + this.secondarySortOrder = secondarySortOrder; + } +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java?rev=1796639&view=auto ============================================================================== --- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java Mon May 29 15:00:39 2017 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.util.Iterator; + +abstract class IteratorTransform<IN, OUT> implements Iterator<OUT> { + private Iterator<IN> delegate; + + public IteratorTransform(Iterator<IN> delegate) { + super(); + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public OUT next() { + return transform(delegate.next()); + } + + abstract protected OUT transform(IN next); + + @Override + public void remove() { + delegate.remove(); + } + +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.rdd.CoGroupedRDD; +import org.apache.spark.rdd.RDD; + +import scala.Product2; +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.runtime.AbstractFunction1; + +public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> { + private static final Log LOG = LogFactory + .getLog(JoinGroupSparkConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException { + SparkUtil.assertPredecessorSizeGreaterThan(predecessors, + op, 0); + List<POLocalRearrange> lraOps = op.getLROps(); + POGlobalRearrangeSpark glaOp = op.getGROp(); + POPackage pkgOp = op.getPkgOp(); + int parallelism = SparkPigContext.get().getParallelism(predecessors, glaOp); + List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>(); + boolean useSecondaryKey = glaOp.isUseSecondaryKey(); + + for (int i = 0; i < predecessors.size(); i++) { + RDD<Tuple> rdd = predecessors.get(i); + rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp), + SparkUtil.<IndexedKey, Tuple>getTuple2Manifest())); + } + if (rddAfterLRA.size() == 1 && useSecondaryKey) { + return SecondaryKeySortUtil.handleSecondarySort(rddAfterLRA.get(0), pkgOp); + } else { + + CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>( + (Seq<RDD<? 
extends Product2<Object, ?>>>) (Object) (JavaConversions + .asScalaBuffer(rddAfterLRA).toSeq()), + SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism), + SparkUtil.getManifest(Object.class)); + + RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd = + (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD; + return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp)).rdd(); + } + } + + private static class LocalRearrangeFunction extends + AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable { + + private final POLocalRearrange lra; + + private boolean useSecondaryKey; + private boolean[] secondarySortOrder; + + public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) { + if( glaOp.isUseSecondaryKey()) { + this.useSecondaryKey = glaOp.isUseSecondaryKey(); + this.secondarySortOrder = glaOp.getSecondarySortOrder(); + } + this.lra = lra; + } + + @Override + public Tuple2<IndexedKey, Tuple> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction in " + t); + } + Result result; + try { + lra.setInputs(null); + lra.attachInput(t); + result = lra.getNextTuple(); + + if (result == null) { + throw new RuntimeException( + "Null response found for LocalRearrange on tuple: " + + t); + } + + switch (result.returnStatus) { + case POStatus.STATUS_OK: + // (index, key, value without keys) + Tuple resultTuple = (Tuple) result.result; + Object key = resultTuple.get(1); + IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key); + if( useSecondaryKey) { + indexedKey.setUseSecondaryKey(useSecondaryKey); + indexedKey.setSecondarySortOrder(secondarySortOrder); + } + Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey, + (Tuple) resultTuple.get(2)); + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction out " + out); + } + return out; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + lra + " : " + result); + } + } catch (ExecException e) { + throw new RuntimeException( + "Couldn't do LocalRearrange on tuple: " + t, e); + } + } + + } + + /** + * Send cogroup output where each element is {key, bag[]} to POPackage + * then call POPackage#getNextTuple to get the result + */ + private static class GroupPkgFunction implements + Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable { + + private final POPackage pkgOp; + + public GroupPkgFunction(POPackage pkgOp) { + this.pkgOp = pkgOp; + } + + @Override + public Tuple call(final Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("GroupPkgFunction in " + input); + } + + final PigNullableWritable key = new PigNullableWritable() { + + public Object getValueAsPigType() { + IndexedKey keyTuple = input._1(); + return keyTuple.getKey(); + } + }; + Object obj = input._2(); + // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq + Seq<Tuple>[] bags = (Seq<Tuple>[]) obj; + int i = 0; + List<Iterator<NullableTuple>> tupleIterators = new ArrayList<Iterator<NullableTuple>>(); + for (int j = 0; j < bags.length; j++) { + Seq<Tuple> bag = bags[j]; + Iterator<Tuple> iterator = JavaConversions + .asJavaCollection(bag).iterator(); + final int index = i; + tupleIterators.add(new IteratorTransform<Tuple, NullableTuple>( + iterator) { + @Override + protected NullableTuple transform(Tuple next) { + NullableTuple nullableTuple = new NullableTuple(next); + nullableTuple.setIndex((byte) index); + return nullableTuple; + } + }); + ++i; + } + + +
pkgOp.setInputs(null); + pkgOp.attachInput(key, new IteratorUnion<NullableTuple>(tupleIterators.iterator())); + Result result = pkgOp.getNextTuple(); + if (result == null) { + throw new RuntimeException( + "Null response found for Package on key: " + key); + } + Tuple out; + switch (result.returnStatus) { + case POStatus.STATUS_OK: + // (key, {(value)...}) + out = (Tuple) result.result; + break; + case POStatus.STATUS_NULL: + out = null; + break; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + pkgOp + " : " + result + " " + + result.returnStatus); + } + if (LOG.isDebugEnabled()) { + LOG.debug("GroupPkgFunction out " + out); + } + return out; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + + private static class IteratorUnion<T> implements Iterator<T> { + + private final Iterator<Iterator<T>> iterators; + + private Iterator<T> current; + + public IteratorUnion(Iterator<Iterator<T>> iterators) { + super(); + this.iterators = iterators; + } + + @Override + public boolean hasNext() { + if (current != null && current.hasNext()) { + return true; + } else if (iterators.hasNext()) { + current = iterators.next(); + return hasNext(); + } else { + return false; + } + } + + @Override + public T next() { + return current.next(); + } + + @Override + public void remove() { + current.remove(); + } + + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings({ "serial" }) +public class LimitConverter implements RDDConverter<Tuple, Tuple, POLimit> { + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POLimit poLimit) + throws IOException { + SparkUtil.assertPredecessorSize(predecessors, poLimit, 1); + RDD<Tuple> rdd = predecessors.get(0); + LimitFunction limitFunction = new LimitFunction(poLimit); + RDD<Tuple> rdd2 = rdd.coalesce(1, false, null); + return rdd2.toJavaRDD().mapPartitions(limitFunction, false).rdd(); + } + + private static class LimitFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> { + + private final POLimit poLimit; + + public LimitFunction(POLimit poLimit) { + this.poLimit = poLimit; + } + + @Override + public Iterable<Tuple> call(final Iterator<Tuple> tuples) { + + return new Iterable<Tuple>() { + + public Iterator<Tuple> iterator() { + return new OutputConsumerIterator(tuples) { + + @Override + protected void attach(Tuple tuple) { + poLimit.setInputs(null); + poLimit.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + return poLimit.getNextTuple(); + } + + @Override + protected void endOfInput() { + } + }; + } + }; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import scala.Function1; +import scala.Tuple2; +import scala.runtime.AbstractFunction1; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.LoadFunc; +import org.apache.pig.PigConstants; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.spark.SparkCounters; +import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; +import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; +import org.apache.spark.SparkContext; +import org.apache.spark.TaskContext; +import org.apache.spark.rdd.RDD; + +import com.google.common.collect.Lists; + +/** + * Converter that loads data via POLoad and converts it to RDD<Tuple>. Abuses + * the interface a bit in that there is no input RDD to convert in this case. + * Instead input is the source path of the POLoad. + */ +@SuppressWarnings({ "serial" }) +public class LoadConverter implements RDDConverter<Tuple, Tuple, POLoad> { + private static Log LOG = LogFactory.getLog(LoadConverter.class); + + private PigContext pigContext; + private PhysicalPlan physicalPlan; + private SparkContext sparkContext; + private JobConf jobConf; + private SparkEngineConf sparkEngineConf; + + public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan, + SparkContext sparkContext, JobConf jobConf, SparkEngineConf sparkEngineConf) { + this.pigContext = pigContext; + this.physicalPlan = physicalPlan; + this.sparkContext = sparkContext; + this.jobConf = jobConf; + this.sparkEngineConf = sparkEngineConf; + } + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad op) + throws IOException { + configureLoader(physicalPlan, op, jobConf); + + // Set the input directory for input formats that are backed by a + // filesystem. (Does not apply to HBase, for example). + jobConf.set("mapreduce.input.fileinputformat.inputdir", + op.getLFile().getFileName()); + + // internally set pig.noSplitCombination as true ONLY for + // the POLoad operator which has POMergeJoin successor.
+ if (hasMergeJoinSuccessor(op)) { + jobConf.set("pig.noSplitCombination", "true"); + } + + + //serialize the UDFContext#udfConfs in jobConf + UDFContext.getUDFContext().serialize(jobConf); + + //SparkContext.newAPIHadoopRDD will broadcast the jobConf to other worker nodes. + //Later in PigInputFormatSpark#createRecordReader, jobConf will be used to + //initialize PigContext,UDFContext and SchemaTupleBackend. + RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD( + jobConf, PigInputFormatSpark.class, Text.class, Tuple.class); + + registerUdfFiles(); + + ToTupleFunction ttf = new ToTupleFunction(sparkEngineConf); + + //create SparkCounter and set it for ToTupleFunction + boolean disableCounter = jobConf.getBoolean("pig.disable.counter", false); + if (!op.isTmpLoad() && !disableCounter) { + String counterName = SparkStatsUtil.getCounterName(op); + SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance(); + if (counterReporter.getCounters() != null) { + counterReporter.getCounters().createCounter( + SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP, + counterName); + } + + ttf.setDisableCounter(disableCounter); + ttf.setCounterGroupName(SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP); + ttf.setCounterName(counterName); + ttf.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters()); + } + + // map to get just RDD<Tuple> + return hadoopRDD.map(ttf, + SparkUtil.getManifest(Tuple.class)); + } + + private void registerUdfFiles() throws MalformedURLException{ + Map<String, File> scriptFiles = pigContext.getScriptFiles(); + for (Map.Entry<String, File> scriptFile : scriptFiles.entrySet()) { + File script = scriptFile.getValue(); + if (script.exists()) { + sparkContext.addFile(script.toURI().toURL().toExternalForm()); + } + } + } + + private static class ToTupleFunction extends + AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements + Function1<Tuple2<Text, Tuple>, Tuple>, Serializable { + + private String counterGroupName; + private String counterName; + private SparkCounters sparkCounters; + private boolean disableCounter; + private SparkEngineConf sparkEngineConf; + private boolean initialized; + + public ToTupleFunction(SparkEngineConf sparkEngineConf){ + this.sparkEngineConf = sparkEngineConf; + } + + @Override + public Tuple apply(Tuple2<Text, Tuple> v1) { + if (!initialized) { + long partitionId = TaskContext.get().partitionId(); + Configuration jobConf = PigMapReduce.sJobConfInternal.get(); + jobConf.set(PigConstants.TASK_INDEX, Long.toString(partitionId)); + jobConf.set(MRConfiguration.TASK_ID, Long.toString(partitionId)); + initialized = true; + } + if (sparkCounters != null && disableCounter == false) { + sparkCounters.increment(counterGroupName, counterName, 1L); + } + return v1._2(); + } + + public void setCounterGroupName(String counterGroupName) { + this.counterGroupName = counterGroupName; + } + + public void setCounterName(String counterName) { + this.counterName = counterName; + } + + public void setSparkCounters(SparkCounters sparkCounters) { + this.sparkCounters = sparkCounters; + } + + public void setDisableCounter(boolean disableCounter) { + this.disableCounter = disableCounter; + } + } + + /** + * stolen from JobControlCompiler TODO: refactor it to share this + * + * @param physicalPlan + * @param poLoad + * @param jobConf + * @return + * @throws java.io.IOException + */ + private static JobConf configureLoader(PhysicalPlan physicalPlan, + POLoad poLoad, JobConf jobConf) throws IOException { + + Job job = new Job(jobConf); + LoadFunc loadFunc = 
poLoad.getLoadFunc(); + + loadFunc.setLocation(poLoad.getLFile().getFileName(), job); + + // stolen from JobControlCompiler + ArrayList<FileSpec> pigInputs = new ArrayList<FileSpec>(); + // Store the inp filespecs + pigInputs.add(poLoad.getLFile()); + + ArrayList<List<OperatorKey>> inpTargets = Lists.newArrayList(); + ArrayList<String> inpSignatures = Lists.newArrayList(); + ArrayList<Long> inpLimits = Lists.newArrayList(); + // Store the target operators for tuples read + // from this input + List<PhysicalOperator> loadSuccessors = physicalPlan + .getSuccessors(poLoad); + List<OperatorKey> loadSuccessorsKeys = Lists.newArrayList(); + if (loadSuccessors != null) { + for (PhysicalOperator loadSuccessor : loadSuccessors) { + loadSuccessorsKeys.add(loadSuccessor.getOperatorKey()); + } + } + inpTargets.add(loadSuccessorsKeys); + inpSignatures.add(poLoad.getSignature()); + inpLimits.add(poLoad.getLimit()); + + jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(pigInputs)); + jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets)); + jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, + ObjectSerializer.serialize(inpSignatures)); + jobConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits)); + return jobConf; + } + + private static boolean hasMergeJoinSuccessor(PhysicalOperator op) { + if (op == null || op.getParentPlan() == null) { + return false; + } + List<PhysicalOperator> successors = op.getParentPlan().getSuccessors(op); + if (successors == null ) { + return false; + } + for (PhysicalOperator successor : successors){ + if (successor instanceof POMergeJoin){ + return true; + } + if (hasMergeJoinSuccessor(successor)){ + return true; + } + } + return false; + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import scala.runtime.AbstractFunction1; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings({ "serial" }) +public class LocalRearrangeConverter implements + RDDConverter<Tuple, Tuple, PhysicalOperator> { + private static final Log LOG = LogFactory + .getLog(LocalRearrangeConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + PhysicalOperator physicalOperator) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); + RDD<Tuple> rdd = predecessors.get(0); + // call local rearrange to get key and value + return rdd.map(new LocalRearrangeFunction(physicalOperator), + SparkUtil.getManifest(Tuple.class)); + + } + + private static class LocalRearrangeFunction extends + AbstractFunction1<Tuple, Tuple> implements Serializable { + + private final PhysicalOperator physicalOperator; + + public LocalRearrangeFunction(PhysicalOperator physicalOperator) { + this.physicalOperator = physicalOperator; + } + + @Override + public Tuple apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction in " + t); + } + Result result; + try { + physicalOperator.setInputs(null); + physicalOperator.attachInput(t); + result = physicalOperator.getNextTuple(); + + if (result == null) { + throw new RuntimeException( + "Null response found for LocalRearrange on tuple: " + + t); + } + + switch (result.returnStatus) { + case POStatus.STATUS_OK: + // (index, key, value without keys) + Tuple resultTuple = (Tuple) result.result; + if (LOG.isDebugEnabled()) + LOG.debug("LocalRearrangeFunction out " + resultTuple); + return resultTuple; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + physicalOperator + " : " + result); + } + } catch (ExecException e) { + throw new RuntimeException( + "Couldn't do LocalRearrange on tuple: " + t, e); + } + } + + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+
+public class MergeCogroupConverter implements RDDConverter<Tuple, Tuple, POMergeCogroup> {
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POMergeCogroup physicalOperator) {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        MergeCogroupFunction mergeCogroupFunction = new MergeCogroupFunction(physicalOperator);
+        return rdd.toJavaRDD().mapPartitions(mergeCogroupFunction, true).rdd();
+    }
+
+    private static class MergeCogroupFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+        private POMergeCogroup poMergeCogroup;
+
+        private MergeCogroupFunction(POMergeCogroup poMergeCogroup) {
+            this.poMergeCogroup = poMergeCogroup;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception {
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poMergeCogroup.setInputs(null);
+                            poMergeCogroup.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poMergeCogroup.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poMergeCogroup.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+
+@SuppressWarnings("serial")
+public class MergeJoinConverter implements
+        RDDConverter<Tuple, Tuple, POMergeJoin> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POMergeJoin poMergeJoin) throws IOException {
+
+        SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin);
+
+        return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
+    }
+
+    private static class MergeJoinFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+        private POMergeJoin poMergeJoin;
+
+        private MergeJoinFunction(POMergeJoin poMergeJoin) {
+            this.poMergeJoin = poMergeJoin;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poMergeJoin.setInputs(null);
+                            poMergeJoin.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poMergeJoin.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poMergeJoin.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}
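
MergeCogroupConverter and MergeJoinConverter differ only in the wrapped operator; the per-partition wiring is identical. A condensed sketch of that shared wiring, not part of this commit (the helper class is illustrative):

    import java.util.Iterator;

    import org.apache.pig.data.Tuple;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.rdd.RDD;

    class PartitionWiringSketch {
        // "fn" stands in for MergeCogroupFunction/MergeJoinFunction. The
        // second argument to mapPartitions is Spark's preservesPartitioning
        // flag: these operators only transform tuples within a partition,
        // so any existing partitioning of the RDD remains valid.
        static RDD<Tuple> wire(RDD<Tuple> rdd,
                FlatMapFunction<Iterator<Tuple>, Tuple> fn) {
            return rdd.toJavaRDD().mapPartitions(fn, true).rdd();
        }
    }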

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java Mon May 29 15:00:39 2017
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+
+abstract class OutputConsumerIterator implements java.util.Iterator<Tuple> {
+    private final java.util.Iterator<Tuple> input;
+    private Result result = null;
+    private boolean returned = true;
+    private boolean done = false;
+
+    OutputConsumerIterator(java.util.Iterator<Tuple> input) {
+        this.input = input;
+    }
+
+    protected abstract void attach(Tuple tuple);
+
+    protected abstract Result getNextResult() throws ExecException;
+
+    /**
+     * Certain operators may buffer the output.
+     * We need to flush the last set of records from such operators
+     * when we encounter the last input record, before calling
+     * getNextTuple() for the last time.
+     */
+    protected abstract void endOfInput();
+
+    private void readNext() {
+        while (true) {
+            try {
+                // result is set in the hasNext() call and handed
+                // to the user in the next() call
+                if (result != null && !returned) {
+                    return;
+                }
+
+                if (result == null) {
+                    if (!input.hasNext()) {
+                        done = true;
+                        return;
+                    }
+                    Tuple v1 = input.next();
+                    attach(v1);
+                }
+
+                if (!input.hasNext()) {
+                    endOfInput();
+                }
+
+                result = getNextResult();
+                returned = false;
+                switch (result.returnStatus) {
+                case POStatus.STATUS_OK:
+                    returned = false;
+                    break;
+                case POStatus.STATUS_NULL:
+                    returned = true;
+                    break;
+                case POStatus.STATUS_EOP:
+                    done = !input.hasNext();
+                    if (!done) {
+                        result = null;
+                    }
+                    break;
+                case POStatus.STATUS_ERR:
+                    throw new RuntimeException("Error while processing " + result);
+                }
+
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        readNext();
+        return !done;
+    }
+
+    @Override
+    public Tuple next() {
+        readNext();
+        if (done) {
+            throw new java.util.NoSuchElementException(
+                    "Past the end. Call hasNext() before calling next()");
+        }
+        if (result == null || result.returnStatus != POStatus.STATUS_OK) {
+            throw new RuntimeException("Unexpected response code from operator: "
+                    + result);
+        }
+        returned = true;
+        return (Tuple) result.result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
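
OutputConsumerIterator factors out the pull loop that each converter above repeats: attach a raw tuple, drain the operator until EOP, and signal end-of-input once the partition's input is exhausted. A minimal concrete subclass, not part of this commit, wrapping an arbitrary unary PhysicalOperator (it must live in this same package, since the class is package-private; names are illustrative):

    import java.util.Iterator;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
    import org.apache.pig.data.Tuple;

    class OutputConsumerIteratorSketch {
        static Iterator<Tuple> wrap(final Iterator<Tuple> input,
                final PhysicalOperator op) {
            return new OutputConsumerIterator(input) {
                @Override
                protected void attach(Tuple tuple) {
                    op.setInputs(null);    // drop any previously attached inputs
                    op.attachInput(tuple); // feed the next raw tuple to the operator
                }

                @Override
                protected Result getNextResult() throws ExecException {
                    return op.getNextTuple();
                }

                @Override
                protected void endOfInput() {
                    // no-op for operators that do not buffer; buffering
                    // operators (POMergeJoin, POMergeCogroup) flag
                    // end-of-input here so they can flush, as above
                }
            };
        }
    }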
