Author: xuefu
Date: Wed Feb 3 12:50:40 2016
New Revision: 1728303
URL: http://svn.apache.org/viewvc?rev=1728303&view=rev
Log:
PIG-4783: Refactor SparkLauncher for spark engine (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1728303&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
Wed Feb 3 12:50:40 2016
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import
org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+
+import com.google.common.collect.Lists;
+
+public class JobGraphBuilder extends SparkOpPlanVisitor {
+
+ private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+
+ private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap =
null;
+ private SparkPigStats sparkStats = null;
+ private JavaSparkContext sparkContext = null;
+ private JobMetricsListener jobMetricsListener = null;
+ private String jobGroupID = null;
+ private Set<Integer> seenJobIDs = new HashSet<Integer>();
+ private SparkOperPlan sparkPlan = null;
+ private Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new
HashMap<OperatorKey, RDD<Tuple>>();
+ private Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new
HashMap<OperatorKey, RDD<Tuple>>();
+
+ public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends
PhysicalOperator>, RDDConverter> convertMap, SparkPigStats sparkStats,
JavaSparkContext sparkContext, JobMetricsListener jobMetricsListener, String
jobGroupID) {
+ super(plan, new DependencyOrderWalker<SparkOperator,
SparkOperPlan>(plan, true));
+ this.sparkPlan = plan;
+ this.convertMap = convertMap;
+ this.sparkStats = sparkStats;
+ this.sparkContext = sparkContext;
+ this.jobMetricsListener = jobMetricsListener;
+ this.jobGroupID = jobGroupID;
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+ new PhyPlanSetter(sparkOp.physicalPlan).visit();
+ try {
+ sparkOperToRDD(sparkOp);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("fail to get the rdds of this spark
operator: ", e);
+ } catch (JobCreationException e){
+ throw new RuntimeException("fail to get the rdds of this spark
operator: ", e);
+ }
+ }
+
+ private void sparkOperToRDD(SparkOperator sparkOperator) throws
InterruptedException, VisitorException, JobCreationException {
+ List<SparkOperator> predecessors = sparkPlan
+ .getPredecessors(sparkOperator);
+ Set<OperatorKey> predecessorOfPreviousSparkOp = new
HashSet<OperatorKey>();
+ if (predecessors != null) {
+ for (SparkOperator pred : predecessors) {
+ predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
+ }
+ }
+
+ boolean isFail = false;
+ Exception exception = null;
+ if (sparkOperator instanceof NativeSparkOperator) {
+ ((NativeSparkOperator) sparkOperator).runJob();
+ } else {
+ List<PhysicalOperator> leafPOs =
sparkOperator.physicalPlan.getLeaves();
+
+ //One SparkOperator may have multiple leaves(POStores) after
multiquery feature is enabled
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sparkOperator.physicalPlan have " +
sparkOperator.physicalPlan.getLeaves().size() + " leaves");
+ }
+ for (PhysicalOperator leafPO : leafPOs) {
+ try {
+ physicalToRDD(sparkOperator, sparkOperator.physicalPlan,
leafPO,
+ predecessorOfPreviousSparkOp);
+ sparkOpRdds.put(sparkOperator.getOperatorKey(),
+ physicalOpRdds.get(leafPO.getOperatorKey()));
+ } catch (Exception e) {
+ LOG.error("throw exception in sparkOperToRDD: ", e);
+ exception = e;
+ isFail = true;
+ }
+ }
+
+
+ List<POStore> poStores = PlanHelper.getPhysicalOperators(
+ sparkOperator.physicalPlan, POStore.class);
+ Collections.sort(poStores);
+ if (poStores.size() > 0) {
+ int i = 0;
+ if (!isFail) {
+ List<Integer> jobIDs = getJobIDs(seenJobIDs);
+ for (POStore poStore : poStores) {
+ SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++),
poStore, sparkOperator,
+ jobMetricsListener, sparkContext, sparkStats);
+ }
+ } else {
+ for (POStore poStore : poStores) {
+ String failJobID =
sparkOperator.name().concat("_fail");
+ SparkStatsUtil.addFailJobStats(failJobID, poStore,
sparkOperator, sparkStats, exception);
+ }
+ }
+ }
+ }
+ }
+
+ private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
+ PhysicalOperator physicalOperator,
+ Set<OperatorKey> predsFromPreviousSparkOper)
+ throws IOException {
+ RDD<Tuple> nextRDD = null;
+ List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
+ .getPredecessors(physicalOperator);
+ if (predecessorsOfCurrentPhysicalOp != null &&
predecessorsOfCurrentPhysicalOp.size() > 1) {
+ Collections.sort(predecessorsOfCurrentPhysicalOp);
+ }
+
+ Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
+ addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator,
operatorKeysOfAllPreds);
+ if (predecessorsOfCurrentPhysicalOp != null) {
+ for (PhysicalOperator predecessor :
predecessorsOfCurrentPhysicalOp) {
+ physicalToRDD(sparkOperator, plan, predecessor,
predsFromPreviousSparkOper);
+ operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
+ }
+
+ } else {
+ if (predsFromPreviousSparkOper != null
+ && predsFromPreviousSparkOper.size() > 0) {
+ for (OperatorKey predFromPreviousSparkOper :
predsFromPreviousSparkOper) {
+ operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
+ }
+ }
+ }
+
+
+ if (physicalOperator instanceof POSplit) {
+ List<PhysicalPlan> successorPlans = ((POSplit)
physicalOperator).getPlans();
+ for (PhysicalPlan successPlan : successorPlans) {
+ List<PhysicalOperator> leavesOfSuccessPlan =
successPlan.getLeaves();
+ if (leavesOfSuccessPlan.size() != 1) {
+ LOG.error("the size of leaves of SuccessPlan should be 1");
+ break;
+ }
+ PhysicalOperator leafOfSuccessPlan =
leavesOfSuccessPlan.get(0);
+ physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan,
operatorKeysOfAllPreds);
+ }
+ } else {
+ RDDConverter converter =
convertMap.get(physicalOperator.getClass());
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Pig on Spark does not support Physical Operator: " +
physicalOperator);
+ }
+
+ LOG.info("Converting operator "
+ + physicalOperator.getClass().getSimpleName() + " "
+ + physicalOperator);
+ List<RDD<Tuple>> allPredRDDs =
sortPredecessorRDDs(operatorKeysOfAllPreds);
+ nextRDD = converter.convert(allPredRDDs, physicalOperator);
+
+ if (nextRDD == null) {
+ throw new IllegalArgumentException(
+ "RDD should not be null after PhysicalOperator: "
+ + physicalOperator);
+ }
+
+ physicalOpRdds.put(physicalOperator.getOperatorKey(), nextRDD);
+ }
+ }
+
+ //get all rdds of predecessors sorted by the OperatorKey
+ private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey>
operatorKeysOfAllPreds) {
+ List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+ List<OperatorKey> operatorKeyOfAllPreds =
Lists.newArrayList(operatorKeysOfAllPreds);
+ Collections.sort(operatorKeyOfAllPreds);
+ for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
+ predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
+ }
+ return predecessorRDDs;
+ }
+
+ //deal special cases containing operators with multiple predecessors when
multiquery is enabled to get the predecessors of specified
+ // physicalOp in previous SparkOp(see PIG-4675)
+ private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator,
PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors)
{
+ // the relationship is stored in
sparkOperator.getMultiQueryOptimizeConnectionItem()
+ List<OperatorKey> predOperatorKeys =
sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
+ if (predOperatorKeys != null) {
+ for (OperatorKey predOperator : predOperatorKeys) {
+ LOG.debug(String.format("add predecessor(OperatorKey:%s) for
OperatorKey:%s", predOperator, physicalOperator.getOperatorKey()));
+ operatorKeysOfPredecessors.add(predOperator);
+ }
+ }
+ }
+
+ /**
+ * In Spark, currently only async actions return job id. There is no async
+ * equivalent of actions like saveAsNewAPIHadoopFile()
+ * <p/>
+ * The only other way to get a job id is to register a "job group ID" with
+ * the spark context and request all job ids corresponding to that job
group
+ * via getJobIdsForGroup.
+ * <p/>
+ * However getJobIdsForGroup does not guarantee the order of the elements
in
+ * it's result.
+ * <p/>
+ * This method simply returns the previously unseen job ids.
+ *
+ * @param seenJobIDs job ids in the job group that are already seen
+ * @return Spark job ids not seen before
+ */
+ private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+ Set<Integer> groupjobIDs = new HashSet<Integer>(
+ Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+ .getJobIdsForGroup(jobGroupID))));
+ groupjobIDs.removeAll(seenJobIDs);
+ List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
+ if (unseenJobIDs.size() == 0) {
+ throw new RuntimeException("Expected at least one unseen jobID "
+ + " in this call to getJobIdsForGroup, but got "
+ + unseenJobIDs.size());
+ }
+ seenJobIDs.addAll(unseenJobIDs);
+ return unseenJobIDs;
+ }
+}
\ No newline at end of file
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Wed Feb 3 12:50:40 2016
@@ -23,10 +23,8 @@ import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -34,7 +32,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -47,7 +44,6 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -91,7 +87,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
-import
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
@@ -107,21 +102,17 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
-import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
-import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
@@ -157,7 +148,7 @@ public class SparkLauncher extends Launc
explain(sparkplan, System.out, "text", true);
SparkPigStats sparkStats = (SparkPigStats) pigContext
.getExecutionEngine().instantiatePigStats();
- sparkStats.initialize(pigContext, sparkplan);
+ sparkStats.initialize(pigContext, sparkplan, jobConf);
PigStats.start(sparkStats);
startSparkIfNeeded(pigContext);
@@ -214,13 +205,24 @@ public class SparkLauncher extends Launc
convertMap.put(POReduceBySpark.class, new ReduceByConverter());
convertMap.put(POPreCombinerLocalRearrange.class, new
LocalRearrangeConverter());
- sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
+ uploadUDFJars(sparkplan);
+ new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext,
jobMetricsListener, jobGroupID).visit();
cleanUpSparkJob();
sparkStats.finish();
return sparkStats;
}
+    /**
+     * Finds the jars containing the UDFs referenced by {@code sparkplan} and
+     * passes each one, under its own file name, to
+     * addJarToSparkJobWorkingDirectory.
+     */
+    private void uploadUDFJars(SparkOperPlan sparkplan) throws IOException {
+        UDFJarsFinder finder = new UDFJarsFinder(sparkplan, pigContext);
+        finder.visit();
+        for (String jarPath : finder.getUdfJars()) {
+            File jar = new File(jarPath);
+            addJarToSparkJobWorkingDirectory(jar, jar.getName());
+        }
+    }
+
private void optimize(PigContext pc, SparkOperPlan plan) throws
IOException {
Configuration conf =
ConfigurationUtil.toConfiguration(pc.getProperties());
@@ -273,37 +275,6 @@ public class SparkLauncher extends Launc
}
}
- /**
- * In Spark, currently only async actions return job id. There is no async
- * equivalent of actions like saveAsNewAPIHadoopFile()
- * <p/>
- * The only other way to get a job id is to register a "job group ID" with
- * the spark context and request all job ids corresponding to that job
group
- * via getJobIdsForGroup.
- * <p/>
- * However getJobIdsForGroup does not guarantee the order of the elements
in
- * it's result.
- * <p/>
- * This method simply returns the previously unseen job ids.
- *
- * @param seenJobIDs job ids in the job group that are already seen
- * @return Spark job ids not seen before
- */
- private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
- Set<Integer> groupjobIDs = new HashSet<Integer>(
- Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
- .getJobIdsForGroup(jobGroupID))));
- groupjobIDs.removeAll(seenJobIDs);
- List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
- if (unseenJobIDs.size() == 0) {
- throw new RuntimeException("Expected at least one unseen jobID "
- + " in this call to getJobIdsForGroup, but got "
- + unseenJobIDs.size());
- }
- seenJobIDs.addAll(unseenJobIDs);
- return unseenJobIDs;
- }
-
private void cleanUpSparkJob() {
LOG.info("clean up Spark Job");
boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
@@ -442,6 +413,7 @@ public class SparkLauncher extends Launc
SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
pigContext);
sparkCompiler.compile();
+ sparkCompiler.connectSoftLink();
SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
// optimize key - value handling in package
@@ -539,201 +511,6 @@ public class SparkLauncher extends Launc
}
}
- private void sparkPlanToRDD(SparkOperPlan sparkPlan,
- Map<Class<? extends PhysicalOperator>,
RDDConverter> convertMap,
- SparkPigStats sparkStats, JobConf jobConf)
- throws IOException, InterruptedException {
- Set<Integer> seenJobIDs = new HashSet<Integer>();
- if (sparkPlan == null) {
- throw new RuntimeException("SparkPlan is null.");
- }
-
- List<SparkOperator> leaves = sparkPlan.getLeaves();
- Collections.sort(leaves);
- Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Converting " + leaves.size() + " Spark Operators to
RDDs");
- }
-
- for (SparkOperator leaf : leaves) {
- new PhyPlanSetter(leaf.physicalPlan).visit();
- Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
- sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
- physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
- jobConf);
- }
- }
-
- private void addUDFJarsToSparkJobWorkingDirectory(SparkOperator leaf)
throws IOException {
-
- for (String udf : leaf.UDFs) {
- Class clazz = pigContext.getClassForAlias(udf);
- if (clazz != null) {
- String jar = JarManager.findContainingJar(clazz);
- if (jar != null) {
- File jarFile = new File(jar);
- addJarToSparkJobWorkingDirectory(jarFile,
jarFile.getName());
- }
- }
- }
- }
-
- private void sparkOperToRDD(SparkOperPlan sparkPlan,
- SparkOperator sparkOperator,
- Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
- Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
- Map<Class<? extends PhysicalOperator>,
RDDConverter> convertMap,
- Set<Integer> seenJobIDs, SparkPigStats
sparkStats, JobConf conf)
- throws IOException, InterruptedException {
- addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
- List<SparkOperator> predecessors = sparkPlan
- .getPredecessors(sparkOperator);
- Set<OperatorKey> predecessorOfPreviousSparkOp = new
HashSet<OperatorKey>();
- if (predecessors != null) {
- for (SparkOperator pred : predecessors) {
- if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
- sparkOperToRDD(sparkPlan, pred, sparkOpRdds,
- physicalOpRdds, convertMap, seenJobIDs, sparkStats,
- conf);
- }
- predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
- }
- }
-
- if (sparkOperator instanceof NativeSparkOperator) {
- ((NativeSparkOperator) sparkOperator).runJob();
- return;
- }
- List<PhysicalOperator> leafPOs =
sparkOperator.physicalPlan.getLeaves();
- boolean isFail = false;
- Exception exception = null;
- //One SparkOperator may have multiple leaves(POStores) after
multiquery feature is enabled
- if (LOG.isDebugEnabled()) {
- LOG.debug("sparkOperator.physicalPlan have " +
sparkOperator.physicalPlan.getLeaves().size() + " leaves");
- }
- for (PhysicalOperator leafPO : leafPOs) {
- try {
- physicalToRDD(sparkOperator, sparkOperator.physicalPlan,
leafPO, physicalOpRdds,
- predecessorOfPreviousSparkOp, convertMap);
- sparkOpRdds.put(sparkOperator.getOperatorKey(),
- physicalOpRdds.get(leafPO.getOperatorKey()));
- } catch (Exception e) {
- LOG.error("throw exception in sparkOperToRDD: ", e);
- exception = e;
- isFail = true;
- }
- }
-
- List<POStore> poStores = PlanHelper.getPhysicalOperators(
- sparkOperator.physicalPlan, POStore.class);
- Collections.sort(poStores);
- if (poStores.size() > 0) {
- int i = 0;
- if (!isFail) {
- List<Integer> jobIDs = getJobIDs(seenJobIDs);
- for (POStore poStore : poStores) {
- SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++),
poStore, sparkOperator,
- jobMetricsListener, sparkContext, sparkStats,
conf);
- }
- } else {
- for (POStore poStore : poStores) {
- String failJobID = sparkOperator.name().concat("_fail");
- SparkStatsUtil.addFailJobStats(failJobID, poStore,
sparkOperator, sparkStats,
- conf, exception);
- }
- }
- }
- }
-
- private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
- PhysicalOperator physicalOperator,
- Map<OperatorKey, RDD<Tuple>> rdds,
- Set<OperatorKey> predsFromPreviousSparkOper,
- Map<Class<? extends PhysicalOperator>,
RDDConverter> convertMap)
- throws IOException {
- RDD<Tuple> nextRDD = null;
- List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
- .getPredecessors(physicalOperator);
- if (predecessorsOfCurrentPhysicalOp != null &&
predecessorsOfCurrentPhysicalOp.size() > 1) {
- Collections.sort(predecessorsOfCurrentPhysicalOp);
- }
-
- Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
- addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator,
operatorKeysOfAllPreds);
- if (predecessorsOfCurrentPhysicalOp != null) {
- for (PhysicalOperator predecessor :
predecessorsOfCurrentPhysicalOp) {
- physicalToRDD(sparkOperator, plan, predecessor, rdds,
predsFromPreviousSparkOper,
- convertMap);
- operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
- }
-
- } else {
- if (predsFromPreviousSparkOper != null
- && predsFromPreviousSparkOper.size() > 0) {
- for (OperatorKey predFromPreviousSparkOper :
predsFromPreviousSparkOper) {
- operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
- }
- }
- }
-
-
- if (physicalOperator instanceof POSplit) {
- List<PhysicalPlan> successorPlans = ((POSplit)
physicalOperator).getPlans();
- for (PhysicalPlan successPlan : successorPlans) {
- List<PhysicalOperator> leavesOfSuccessPlan =
successPlan.getLeaves();
- if (leavesOfSuccessPlan.size() != 1) {
- LOG.error("the size of leaves of SuccessPlan should be 1");
- break;
- }
- PhysicalOperator leafOfSuccessPlan =
leavesOfSuccessPlan.get(0);
- physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan,
rdds, operatorKeysOfAllPreds, convertMap);
- }
- } else {
- RDDConverter converter =
convertMap.get(physicalOperator.getClass());
- if (converter == null) {
- throw new IllegalArgumentException(
- "Pig on Spark does not support Physical Operator: " +
physicalOperator);
- }
-
- LOG.info("Converting operator "
- + physicalOperator.getClass().getSimpleName() + " "
- + physicalOperator);
- List<RDD<Tuple>> allPredRDDs =
sortPredecessorRDDs(operatorKeysOfAllPreds, rdds);
- nextRDD = converter.convert(allPredRDDs, physicalOperator);
-
- if (nextRDD == null) {
- throw new IllegalArgumentException(
- "RDD should not be null after PhysicalOperator: "
- + physicalOperator);
- }
-
- rdds.put(physicalOperator.getOperatorKey(), nextRDD);
- }
- }
-
- //get all rdds of predecessors sorted by the OperatorKey
- private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey>
operatorKeysOfAllPreds, Map<OperatorKey, RDD<Tuple>> rdds) {
- List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
- List<OperatorKey> operatorKeyOfAllPreds =
Lists.newArrayList(operatorKeysOfAllPreds);
- Collections.sort(operatorKeyOfAllPreds);
- for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
- predecessorRDDs.add(rdds.get(operatorKeyOfAllPred));
- }
- return predecessorRDDs;
- }
-
- //deal special cases containing operators with multiple predecessors when
multiquery is enabled to get the predecessors of specified
- // physicalOp in previous SparkOp(see PIG-4675)
- private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator,
PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors)
{
- // the relationship is stored in
sparkOperator.getMultiQueryOptimizeConnectionItem()
- List<OperatorKey> predOperatorKeys =
sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
- if (predOperatorKeys != null) {
- for (OperatorKey predOperator : predOperatorKeys) {
- LOG.debug(String.format("add predecessor(OperatorKey:%s) for
OperatorKey:%s", predOperator, physicalOperator.getOperatorKey()));
- operatorKeysOfPredecessors.add(predOperator);
- }
- }
- }
@Override
public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java?rev=1728303&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
Wed Feb 3 12:50:40 2016
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+//find udf jars which will be downloaded with spark job on every nodes
+public class UDFJarsFinder extends SparkOpPlanVisitor {
+ private PigContext pigContext = null;
+ private Set<String> udfJars = new HashSet();
+
+ public UDFJarsFinder(SparkOperPlan plan, PigContext pigContext) {
+ super(plan, new DependencyOrderWalker(plan));
+ this.pigContext = pigContext;
+ }
+
+ public void visitSparkOp(SparkOperator sparkOp)
+ throws VisitorException {
+ for (String udf : sparkOp.UDFs) {
+ try {
+ Class clazz = this.pigContext.getClassForAlias(udf);
+ if (clazz != null) {
+ String jar = JarManager.findContainingJar(clazz);
+ if (jar != null) {
+ this.udfJars.add(jar);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("pigContext.getClassForAlias(udf)
fail, ", e);
+ }
+ }
+ }
+
+ public Set<String> getUdfJars() {
+ return this.udfJars;
+ }
+}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
Wed Feb 3 12:50:40 2016
@@ -59,7 +59,8 @@ public class NativeSparkOperator extends
} catch (SecurityException se) {
//java.lang.reflect.InvocationTargetException
if (secMan.getExitInvoked()) {
if (secMan.getExitCode() != 0) {
- throw new JobCreationException("Native job returned with
non-zero return code");
+ JobCreationException e = new JobCreationException("Native
job returned with non-zero return code");
+ SparkStatsUtil.addFailedNativeJobStats(PigStats.get(),
this, e);
} else {
SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Wed Feb 3 12:50:40 2016
@@ -476,7 +476,8 @@ public class SparkCompiler extends PhyPl
try {
addToPlan(op);
curSparkOp.markLimit();
- } catch (Exception e) {
+ phyToSparkOpMap.put(op, curSparkOp);
+ } catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
+ op.getClass().getSimpleName();
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Feb 3 12:50:40 2016
@@ -47,18 +47,18 @@ public class SparkJobStats extends JobSt
private int jobId;
private Map<String, Long> stats = Maps.newLinkedHashMap();
- protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
- this(String.valueOf(jobId), plan);
+ protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration
conf) {
+ this(String.valueOf(jobId), plan, conf);
this.jobId = jobId;
}
- protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
+ protected SparkJobStats(String jobId, PigStats.JobGraph plan,
Configuration conf) {
super(jobId, plan);
+ setConf(conf);
}
public void addOutputInfo(POStore poStore, boolean success,
- JobMetricsListener jobMetricsListener,
- Configuration conf) {
+ JobMetricsListener jobMetricsListener) {
if (!poStore.isTmpStore()) {
long bytes = getOutputSize(poStore, conf);
long recordsCount =
SparkStatsUtil.getStoreSparkCounterValue(poStore);
@@ -72,8 +72,7 @@ public class SparkJobStats extends JobSt
}
public void addInputStats(POLoad po, boolean success,
- boolean singleInput,
- Configuration conf){
+ boolean singleInput) {
long recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
long bytesRead = -1;
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Feb 3 12:50:40 2016
@@ -53,26 +53,28 @@ public class SparkPigStats extends PigSt
private SparkScriptState sparkScriptState;
+ private Configuration conf;
+
public SparkPigStats() {
jobPlan = new JobGraph();
this.sparkScriptState = (SparkScriptState) ScriptState.get();
}
- public void initialize(PigContext pigContext, SparkOperPlan sparkPlan){
+ public void initialize(PigContext pigContext, SparkOperPlan sparkPlan,
Configuration conf) {
super.start();
this.pigContext = pigContext;
+ this.conf = conf;
sparkScriptState.setScriptInfo(sparkPlan);
}
public void addJobStats(POStore poStore, SparkOperator sparkOperator, int
jobId,
JobMetricsListener jobMetricsListener,
- JavaSparkContext sparkContext,
- Configuration conf) {
+ JavaSparkContext sparkContext) {
boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
- SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+ SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
jobStats.setSuccessful(isSuccess);
jobStats.collectStats(jobMetricsListener);
- jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+ jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess,
jobMetricsListener, conf);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
@@ -82,13 +84,12 @@ public class SparkPigStats extends PigSt
public void addFailJobStats(POStore poStore, SparkOperator sparkOperator,
String jobId,
JobMetricsListener jobMetricsListener,
JavaSparkContext sparkContext,
- Configuration conf,
Exception e) {
boolean isSuccess = false;
- SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+ SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
jobStats.setSuccessful(isSuccess);
jobStats.collectStats(jobMetricsListener);
- jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+ jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess,
jobMetricsListener, conf);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
@@ -97,12 +98,12 @@ public class SparkPigStats extends PigSt
}
}
- public void addNativeJobStats(NativeSparkOperator sparkOperator, String
jobId, boolean isSuccess, Exception e){
- SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+ public void addNativeJobStats(NativeSparkOperator sparkOperator, String
jobId, boolean isSuccess, Exception e) {
+ SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
jobStats.setSuccessful(isSuccess);
- jobSparkOperatorMap.put(jobStats,sparkOperator);
+ jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
- if( e != null ){
+ if (e != null) {
jobStats.setBackendException(e);
}
}
@@ -202,7 +203,7 @@ public class SparkPigStats extends PigSt
List<POLoad> poLoads =
PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLoad.class);
for (POLoad load : poLoads) {
if (!load.isTmpLoad()) {
- jobStats.addInputStats(load, isSuccess, (poLoads.size() ==
1), conf);
+ jobStats.addInputStats(load, isSuccess, (poLoads.size() ==
1));
}
}
} catch (VisitorException ve) {
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Feb 3 12:50:40 2016
@@ -41,8 +41,7 @@ public class SparkStatsUtil {
POStore poStore, SparkOperator
sparkOperator,
JobMetricsListener
jobMetricsListener,
JavaSparkContext sparkContext,
- SparkPigStats sparkPigStats,
- JobConf jobConf)
+ SparkPigStats sparkPigStats)
throws InterruptedException {
// Even though we are not making any async calls to spark,
// the SparkStatusTracker can still return RUNNING status
@@ -53,18 +52,18 @@ public class SparkStatsUtil {
// To workaround this, we will wait for this job to "finish".
jobMetricsListener.waitForJobToEnd(jobID);
sparkPigStats.addJobStats(poStore, sparkOperator, jobID,
jobMetricsListener,
- sparkContext, jobConf);
+ sparkContext);
jobMetricsListener.cleanup(jobID);
}
public static void addFailJobStats(String jobID,
POStore poStore, SparkOperator
sparkOperator,
SparkPigStats sparkPigStats,
- JobConf jobConf, Exception e) {
+ Exception e) {
JobMetricsListener jobMetricsListener = null;
JavaSparkContext sparkContext = null;
sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID,
jobMetricsListener,
- sparkContext, jobConf, e);
+ sparkContext, e);
}
public static String getStoreSparkCounterName(POStore store) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1728303&r1=1728302&r2=1728303&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java Wed Feb 3 12:50:40 2016
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
@@ -210,6 +211,13 @@ public class TestNativeMapReduce {
} catch (JobCreationException e) {
// Running in Tez mode throw exception
assertTrue(e.getCause() instanceof FileAlreadyExistsException);
+ } catch (ExecException e) {
+ // Running in spark mode throw exception
+ if (e.getCause() instanceof RuntimeException) {
+ RuntimeException re = (RuntimeException) e.getCause();
+ JobCreationException jce = (JobCreationException)
re.getCause();
+ assertTrue(jce.getCause() instanceof
FileAlreadyExistsException);
+ }
}
finally{
// We have to manually delete intermediate mapreduce files