Author: xuefu
Date: Mon Oct 17 18:18:43 2016
New Revision: 1765346
URL: http://svn.apache.org/viewvc?rev=1765346&view=rev
Log: PIG-4969: Optimize combine case for spark mode (Part 2) (Liyun via Xuefu)
Removed: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1765346&r1=1765345&r2=1765346&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon Oct 17 18:18:43 2016 @@ -17,13 +17,13 @@ */ package org.apache.pig.backend.hadoop.executionengine.spark.converter; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.pig.backend.executionengine.ExecException; import scala.Product2; import scala.Tuple2; import scala.collection.JavaConversions; @@ -44,7 +44,7 @@ import org.apache.pig.impl.io.PigNullabl import org.apache.spark.api.java.function.Function; import org.apache.spark.rdd.CoGroupedRDD; import org.apache.spark.rdd.RDD; - +import scala.runtime.AbstractFunction1; public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> { private static final Log LOG = LogFactory @@ -63,8 +63,7 @@ public class JoinGroupSparkConverter imp for (int i = 0; i < predecessors.size(); i++) { RDD<Tuple> rdd = predecessors.get(i); - rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), 
glaOp.isUseSecondaryKey(), glaOp - .getSecondarySortOrder()), + rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest())); } if (rddAfterLRA.size() == 1 && useSecondaryKey) { @@ -83,6 +82,67 @@ public class JoinGroupSparkConverter imp } } + private static class LocalRearrangeFunction extends + AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable { + + private final POLocalRearrange lra; + + private boolean useSecondaryKey; + private boolean[] secondarySortOrder; + + public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) { + if( glaOp.isUseSecondaryKey()) { + this.useSecondaryKey = glaOp.isUseSecondaryKey(); + this.secondarySortOrder = glaOp.getSecondarySortOrder(); + } + this.lra = lra; + } + + @Override + public Tuple2<IndexedKey, Tuple> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction in " + t); + } + Result result; + try { + lra.setInputs(null); + lra.attachInput(t); + result = lra.getNextTuple(); + + if (result == null) { + throw new RuntimeException( + "Null response found for LocalRearange on tuple: " + + t); + } + + switch (result.returnStatus) { + case POStatus.STATUS_OK: + // (index, key, value without keys) + Tuple resultTuple = (Tuple) result.result; + Object key = resultTuple.get(1); + IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key); + if( useSecondaryKey) { + indexedKey.setUseSecondaryKey(useSecondaryKey); + indexedKey.setSecondarySortOrder(secondarySortOrder); + } + Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey, + (Tuple) resultTuple.get(2)); + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction out " + out); + } + return out; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + lra + " : " + result); + } + } catch (ExecException e) { + throw new RuntimeException( + "Couldn't do LocalRearange on tuple: " + 
t, e); + } + } + + } /** * Send cogroup output where each element is {key, bag[]} to PoPackage Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1765346&r1=1765345&r2=1765346&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Mon Oct 17 18:18:43 2016 @@ -28,7 +28,9 @@ import scala.runtime.AbstractFunction2; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark; import org.apache.pig.data.DataBag; @@ -249,4 +251,74 @@ public class ReduceByConverter implement return packagedTuple; } } + + /** + * Converts incoming locally rearranged tuple, which is of the form + * (index, key, value) into Tuple2<key, Tuple(key, value)> + */ + private static class LocalRearrangeFunction extends + AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable { + + private final POLocalRearrange lra; + + private boolean useSecondaryKey; + private boolean[] secondarySortOrder; + + public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) { + if( 
useSecondaryKey ) { + this.useSecondaryKey = useSecondaryKey; + this.secondarySortOrder = secondarySortOrder; + } + this.lra = lra; + } + + @Override + public Tuple2<IndexedKey, Tuple> apply(Tuple t) { + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction in " + t); + } + Result result; + try { + lra.setInputs(null); + lra.attachInput(t); + result = lra.getNextTuple(); + + if (result == null) { + throw new RuntimeException( + "Null response found for LocalRearange on tuple: " + + t); + } + + switch (result.returnStatus) { + case POStatus.STATUS_OK: + // (index, key, Tuple(key, value)) + Tuple resultTuple = (Tuple) result.result; + Object key = resultTuple.get(1); + IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key); + if( useSecondaryKey) { + indexedKey.setUseSecondaryKey(useSecondaryKey); + indexedKey.setSecondarySortOrder(secondarySortOrder); + } + Tuple outValue = TupleFactory.getInstance().newTuple(); + outValue.append(key); + outValue.append(resultTuple.get(2)); + Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey, + outValue); + if (LOG.isDebugEnabled()) { + LOG.debug("LocalRearrangeFunction out " + out); + } + return out; + default: + throw new RuntimeException( + "Unexpected response code from operator " + + lra + " : " + result); + } + } catch (ExecException e) { + throw new RuntimeException( + "Couldn't do LocalRearange on tuple: " + t, e); + } + } + + } + }