Author: xuefu
Date: Wed Apr 29 00:45:31 2015
New Revision: 1676651
URL: http://svn.apache.org/r1676651
Log:
PIG-4518: SparkOperator should correspond to complete Spark job (Mohit via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Wed Apr 29 00:45:31 2015
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark;
+import com.google.common.collect.Lists;
+
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
@@ -32,10 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -101,7 +100,6 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
-
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
@@ -125,13 +123,16 @@ public class SparkLauncher extends Launc
@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
PigContext pigContext) throws Exception {
- LOG.debug(physicalPlan);
+ if (LOG.isDebugEnabled())
+ LOG.debug(physicalPlan);
JobConf jobConf = SparkUtil.newJobConf(pigContext);
jobConf.set(PigConstants.LOCAL_CODE_DIR,
System.getProperty("java.io.tmpdir"));
SchemaTupleBackend.initialize(jobConf, pigContext);
SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+ if (LOG.isDebugEnabled())
+ explain(sparkplan, System.out, "text", true);
SparkPigStats sparkStats = (SparkPigStats) pigContext
.getExecutionEngine().instantiatePigStats();
PigStats.start(sparkStats);
@@ -185,7 +186,7 @@ public class SparkLauncher extends Launc
cleanUpSparkJob(pigContext, currentDirectoryPath);
sparkStats.finish();
- return sparkStats;
+ return sparkStats;
}
private void optimize(PigContext pc, SparkOperPlan plan) throws
VisitorException {
@@ -443,6 +444,8 @@ public class SparkLauncher extends Launc
List<SparkOperator> leaves = sparkPlan.getLeaves();
Collections.sort(leaves);
Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new
HashMap();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Converting " + leaves.size() + " Spark
Operators");
for (SparkOperator leaf : leaves) {
Map<OperatorKey, RDD<Tuple>> physicalOpToRdds =
new HashMap();
sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
@@ -494,7 +497,7 @@ public class SparkLauncher extends Launc
predecessorRDDs, convertMap);
sparkOpRdds.put(sparkOperator.getOperatorKey(),
physicalOpRdds.get(leafPO.getOperatorKey()));
- }catch(Exception e) {
+ } catch(Exception e) {
if( e instanceof SparkException) {
LOG.info("throw SparkException, error
founds when running " +
"rdds in spark");
@@ -507,7 +510,7 @@ public class SparkLauncher extends Launc
List<POStore> poStores = PlanHelper.getPhysicalOperators(
sparkOperator.physicalPlan, POStore.class);
if (poStores != null && poStores.size() == 1) {
- POStore poStore = poStores.get(0);
+ POStore poStore = poStores.get(0);
if (!isFail) {
for (int jobID : getJobIDs(seenJobIDs)) {
SparkStatsUtil.waitForJobAddStats(jobID, poStore,
@@ -519,10 +522,10 @@ public class SparkLauncher extends Launc
conf, exception);
}
} else {
- LOG.info(String
- .format(String.format("sparkOperator:{}
does not have POStore or" +
- " sparkOperator has more than 1 POStore,
the size of POStore"),
- sparkOperator.name(),
poStores.size()));
+ LOG.info(String
+
.format(String.format("sparkOperator:{} does not have POStore or" +
+ " sparkOperator has more than 1 POStore. {} is the size of
POStore."),
+ sparkOperator.name(), poStores.size()));
}
}
@@ -542,6 +545,7 @@ public class SparkLauncher extends Launc
convertMap);
predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
}
+
} else {
if (rddsFromPredeSparkOper != null
&& rddsFromPredeSparkOper.size() > 0) {
@@ -552,7 +556,7 @@ public class SparkLauncher extends Launc
POConverter converter =
convertMap.get(physicalOperator.getClass());
if (converter == null) {
throw new IllegalArgumentException(
- "Spork unsupported PhysicalOperator: "
+ physicalOperator);
+ "Pig on Spark does not support Physical
Operator: " + physicalOperator);
}
LOG.info("Converting operator "
@@ -573,9 +577,12 @@ public class SparkLauncher extends Launc
public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
String format, boolean verbose) throws IOException {
SparkOperPlan sparkPlan = compile(pp, pc);
-
ps.println("#-----------------------------------------------------#");
- ps.println("#The Spark node relations are:");
-
ps.println("#-----------------------------------------------------#");
+ explain(sparkPlan, ps, format, verbose);
+ }
+
+ private void explain(SparkOperPlan sparkPlan, PrintStream ps,
+ String format, boolean verbose)
+ throws IOException {
Map<OperatorKey, SparkOperator> allOperKeys =
sparkPlan.getKeys();
List<OperatorKey> operKeyList = new
ArrayList(allOperKeys.keySet());
Collections.sort(operKeyList);
@@ -591,13 +598,14 @@ public class SparkLauncher extends Launc
}
ps.println();
}
+
if (format.equals("text")) {
SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
printer.setVerbose(verbose);
printer.visit();
} else { // TODO: add support for other file format
throw new IOException(
- "Non-text output of explain is not
supported.");
+ "Non-text output of explain is not
supported.");
}
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
Wed Apr 29 00:45:31 2015
@@ -113,6 +113,10 @@ public class PackageConverter implements
(Tuple) next.get(2));
nullableTuple.setIndex(((Number) next.get(0))
.byteValue());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Setting index to " + next.get(0) +
+ " for tuple " + (Tuple)next.get(2) + "
with key " +
+ next.get(1));
return nullableTuple;
} catch (ExecException e) {
throw new RuntimeException(e);
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
Wed Apr 29 00:45:31 2015
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -33,7 +35,9 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.builtin.LOG;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -54,7 +58,8 @@ import com.google.common.collect.Lists;
public class StoreConverter implements
POConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
- private static final FromTupleFunction FROM_TUPLE_FUNCTION = new
FromTupleFunction();
+ private static final Log LOG = LogFactory.getLog(StoreConverter.class);
+ private static final FromTupleFunction FROM_TUPLE_FUNCTION = new
FromTupleFunction();
private PigContext pigContext;
@@ -94,7 +99,10 @@ public class StoreConverter implements
.getFileName(), Text.class, Tuple.class,
PigOutputFormat.class, storeJobConf);
}
- return rddPairs.rdd();
+ RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
+ if (LOG.isDebugEnabled())
+ LOG.debug("RDD lineage: " + retRdd.toDebugString());
+ return retRdd;
}
private static POStore configureStorer(JobConf jobConf,
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Wed Apr 29 00:45:31 2015
@@ -308,7 +308,7 @@ public class SparkCompiler extends PhyPl
public void visitDistinct(PODistinct op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
@@ -343,7 +343,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitLoad(POLoad op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
@@ -379,7 +379,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitStore(POStore op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
if (op.getSFile() != null &&
op.getSFile().getFuncSpec() != null)
curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
@@ -394,7 +394,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitFilter(POFilter op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
processUDFs(op.getPlan());
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
@@ -408,7 +408,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitCross(POCross op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
@@ -422,7 +422,7 @@ public class SparkCompiler extends PhyPl
public void visitStream(POStream op) throws VisitorException {
try {
POStreamSpark poStreamSpark = new POStreamSpark(op);
- nonBlocking(poStreamSpark);
+ addToPlan(poStreamSpark);
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
@@ -435,7 +435,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitSort(POSort op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
POSort sort = op;
long limit = sort.getLimit();
if (limit!=-1) {
@@ -455,7 +455,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitLimit(POLimit op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
@@ -468,7 +468,7 @@ public class SparkCompiler extends PhyPl
public void visitLocalRearrange(POLocalRearrange op)
throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
List<PhysicalPlan> plans = op.getPlans();
if (plans != null)
for (PhysicalPlan ep : plans)
@@ -520,7 +520,7 @@ public class SparkCompiler extends PhyPl
}
try {
- nonBlocking(op);
+ addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
@@ -533,7 +533,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitPOForEach(POForEach op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
List<PhysicalPlan> plans = op.getInputPlans();
if (plans != null) {
for (PhysicalPlan ep : plans) {
@@ -553,7 +553,7 @@ public class SparkCompiler extends PhyPl
public void visitGlobalRearrange(POGlobalRearrange op)
throws VisitorException {
try {
- blocking(op);
+ addToPlan(op);
curSparkOp.customPartitioner =
op.getCustomPartitioner();
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
@@ -567,7 +567,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitPackage(POPackage op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
if (op.getPkgr().getPackageType() ==
Packager.PackageType.JOIN) {
curSparkOp.markRegularJoin();
@@ -590,7 +590,7 @@ public class SparkCompiler extends PhyPl
@Override
public void visitUnion(POUnion op) throws VisitorException {
try {
- nonBlocking(op);
+ addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
@@ -629,7 +629,7 @@ public class SparkCompiler extends PhyPl
}
}
- private void nonBlocking(PhysicalOperator op) throws PlanException,
+ private void addToPlan(PhysicalOperator op) throws PlanException,
IOException {
SparkOperator sparkOp = null;
if (compiledInputs.length == 1) {
@@ -639,17 +639,6 @@ public class SparkCompiler extends PhyPl
}
sparkOp.physicalPlan.addAsLeaf(op);
curSparkOp = sparkOp;
- }
-
- private void blocking(PhysicalOperator op) throws PlanException,
- IOException {
- SparkOperator sparkOp = getSparkOp();
- sparkPlan.add(sparkOp);
- for (SparkOperator compileInput : compiledInputs) {
- sparkPlan.connect(compileInput, sparkOp);
- }
- sparkOp.physicalPlan.addAsLeaf(op);
- curSparkOp = sparkOp;
}
private SparkOperator merge(SparkOperator[] compiledInputs)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
Wed Apr 29 00:45:31 2015
@@ -17,15 +17,19 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -41,6 +45,8 @@ import org.apache.pig.impl.util.Pair;
* stitched in to the "value"
*/
public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
+ private static final Log LOG =
LogFactory.getLog(SparkPOPackageAnnotator.class);
+
public SparkPOPackageAnnotator(SparkOperPlan plan) {
super(plan, new DepthFirstWalker<SparkOperator,
SparkOperPlan>(plan));
}
@@ -51,86 +57,56 @@ public class SparkPOPackageAnnotator ext
PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(
sparkOp.physicalPlan);
pkgDiscoverer.visit();
- POPackage pkg = pkgDiscoverer.getPkg();
- if (pkg != null) {
- handlePackage(sparkOp, pkg);
- }
- }
- }
-
- private void handlePackage(SparkOperator pkgSparkOp, POPackage pkg)
- throws VisitorException {
- int lrFound = 0;
- List<SparkOperator> predecessors = this.mPlan
- .getPredecessors(pkgSparkOp);
- if (predecessors != null && predecessors.size() > 0) {
- for (SparkOperator pred : predecessors) {
- lrFound += patchPackage(pred, pkgSparkOp, pkg);
- if (lrFound == pkg.getNumInps()) {
- break;
- }
- }
- }
- if (lrFound != pkg.getNumInps()) {
- int errCode = 2086;
- String msg = "Unexpected problem during optimization.
Could not find all LocalRearrange operators.";
- throw new OptimizerException(msg, errCode,
PigException.BUG);
}
}
- private int patchPackage(SparkOperator pred, SparkOperator pkgSparkOp,
- POPackage pkg) throws VisitorException {
- LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
- pred.physicalPlan, pkg);
- lrDiscoverer.visit();
- // let our caller know if we managed to patch
- // the package
- return lrDiscoverer.getLoRearrangeFound();
- }
-
static class PackageDiscoverer extends PhyPlanVisitor {
-
private POPackage pkg;
+ private PhysicalPlan plan;
public PackageDiscoverer(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(
plan));
+ this.plan = plan;
}
@Override
public void visitPackage(POPackage pkg) throws VisitorException
{
this.pkg = pkg;
- };
-
- /**
- * @return the pkg
- */
- public POPackage getPkg() {
- return pkg;
- }
-
- }
- static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-
- private int loRearrangeFound = 0;
- private POPackage pkg;
+ // Find POLocalRearrange(s) corresponding to this
POPackage
+ PhysicalOperator graOp =
plan.getPredecessors(pkg).get(0);
+ if (! (graOp instanceof POGlobalRearrange)) {
+ throw new OptimizerException("Package
operator is not preceded by " +
+ "GlobalRearrange operator in Spark
Plan", 2087, PigException.BUG);
+ }
- public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
- super(plan, new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(
- plan));
- this.pkg = pkg;
- }
+ List<PhysicalOperator> lraOps =
plan.getPredecessors(graOp);
+ if (pkg.getNumInps() != lraOps.size()) {
+ throw new OptimizerException("Unexpected problem during
optimization. " +
+ "Could not find all
LocalRearrange operators. Expected " + pkg.getNumInps() +
+ ". Got " + lraOps.size() + ".",
2086, PigException.BUG);
+ }
+ Collections.sort(lraOps);
+ for (PhysicalOperator op : lraOps) {
+ if (! (op instanceof POLocalRearrange)) {
+ throw new
OptimizerException("GlobalRearrange operator can only be preceded by " +
+ "LocalRearrange
operator(s) in Spark Plan", 2087, PigException.BUG);
+ }
+ annotatePkgWithLRA((POLocalRearrange)op);
+ }
+ };
- @Override
- public void visitLocalRearrange(POLocalRearrange lrearrange)
+ private void annotatePkgWithLRA(POLocalRearrange lrearrange)
throws VisitorException {
- loRearrangeFound++;
+
Map<Integer, Pair<Boolean, Map<Integer, Integer>>>
keyInfo;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Annotating package " + pkg + " with
localrearrange operator "
+ + lrearrange + " with index " + lrearrange.getIndex());
if (pkg.getPkgr() instanceof LitePackager) {
if (lrearrange.getIndex() != 0) {
- // Throw some exception here
throw new RuntimeException(
"POLocalRearrange for
POPackageLite cannot have index other than 0, but has index - "
+
lrearrange.getIndex());
@@ -158,17 +134,12 @@ public class SparkPOPackageAnnotator ext
Integer.valueOf(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer,
Integer>>(lrearrange
.isProjectStar(),
lrearrange.getProjectedColsMap()));
+ if (LOG.isDebugEnabled())
+ LOG.debug("KeyInfo for packager for package operator " + pkg + " is "
+ + keyInfo );
pkg.getPkgr().setKeyInfo(keyInfo);
pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
}
-
- /**
- * @return the loRearrangeFound
- */
- public int getLoRearrangeFound() {
- return loRearrangeFound;
- }
-
}
-}
+}
\ No newline at end of file
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
Wed Apr 29 00:45:31 2015
@@ -291,19 +291,25 @@ public class AccumulatorOptimizerUtil {
}
public static void addAccumulatorSpark(PhysicalPlan plan) throws
- VisitorException {
+ VisitorException {
List<PhysicalOperator> pos = plan.getRoots();
if (pos == null || pos.size() == 0) {
return;
}
- // See if this is a POGlobalRearrange
- PhysicalOperator po_globalRearrange = pos.get(0);
- if (!po_globalRearrange.getClass().equals(POGlobalRearrange.class)) {
- return;
+ List<POGlobalRearrange> gras = PlanHelper.getPhysicalOperators(plan,
+ POGlobalRearrange.class);
+
+ for (POGlobalRearrange gra : gras) {
+ addAccumulatorSparkForGRASubDAG(plan, gra);
}
+ }
+
+
+ private static void addAccumulatorSparkForGRASubDAG(PhysicalPlan plan,
+ POGlobalRearrange gra) throws VisitorException {
- List<PhysicalOperator> poPackages =
plan.getSuccessors(po_globalRearrange);
+ List<PhysicalOperator> poPackages = plan.getSuccessors(gra);
if (poPackages == null || poPackages.size() == 0) {
return;
@@ -370,4 +376,4 @@ public class AccumulatorOptimizerUtil {
po_foreach.setAccumulative();
}
}
-}
+}
\ No newline at end of file