Author: xuefu
Date: Fri May 22 18:05:27 2015
New Revision: 1681176
URL: http://svn.apache.org/r1681176
Log:
PIG-4549: Set CROSS operation parallelism for Spark engine (Mohit via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Fri May 22 18:05:27 2015
@@ -94,6 +94,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -169,6 +170,8 @@ public class SparkLauncher extends Launc
jobConf);
}
+ new ParallelismSetter(sparkplan, jobConf).visit();
+
byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
// Create conversion map, mapping between pig operator and
spark convertor
@@ -455,8 +458,8 @@ public class SparkLauncher extends Launc
private void sparkPlanToRDD(SparkOperPlan sparkPlan,
Map<Class<? extends PhysicalOperator>, POConverter>
convertMap,
- SparkPigStats sparkStats, JobConf jobConf) throws
IOException,
- InterruptedException {
+ SparkPigStats sparkStats, JobConf jobConf)
+ throws IOException, InterruptedException {
Set<Integer> seenJobIDs = new HashSet<Integer>();
if (sparkPlan == null) {
throw new RuntimeException("SparkPlan is null.");
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java?rev=1681176&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
Fri May 22 18:05:27 2015
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.FuncSpec;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.GFCross;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ParallelismSetter extends SparkOpPlanVisitor {
+ private JobConf jobConf;
+
+ public ParallelismSetter(SparkOperPlan plan, JobConf jobConf) {
+ super(plan, new DependencyOrderWalker<SparkOperator,
SparkOperPlan>(plan));
+ this.jobConf = jobConf;
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+ if (sparkOp instanceof NativeSparkOperator) {
+ return;
+ }
+
+ if (sparkOp.getCrossKeys() != null) {
+ for (String key : sparkOp.getCrossKeys()) {
+ jobConf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key,
+ // TODO: Estimate parallelism. For now we are
hard-coding GFCross.DEFAULT_PARALLELISM
+                        Integer.toString(GFCross.DEFAULT_PARALLELISM));
+ }
+ }
+ }
+}
\ No newline at end of file
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Fri May 22 18:05:27 2015
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
@@ -59,8 +61,10 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -76,7 +80,9 @@ import org.apache.pig.impl.plan.VisitorE
* operators
*/
public class SparkCompiler extends PhyPlanVisitor {
- private PigContext pigContext;
+ private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
+
+ private PigContext pigContext;
// The physicalPlan that is being compiled
private PhysicalPlan physicalPlan;
@@ -121,17 +127,18 @@ public class SparkCompiler extends PhyPl
scope = roots.get(0).getOperatorKey().getScope();
List<PhysicalOperator> leaves = physicalPlan.getLeaves();
- if (!pigContext.inIllustrator)
- for (PhysicalOperator op : leaves) {
- if (!(op instanceof POStore)) {
- int errCode = 2025;
- String msg = "Expected leaf of reduce
physicalPlan to "
- + "always be POStore.
Found "
- +
op.getClass().getSimpleName();
- throw new SparkCompilerException(msg,
errCode,
- PigException.BUG);
- }
- }
+ if (!pigContext.inIllustrator) {
+ for (PhysicalOperator op : leaves) {
+ if (!(op instanceof POStore)) {
+ int errCode = 2025;
+ String msg = "Expected leaf of reduce physicalPlan to "
+ + "always be POStore. Found "
+ + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode,
+ PigException.BUG);
+ }
+ }
+ }
// get all stores and nativeSpark operators, sort them in
order(operator
// id)
@@ -154,7 +161,9 @@ public class SparkCompiler extends PhyPl
Collections.sort(ops);
for (PhysicalOperator op : ops) {
- compile(op);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Starting compile of leaf-level operator " + op);
+ compile(op);
}
}
@@ -171,6 +180,10 @@ public class SparkCompiler extends PhyPl
PlanException, VisitorException {
SparkOperator[] prevCompInp = compiledInputs;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Compiling physical operator " + op +
+ ". Current spark operator is " + curSparkOp);
+
List<PhysicalOperator> predecessors =
physicalPlan.getPredecessors(op);
if (op instanceof PONative) {
// the predecessor (store) has already been processed
@@ -249,7 +262,10 @@ public class SparkCompiler extends PhyPl
}
private SparkOperator getSparkOp() {
- return new SparkOperator(OperatorKey.genOpKey(scope));
+ SparkOperator op = new
SparkOperator(OperatorKey.genOpKey(scope));
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created new Spark operator " + op);
+ return op;
}
public SparkOperPlan getSparkPlan() {
@@ -557,6 +573,10 @@ public class SparkCompiler extends PhyPl
try {
POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op);
addToPlan(glbOp);
+ if (op.isCross()) {
+ curSparkOp.addCrossKey(op.getOperatorKey().toString());
+ }
+
curSparkOp.customPartitioner = op.getCustomPartitioner();
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
@@ -682,12 +702,14 @@ public class SparkCompiler extends PhyPl
SparkOperator ret = getSparkOp();
sparkPlan.add(ret);
- Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
+ Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
List<SparkOperator> toBeRemoved = new
ArrayList<SparkOperator>();
List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
for (SparkOperator sparkOp : compiledInputs) {
+ if (LOG.isDebugEnabled())
+                LOG.debug("Merging Spark operator " + sparkOp);
toBeRemoved.add(sparkOp);
toBeMerged.add(sparkOp.physicalPlan);
List<SparkOperator> predecessors = sparkPlan
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Fri May 22 18:05:27 2015
@@ -17,8 +17,10 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -55,8 +57,6 @@ public class SparkOperator extends Opera
public Set<PhysicalOperator> scalars;
- public boolean isUDFComparatorUsed = false;
-
public int requestedParallelism = -1;
private BitSet feature = new BitSet();
@@ -71,6 +71,8 @@ public class SparkOperator extends Opera
private boolean combineSmallSplits = true;
+ private List<String> crossKeys = null;
+
public SparkOperator(OperatorKey k) {
super(k);
physicalPlan = new PhysicalPlan();
@@ -118,6 +120,17 @@ public class SparkOperator extends Opera
v.visitSparkOp(this);
}
+ public void addCrossKey(String key) {
+ if (crossKeys == null) {
+ crossKeys = new ArrayList<String>();
+ }
+ crossKeys.add(key);
+ }
+
+ public List<String> getCrossKeys() {
+ return crossKeys;
+ }
+
public boolean isGroupBy() {
return feature.get(OPER_FEATURE.GROUPBY.ordinal());
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Fri May
22 18:05:27 2015
@@ -1666,8 +1666,14 @@ public class TestEvalPipeline2 {
return false;
}
});
- // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
- Assert.assertTrue(partFiles.length >= 2);
+
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ // TODO: Fix this when we implement auto-parallelism in Spark
+ Assert.assertTrue(partFiles.length == 1);
+ } else {
+ // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
+ Assert.assertTrue(partFiles.length >= 2);
+ }
// Check the output
Iterator<Tuple> iter = job.getResults();
List<Tuple> results = new ArrayList<Tuple>();