Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java Wed Apr 12 02:20:20 2017 @@ -1,137 +1,137 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.backend.hadoop.executionengine.spark.converter; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.rdd.RDD; - -public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> { - - private static final Log LOG = LogFactory.getLog(CounterConverter.class); - - @Override - public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, - POCounter poCounter) throws IOException { - SparkUtil.assertPredecessorSize(predecessors, poCounter, 1); - RDD<Tuple> rdd = predecessors.get(0); - CounterConverterFunction f = new CounterConverterFunction(poCounter); - JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true); -// jRdd = jRdd.cache(); - return jRdd.rdd(); - } - - @SuppressWarnings("serial") - private static class CounterConverterFunction implements - Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable { - - private final POCounter poCounter; - private long localCount = 1L; - private long sparkCount = 0L; - - private CounterConverterFunction(POCounter poCounter) { - this.poCounter = poCounter; - } - - @Override - public Iterator<Tuple> call(Integer index, final - Iterator<Tuple> input) { - Tuple inp = null; - Tuple output = null; - long sizeBag = 0L; - - List<Tuple> listOutput = new ArrayList<Tuple>(); - - try { - while (input.hasNext()) { - inp = input.next(); - output = TupleFactory.getInstance() - .newTuple(inp.getAll().size() + 3); - - for (int i = 
0; i < inp.getAll().size(); i++) { - output.set(i + 3, inp.get(i)); - } - - if (poCounter.isRowNumber() || poCounter.isDenseRank()) { - output.set(2, getLocalCounter()); - incrementSparkCounter(); - incrementLocalCounter(); - } else if (!poCounter.isDenseRank()) { - int positionBag = inp.getAll().size()-1; - if (inp.getType(positionBag) == DataType.BAG) { - sizeBag = ((org.apache.pig.data.DefaultAbstractBag) - inp.get(positionBag)).size(); - } - - output.set(2, getLocalCounter()); - - addToSparkCounter(sizeBag); - addToLocalCounter(sizeBag); - } - - output.set(0, index); - output.set(1, getSparkCounter()); - listOutput.add(output); - } - } catch(ExecException e) { - throw new RuntimeException(e); - } - - - return listOutput.iterator(); - } - - private long getLocalCounter() { - return localCount; - } - - private long incrementLocalCounter() { - return localCount++; - } - - private long addToLocalCounter(long amount) { - return localCount += amount; - } - - private long getSparkCounter() { - return sparkCount; - } - - private long incrementSparkCounter() { - return sparkCount++; - } - - private long addToSparkCounter(long amount) { - return sparkCount += amount; - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.rdd.RDD; + +public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> { + + private static final Log LOG = LogFactory.getLog(CounterConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POCounter poCounter) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, poCounter, 1); + RDD<Tuple> rdd = predecessors.get(0); + CounterConverterFunction f = new CounterConverterFunction(poCounter); + JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true); +// jRdd = jRdd.cache(); + return jRdd.rdd(); + } + + @SuppressWarnings("serial") + private static class CounterConverterFunction implements + Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable { + + private final POCounter poCounter; + private long localCount = 1L; + private long sparkCount = 0L; + + private CounterConverterFunction(POCounter poCounter) { + this.poCounter = poCounter; + } + + @Override + public Iterator<Tuple> call(Integer index, final + Iterator<Tuple> input) { + Tuple inp = null; + Tuple output = null; + long sizeBag = 0L; + + List<Tuple> listOutput = new ArrayList<Tuple>(); + + try { + while (input.hasNext()) { + inp = input.next(); + output = TupleFactory.getInstance() + .newTuple(inp.getAll().size() + 3); + + for (int i = 0; i < inp.getAll().size(); i++) { + output.set(i + 3, inp.get(i)); + } + + if (poCounter.isRowNumber() || poCounter.isDenseRank()) { + output.set(2, getLocalCounter()); + incrementSparkCounter(); + incrementLocalCounter(); + } else if (!poCounter.isDenseRank()) { + int positionBag = inp.getAll().size()-1; + if (inp.getType(positionBag) == DataType.BAG) { + sizeBag = ((org.apache.pig.data.DefaultAbstractBag) + inp.get(positionBag)).size(); + } + + output.set(2, getLocalCounter()); + + addToSparkCounter(sizeBag); + addToLocalCounter(sizeBag); + } + + output.set(0, index); + output.set(1, getSparkCounter()); + listOutput.add(output); + } + } catch(ExecException e) { + throw new RuntimeException(e); + } + + + return listOutput.iterator(); + } + + private long getLocalCounter() { + return localCount; + } + + private long incrementLocalCounter() { + return localCount++; + } + + private long addToLocalCounter(long amount) { + return localCount += amount; + } + + private long getSparkCounter() { + return sparkCount; + } + + private long incrementSparkCounter() { + return sparkCount++; + } + + private long addToSparkCounter(long amount) { + return sparkCount += amount; + } + } +}
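[Editor's note, not part of the commit above: CounterConverterFunction prepends three bookkeeping slots to every input tuple (slot 0: the Spark partition index, slot 1: a running per-partition "spark" count, slot 2: a per-row local count) and shifts the original fields to slots 3 and up; the RankConverter further down reads slots 0 through 2 to compute rank offsets. A minimal standalone sketch of that layout, assuming only Pig's tuple classes on the classpath; the class name and input values are invented for illustration:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class CounterLayoutSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            // A made-up two-field input tuple.
            Tuple inp = tf.newTuple(2);
            inp.set(0, "alice");
            inp.set(1, 42);

            // Values that mapPartitionsWithIndex and the counters would supply.
            int partitionIndex = 0; // first argument of CounterConverterFunction.call()
            long sparkCount = 0L;   // running count within this partition
            long localCount = 1L;   // 1-based row counter (ROW_NUMBER / dense-rank case)

            // Same layout as the converter: 3 bookkeeping slots + original fields.
            Tuple out = tf.newTuple(inp.getAll().size() + 3);
            out.set(0, partitionIndex);
            out.set(1, sparkCount);
            out.set(2, localCount);
            for (int i = 0; i < inp.getAll().size(); i++) {
                out.set(i + 3, inp.get(i));
            }
            System.out.println(out); // prints (0,0,1,alice,42)
        }
    }

End of editor's note.]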
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Wed Apr 12 02:20:20 2017 @@ -21,18 +21,19 @@ import java.io.IOException; import java.io.Serializable; import java.util.List; -import scala.Tuple2; -import scala.runtime.AbstractFunction1; -import scala.runtime.AbstractFunction2; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.rdd.PairRDDFunctions; import org.apache.spark.rdd.RDD; +import scala.Tuple2; +import scala.runtime.AbstractFunction1; +import scala.runtime.AbstractFunction2; + @SuppressWarnings({ "serial" }) public class DistinctConverter implements RDDConverter<Tuple, Tuple, PODistinct> { private static final Log LOG = LogFactory.getLog(DistinctConverter.class); @@ -51,7 +52,7 @@ public class DistinctConverter implement = new PairRDDFunctions<Tuple, Object>(keyValRDD, SparkUtil.getManifest(Tuple.class), SparkUtil.getManifest(Object.class), null); - int parallelism = SparkUtil.getParallelism(predecessors, op); + int parallelism = SparkPigContext.get().getParallelism(predecessors, op); return pairRDDFunctions.reduceByKey( SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism), new MergeValuesFunction()) @@ -66,15 +67,9 @@ public class DistinctConverter implement Serializable { @Override public Tuple2<Tuple, Object> apply(Tuple t) { - if (LOG.isDebugEnabled()) - LOG.debug("DistinctConverter.ToKeyValueFunction in " + t); - Tuple key = t; Object value = null; Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value); - - if (LOG.isDebugEnabled()) - LOG.debug("DistinctConverter.ToKeyValueFunction out " + out); return out; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Wed Apr 12 02:20:20 2017 @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import 
org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.api.java.function.FlatMapFunction; @@ -59,7 +60,7 @@ public class FRJoinConverter implements Map<String, List<Tuple>> replicatedInputMap = new HashMap<>(); for (String replicatedInput : replicatedInputs) { - replicatedInputMap.put(replicatedInput, SparkUtil.getBroadcastedVars().get(replicatedInput).value()); + replicatedInputMap.put(replicatedInput, SparkPigContext.get().getBroadcastedVars().get(replicatedInput).value()); } poFRJoin.attachInputs(replicatedInputMap); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Wed Apr 12 02:20:20 2017 @@ -23,15 +23,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import scala.Product2; -import scala.Tuple2; -import scala.collection.JavaConversions; -import scala.collection.Seq; -import scala.runtime.AbstractFunction1; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; import org.apache.pig.data.Tuple; @@ -44,6 +39,12 @@ import org.apache.spark.api.java.functio import org.apache.spark.rdd.CoGroupedRDD; import org.apache.spark.rdd.RDD; +import scala.Product2; +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.runtime.AbstractFunction1; + @SuppressWarnings({ "serial" }) public class GlobalRearrangeConverter implements RDDConverter<Tuple, Tuple, POGlobalRearrangeSpark> { @@ -57,7 +58,7 @@ public class GlobalRearrangeConverter im POGlobalRearrangeSpark op) throws IOException { SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0); - int parallelism = SparkUtil.getParallelism(predecessors, + int parallelism = SparkPigContext.get().getParallelism(predecessors, op); // TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input), @@ -322,7 +323,6 @@ public class GlobalRearrangeConverter im try { Tuple tuple = tf.newTuple(3); tuple.set(0, index); - tuple.set(1, key); tuple.set(2, next); return tuple; } catch (ExecException e) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original) +++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Apr 12 02:20:20 2017 @@ -23,18 +23,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.pig.backend.executionengine.ExecException; -import scala.Product2; -import scala.Tuple2; -import scala.collection.JavaConversions; -import scala.collection.Seq; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; @@ -44,6 +40,11 @@ import org.apache.pig.impl.io.PigNullabl import org.apache.spark.api.java.function.Function; import org.apache.spark.rdd.CoGroupedRDD; import org.apache.spark.rdd.RDD; + +import scala.Product2; +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; import scala.runtime.AbstractFunction1; public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> { @@ -54,10 +55,10 @@ public class JoinGroupSparkConverter imp public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException { SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0); - List<POLocalRearrange> lraOps = op.getLraOps(); - POGlobalRearrangeSpark glaOp = op.getGlaOp(); + List<POLocalRearrange> lraOps = op.getLROps(); + POGlobalRearrangeSpark glaOp = op.getGROp(); POPackage pkgOp = op.getPkgOp(); - int parallelism = SparkUtil.getParallelism(predecessors, glaOp); + int parallelism = SparkPigContext.get().getParallelism(predecessors, glaOp); List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>(); boolean useSecondaryKey = glaOp.isUseSecondaryKey(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Apr 12 02:20:20 2017 @@ -119,7 +119,7 @@ public class LoadConverter implements RD //create SparkCounter and set it for ToTupleFunction boolean disableCounter = jobConf.getBoolean("pig.disable.counter", false); if (!op.isTmpLoad() && !disableCounter) { - String counterName = SparkStatsUtil.getLoadSparkCounterName(op); + String counterName = SparkStatsUtil.getCounterName(op); SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance(); if (counterReporter.getCounters() != null) { 
counterReporter.getCounters().createCounter( Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Apr 12 02:20:20 2017 @@ -94,8 +94,7 @@ public class PackageConverter implements .byteValue()); if (LOG.isDebugEnabled()) LOG.debug("Setting index to " + next.get(0) + - " for tuple " + (Tuple)next.get(2) + " with key " + - next.get(1)); + " for tuple " + (Tuple)next.get(1)); return nullableTuple; } catch (ExecException e) { throw new RuntimeException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Wed Apr 12 02:20:20 2017 @@ -1,134 +1,135 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.pig.backend.hadoop.executionengine.spark.converter; - -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import scala.Tuple2; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; - -public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> { - - private static final Log LOG = LogFactory.getLog(RankConverter.class); - - @Override - public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank) - throws IOException { - int parallelism = SparkUtil.getParallelism(predecessors, poRank); - SparkUtil.assertPredecessorSize(predecessors, poRank, 1); - RDD<Tuple> rdd = predecessors.get(0); - JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD() - .mapToPair(new ToPairRdd()); - JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd - .groupByKey(parallelism); - JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex - .mapToPair(new IndexCounters()); - JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex - .sortByKey(true, parallelism); - Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap(); - JavaRDD<Tuple> finalRdd = rdd.toJavaRDD() - .map(new RankFunction(new HashMap<Integer, Long>(counts))); - return finalRdd.rdd(); - } - - @SuppressWarnings("serial") - private static class ToPairRdd implements - PairFunction<Tuple, Integer, Long>, Serializable { - - @Override - public Tuple2<Integer, Long> call(Tuple t) { - try { - Integer key = (Integer) t.get(0); - Long value = (Long) t.get(1); - return new Tuple2<Integer, Long>(key, value); - } catch (ExecException e) { - throw new RuntimeException(e); - } - } - } - - @SuppressWarnings("serial") - private static class IndexCounters implements - PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, - Serializable { - @Override - public Tuple2<Integer, Long> call(Tuple2<Integer, - Iterable<Long>> input) { - long lastVaue = 0L; - - for (Long t : input._2()) { - lastVaue = (t > lastVaue) ? 
t : lastVaue; - } - - return new Tuple2<Integer, Long>(input._1(), lastVaue); - } - } - - @SuppressWarnings("serial") - private static class RankFunction implements Function<Tuple, Tuple>, - Serializable { - private final HashMap<Integer, Long> counts; - - private RankFunction(HashMap<Integer, Long> counts) { - this.counts = counts; - } - - @Override - public Tuple call(Tuple input) throws Exception { - Tuple output = TupleFactory.getInstance() - .newTuple(input.getAll().size() - 2); - - for (int i = 1; i < input.getAll().size() - 2; i ++) { - output.set(i, input.get(i+2)); - } - - long offset = calculateOffset((Integer) input.get(0)); - output.set(0, offset + (Long)input.get(2)); - return output; - } - - private long calculateOffset(Integer index) { - long offset = 0; - - if (index > 0) { - for (int i = 0; i < index; i++) { - if (counts.containsKey(i)) { - offset += counts.get(i); - } - } - } - return offset; - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import scala.Tuple2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; + +public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> { + + private static final Log LOG = LogFactory.getLog(RankConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank) + throws IOException { + int parallelism = SparkPigContext.get().getParallelism(predecessors, poRank); + SparkUtil.assertPredecessorSize(predecessors, poRank, 1); + RDD<Tuple> rdd = predecessors.get(0); + JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD() + .mapToPair(new ToPairRdd()); + JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd + .groupByKey(parallelism); + JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex + .mapToPair(new IndexCounters()); + JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex + .sortByKey(true, parallelism); + Map<Integer, Long> counts = 
sortedCountsByIndex.collectAsMap(); + JavaRDD<Tuple> finalRdd = rdd.toJavaRDD() + .map(new RankFunction(new HashMap<Integer, Long>(counts))); + return finalRdd.rdd(); + } + + @SuppressWarnings("serial") + private static class ToPairRdd implements + PairFunction<Tuple, Integer, Long>, Serializable { + + @Override + public Tuple2<Integer, Long> call(Tuple t) { + try { + Integer key = (Integer) t.get(0); + Long value = (Long) t.get(1); + return new Tuple2<Integer, Long>(key, value); + } catch (ExecException e) { + throw new RuntimeException(e); + } + } + } + + @SuppressWarnings("serial") + private static class IndexCounters implements + PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, + Serializable { + @Override + public Tuple2<Integer, Long> call(Tuple2<Integer, + Iterable<Long>> input) { + long lastValue = 0L; + + for (Long t : input._2()) { + lastValue = (t > lastValue) ? t : lastValue; + } + + return new Tuple2<Integer, Long>(input._1(), lastValue); + } + } + + @SuppressWarnings("serial") + private static class RankFunction implements Function<Tuple, Tuple>, + Serializable { + private final HashMap<Integer, Long> counts; + + private RankFunction(HashMap<Integer, Long> counts) { + this.counts = counts; + } + + @Override + public Tuple call(Tuple input) throws Exception { + Tuple output = TupleFactory.getInstance() + .newTuple(input.getAll().size() - 2); + + for (int i = 1; i < input.getAll().size() - 2; i++) { + output.set(i, input.get(i+2)); + } + + long offset = calculateOffset((Integer) input.get(0)); + output.set(0, offset + (Long)input.get(2)); + return output; + } + + private long calculateOffset(Integer index) { + long offset = 0; + + if (index > 0) { + for (int i = 0; i < index; i++) { + if (counts.containsKey(i)) { + offset += counts.get(i); + } + } + } + return offset; + } + } +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Wed Apr 12 02:20:20 2017 @@ -31,6 +31,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark; import org.apache.pig.data.DataBag; @@ -54,14 +55,14 @@ public class ReduceByConverter implement @Override public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException { SparkUtil.assertPredecessorSize(predecessors, op, 1); - int parallelism = SparkUtil.getParallelism(predecessors, op); + int parallelism = SparkPigContext.get().getParallelism(predecessors, op); RDD<Tuple> rdd = predecessors.get(0); RDD<Tuple2<IndexedKey, Tuple>> rddPair - = rdd.map(new LocalRearrangeFunction(op.getLgr(), 
op.isUseSecondaryKey(), op.getSecondarySortOrder()) + = rdd.map(new LocalRearrangeFunction(op.getLROp(), op.isUseSecondaryKey(), op.getSecondarySortOrder()) , SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()); if (op.isUseSecondaryKey()) { - return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPkg()); + return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPKGOp()); } else { PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions = new PairRDDFunctions<>(rddPair, @@ -189,8 +190,8 @@ public class ReduceByConverter implement t.append(key); t.append(bag); - poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true}); - Tuple packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result; + poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true}); + Tuple packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result; // Perform the operation LOG.debug("MergeValuesFunction packagedTuple : " + t); @@ -242,8 +243,8 @@ public class ReduceByConverter implement bag.add((Tuple) v1._2().get(1)); t.append(key); t.append(bag); - poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true}); - packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result; + poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true}); + packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result; } catch (ExecException e) { throw new RuntimeException(e); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Wed Apr 12 02:20:20 2017 @@ -28,6 +28,7 @@ import java.util.HashMap; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.data.DataBag; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.util.Pair; @@ -84,10 +85,10 @@ public class SkewedJoinConverter impleme RDD<Tuple> rdd1 = predecessors.get(0); RDD<Tuple> rdd2 = predecessors.get(1); - Broadcast<List<Tuple>> keyDist = SparkUtil.getBroadcastedVars().get(skewedJoinPartitionFile); + Broadcast<List<Tuple>> keyDist = SparkPigContext.get().getBroadcastedVars().get(skewedJoinPartitionFile); // if no keyDist, we need defaultParallelism - Integer defaultParallelism = SparkUtil.getParallelism(predecessors, poSkewedJoin); + Integer defaultParallelism = SparkPigContext.get().getParallelism(predecessors, poSkewedJoin); // with partition id SkewPartitionIndexKeyFunction skewFun = new SkewPartitionIndexKeyFunction(this, keyDist, defaultParallelism); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Wed Apr 12 02:20:20 2017 @@ -28,6 +28,7 @@ import scala.runtime.AbstractFunction1; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.api.java.JavaPairRDD; @@ -46,7 +47,7 @@ public class SortConverter implements RD throws IOException { SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1); RDD<Tuple> rdd = predecessors.get(0); - int parallelism = SparkUtil.getParallelism(predecessors, sortOperator); + int parallelism = SparkPigContext.get().getParallelism(predecessors, sortOperator); RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(), SparkUtil.<Tuple, Object> getTuple2Manifest()); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Apr 12 02:20:20 2017 @@ -71,7 +71,7 @@ public class StoreConverter implements RDD<Tuple> rdd = predecessors.get(0); SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP, - SparkStatsUtil.getStoreSparkCounterName(op)); + SparkStatsUtil.getCounterName(op)); // convert back to KV pairs JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map( @@ -166,7 +166,7 @@ public class StoreConverter implements if (!op.isTmpStore() && !disableCounter) { ftf.setDisableCounter(disableCounter); ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP); - ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op)); + ftf.setCounterName(SparkStatsUtil.getCounterName(op)); SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance(); ftf.setSparkCounters(counterReporter.getCounters()); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Apr 12 02:20:20 2017 @@ -31,50 +31,50 @@ 
import org.apache.spark.api.java.functio import org.apache.spark.rdd.RDD; public class StreamConverter implements - RDDConverter<Tuple, Tuple, POStream> { + RDDConverter<Tuple, Tuple, POStream> { - @Override - public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, - POStream poStream) throws IOException { - SparkUtil.assertPredecessorSize(predecessors, poStream, 1); - RDD<Tuple> rdd = predecessors.get(0); - StreamFunction streamFunction = new StreamFunction(poStream); - return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd(); - } - - private static class StreamFunction implements - FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { - private POStream poStream; - - private StreamFunction(POStream poStream) { - this.poStream = poStream; - } - - public Iterable<Tuple> call(final Iterator<Tuple> input) { - return new Iterable<Tuple>() { - @Override - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(input) { - - @Override - protected void attach(Tuple tuple) { - poStream.setInputs(null); - poStream.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - Result result = poStream.getNextTuple(); - return result; - } - - @Override - protected void endOfInput() { - poStream.setFetchable(true); - } - }; - } - }; - } - } + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POStream poStream) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, poStream, 1); + RDD<Tuple> rdd = predecessors.get(0); + StreamFunction streamFunction = new StreamFunction(poStream); + return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd(); + } + + private static class StreamFunction implements + FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + private POStream poStream; + + private StreamFunction(POStream poStream) { + this.poStream = poStream; + } + + public Iterable<Tuple> call(final Iterator<Tuple> input) { + return new Iterable<Tuple>() { + @Override + public Iterator<Tuple> iterator() { + return new OutputConsumerIterator(input) { + + @Override + protected void attach(Tuple tuple) { + poStream.setInputs(null); + poStream.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + Result result = poStream.getNextTuple(); + return result; + } + + @Override + protected void endOfInput() { + poStream.setFetchable(true); + } + }; + } + }; + } + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Wed Apr 12 02:20:20 2017 @@ -43,11 +43,11 @@ public class POJoinGroupSpark extends Ph this.pkgOp = pkgOp; } - public List<POLocalRearrange> getLraOps() { + public List<POLocalRearrange> getLROps() { return lraOps; } - public POGlobalRearrangeSpark getGlaOp() { + public POGlobalRearrangeSpark getGROp() { return glaOp; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Wed Apr 12 02:20:20 2017 @@ -32,11 +32,10 @@ import org.apache.pig.impl.plan.VisitorE public class POPoissonSampleSpark extends POPoissonSample { private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class); - private static final long serialVersionUID = 1L; - - + // TODO: verify whether serialVersionUID can be removed. + //private static final long serialVersionUID = 1L; // Only for Spark - private boolean endOfInput = false; + private transient boolean endOfInput = false; public boolean isEndOfInput() { return endOfInput; @@ -50,12 +49,6 @@ public class POPoissonSampleSpark extend super(k, rp, sr, hp, tm); } - - @Override - public void visit(PhyPlanVisitor v) throws VisitorException { - v.visitPoissonSampleSpark(this); - } - @Override public Result getNextTuple() throws ExecException { if (!initialized) { @@ -115,10 +108,12 @@ public class POPoissonSampleSpark extend Result pickedSample = newSample; updateSkipInterval((Tuple) pickedSample.result); - LOG.debug("pickedSample:"); - if(pickedSample.result!=null){ - for(int i=0;i<((Tuple) pickedSample.result).size();i++) { - LOG.debug("the "+i+" ele:"+((Tuple) pickedSample.result).get(i)); + if (LOG.isDebugEnabled()) { + LOG.debug("pickedSample:"); + if (pickedSample.result != null) { + for (int i = 0; i < ((Tuple) pickedSample.result).size(); i++) { + LOG.debug("element " + i + ": " + ((Tuple) pickedSample.result).get(i)); + } } } return pickedSample; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Wed Apr 12 02:20:20 2017 @@ -43,7 +43,7 @@ public class POReduceBySpark extends POF this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations()); } - public POPackage getPkg() { + public POPackage getPKGOp() { return pkg; } @@ -98,7 +98,7 @@ public class POReduceBySpark extends POF this.customPartitioner = customPartitioner; } - public POLocalRearrange getLgr() { + public POLocalRearrange getLROp() { return lr; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (original) +++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Wed Apr 12 02:20:20 2017 @@ -17,6 +17,10 @@ */ package org.apache.pig.backend.hadoop.executionengine.spark.optimizer; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; @@ -24,6 +28,8 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; +import java.util.List; + /** * A visitor to optimize plans that determines if a vertex plan can run in * accumulative mode. @@ -31,13 +37,24 @@ import org.apache.pig.impl.plan.VisitorE public class AccumulatorOptimizer extends SparkOpPlanVisitor { public AccumulatorOptimizer(SparkOperPlan plan) { - super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); } @Override public void visitSparkOp(SparkOperator sparkOperator) throws - VisitorException { - AccumulatorOptimizerUtil.addAccumulatorSpark(sparkOperator - .physicalPlan); + VisitorException { + PhysicalPlan plan = sparkOperator.physicalPlan; + List<PhysicalOperator> pos = plan.getRoots(); + if (pos == null || pos.size() == 0) { + return; + } + + List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(plan, + POGlobalRearrange.class); + + for (POGlobalRearrange glr : glrs) { + List<PhysicalOperator> successors = plan.getSuccessors(glr); + AccumulatorOptimizerUtil.addAccumulator(plan, successors); + } } } \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Wed Apr 12 02:20:20 2017 @@ -325,7 +325,7 @@ public class CombinerOptimizer extends S // Update the ReduceBy Operator with the packaging used by Local rearrange. 
private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException { - Packager pkgr = reduceOperator.getPkg().getPkgr(); + Packager pkgr = reduceOperator.getPKGOp().getPkgr(); // annotate the package with information from the LORearrange // update the keyInfo information if already present in the POPackage Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Wed Apr 12 02:20:20 2017 @@ -72,7 +72,7 @@ public class JoinGroupOptimizerSpark ext try { restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp); } catch (PlanException e) { - throw new RuntimeException("GlobalRearrangeDiscover#visitSparkOp fails: ", e); + throw new VisitorException("GlobalRearrangeDiscover#visitSparkOp fails: ", e); } } } @@ -90,10 +90,7 @@ public class JoinGroupOptimizerSpark ext PhysicalPlan currentPlan = this.mCurrentWalker.getPlan();//If there are POSplit, we need traverse the POSplit.getPlans(), so use mCurrentWalker.getPlan() if( currentPlan != null) { plansWithJoinAndGroup.add(currentPlan); - }else{ - LOG.info("GlobalRearrangeDiscover#currentPlan is null"); } - } public List<PhysicalPlan> getPlansWithJoinAndGroup() { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java Wed Apr 12 02:20:20 2017 @@ -21,6 +21,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemoverUtil; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -30,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.impl.plan.DependencyOrderWalker; -import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; /** @@ -61,29 +61,11 @@ public class NoopFilterRemover extends S if (value instanceof Boolean) { Boolean filterValue = (Boolean) value; if (filterValue) { - removeFilter(filter, sparkOp.physicalPlan); + 
NoopFilterRemoverUtil.removeFilter(filter, sparkOp.physicalPlan); } } } } } } - - private void removeFilter(POFilter filter, PhysicalPlan plan) { - if (plan.size() > 1) { - try { - List<PhysicalOperator> fInputs = filter.getInputs(); - List<PhysicalOperator> sucs = plan.getSuccessors(filter); - - plan.removeAndReconnect(filter); - if (sucs != null && sucs.size() != 0) { - for (PhysicalOperator suc : sucs) { - suc.setInputs(fInputs); - } - } - } catch (PlanException pe) { - log.info("Couldn't remove a filter in optimizer: " + pe.getMessage()); - } - } - } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Wed Apr 12 02:20:20 2017 @@ -64,13 +64,13 @@ public class SecondaryKeyOptimizerSpark List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class); if (rearranges.isEmpty()) { if (LOG.isDebugEnabled()) { - LOG.debug("No POLocalRearranges found in the sparkOperator.Secondary key optimization is no need"); + LOG.debug("No POLocalRearranges found in the spark operator " + sparkOperator.getOperatorKey() + ". Skipping secondary key optimization."); } return; } /** - * When every POLocalRearrange is encounted in the sparkOperator.physicalPlan, + * Whenever a POLocalRearrange is encountered in the sparkOperator.physicalPlan, * the sub-physicalplan between the previousLR(or root) to currentLR is considered as mapPlan(like what * we call in mapreduce) and the sub-physicalplan between the POGlobalRearrange(the successor of currentLR) and * nextLR(or leaf) is considered as reducePlan(like what we call in mapreduce). 
After mapPlan and reducePlan are got, @@ -109,9 +109,8 @@ public class SecondaryKeyOptimizerSpark // The POLocalRearrange is sub-plan of a POSplit mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, currentLR.getOperatorKey()); } - - SecondaryKeyOptimizerUtil.setIsSparkMode(true); - SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan); + SparkSecondaryKeyOptimizerUtil sparkSecondaryKeyOptUtil = new SparkSecondaryKeyOptimizerUtil(); + SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = sparkSecondaryKeyOptUtil.applySecondaryKeySort(mapPlan, reducePlan); if (info != null) { numSortRemoved += info.getNumSortRemoved(); numDistinctChanged += info.getNumDistinctChanged(); Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java?rev=1791060&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java Wed Apr 12 02:20:20 2017 @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.pig.backend.hadoop.executionengine.spark.optimizer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil; + +import java.util.List; + +public class SparkSecondaryKeyOptimizerUtil extends SecondaryKeyOptimizerUtil { + private static Log log = LogFactory.getLog(SparkSecondaryKeyOptimizerUtil.class); + + @Override + protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) { + PhysicalOperator currentNode = null; + + if (!(root instanceof POGlobalRearrange)) { + log.debug("Expected the reduce root to be a POGlobalRearrange; skipping secondary key optimization"); + currentNode = null; + } else { + List<PhysicalOperator> globalRearrangeSuccs = reducePlan + .getSuccessors(root); + if (globalRearrangeSuccs.size() == 1) { + currentNode = globalRearrangeSuccs.get(0); + } else { + log.debug("Expected the successor of POGlobalRearrange to be a POPackage; skipping secondary key optimization"); + currentNode = null; + } + } + + return currentNode; + } +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java Wed Apr 12 02:20:20 2017 @@ -41,8 +41,8 @@ public class DotSparkPrinter extends Dot DotSparkPrinter.InnerOperator, DotSparkPrinter.InnerPlan> { - static int counter = 0; - boolean isVerboseNesting = true; + private static int counter = 0; + private boolean isVerboseNesting = true; public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) { this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),
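[Editor's note, not part of the commit above: the recurring change across the converters in this commit is mechanical. Call sites move from static helpers on SparkUtil (getParallelism, getBroadcastedVars) to a SparkPigContext singleton reached through SparkPigContext.get(), and several operator getters are renamed (getPkg to getPKGOp, getLgr to getLROp, getLraOps to getLROps, getGlaOp to getGROp). Below is a minimal sketch of the singleton call shape involved; the class name and member signatures are simplified assumptions made for illustration, not the actual Pig API:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class SingletonContextSketch {
        private static volatile SingletonContextSketch instance;

        // Hypothetical stand-ins for the broadcast-variable registry and
        // parallelism lookup that the converters obtain from the context.
        private final Map<String, Object> broadcastedVars = new ConcurrentHashMap<>();
        private int defaultParallelism = 2;

        private SingletonContextSketch() { }

        // Lazily created, thread-safe accessor: the SparkPigContext.get() call shape.
        static SingletonContextSketch get() {
            if (instance == null) {
                synchronized (SingletonContextSketch.class) {
                    if (instance == null) {
                        instance = new SingletonContextSketch();
                    }
                }
            }
            return instance;
        }

        Map<String, Object> getBroadcastedVars() {
            return broadcastedVars;
        }

        int getParallelism(List<?> predecessors, Object physicalOperator) {
            // The real method presumably derives parallelism from the operator's
            // requested parallelism and the predecessor RDDs' partition counts;
            // this stub only illustrates the call shape used above.
            return defaultParallelism;
        }

        public static void main(String[] args) {
            SingletonContextSketch ctx = SingletonContextSketch.get();
            ctx.getBroadcastedVars().put("replicatedInput", new Object());
            System.out.println(ctx.getParallelism(Collections.emptyList(), null));
        }
    }

Moving this state off SparkUtil presumably keeps per-job context (broadcast variables, parallelism) out of a static utility class. End of editor's note.]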