Author: xuefu
Date: Wed Mar 11 01:27:06 2015
New Revision: 1665753

URL: http://svn.apache.org/r1665753
Log:
PIG-4269: Enable unit test TestAccumulator for spark (Liyun via Xuefu)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Wed Mar 11 01:27:06 2015
@@ -25,7 +25,6 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -86,6 +85,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -106,6 +106,7 @@ import org.apache.spark.api.java.JavaSpa
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
+import org.apache.spark.SparkException;
 
 /**
  * Main class that launches pig for Spark
@@ -210,6 +211,15 @@ public class SparkLauncher extends Launc
                return sparkStats;
        }
 
+       private void optimize(PigContext pc, SparkOperPlan plan) throws 
VisitorException {
+               boolean isAccum =
+                               
Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
+               if (isAccum) {
+                       AccumulatorOptimizer accum = new 
AccumulatorOptimizer(plan);
+                       accum.visit();
+               }
+       }
+
        /**
         * In Spark, currently only async actions return job id. There is no 
async
         * equivalent of actions like saveAsNewAPIHadoopFile()
@@ -396,6 +406,8 @@ public class SparkLauncher extends Launc
                SparkPOPackageAnnotator pkgAnnotator = new 
SparkPOPackageAnnotator(
                                sparkPlan);
                pkgAnnotator.visit();
+
+               optimize(pigContext, sparkPlan);
                return sparkPlan;
        }
 
@@ -501,7 +513,6 @@ public class SparkLauncher extends Launc
                        Map<Class<? extends PhysicalOperator>, POConverter> 
convertMap,
                        Set<Integer> seenJobIDs, SparkPigStats sparkStats, 
JobConf conf)
                        throws IOException, InterruptedException {
-
                List<SparkOperator> predecessors = sparkPlan
                                .getPredecessors(sparkOperator);
                List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
@@ -517,6 +528,8 @@ public class SparkLauncher extends Launc
                }
 
                List<PhysicalOperator> leafPOs = 
sparkOperator.physicalPlan.getLeaves();
+               boolean isFail = false;
+               Exception exception = null;
                if (leafPOs != null && leafPOs.size() != 1) {
                        throw new IllegalArgumentException(
                                        String.format(
@@ -528,19 +541,34 @@ public class SparkLauncher extends Launc
                                                        sparkOperator.name()));
                } else {
                        PhysicalOperator leafPO = leafPOs.get(0);
-                       physicalToRDD(sparkOperator.physicalPlan, leafPO, 
physicalOpRdds,
-                                       predecessorRDDs, convertMap);
-                       sparkOpRdds.put(sparkOperator.getOperatorKey(),
-                                       
physicalOpRdds.get(leafPO.getOperatorKey()));
+                       try {
+                               physicalToRDD(sparkOperator.physicalPlan, 
leafPO, physicalOpRdds,
+                                               predecessorRDDs, convertMap);
+                               sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                                               
physicalOpRdds.get(leafPO.getOperatorKey()));
+                       }catch(Exception e) {
+                               if( e instanceof  SparkException) {
+                                       LOG.info("throw SparkException, error 
founds when running " +
+                                                       "rdds in spark");
+                               }
+                               exception = e;
+                               isFail = true;
+                       }
                }
 
                List<POStore> poStores = PlanHelper.getPhysicalOperators(
                                sparkOperator.physicalPlan, POStore.class);
                if (poStores != null && poStores.size() == 1) {
                        POStore poStore = poStores.get(0);
-                       for (int jobID : getJobIDs(seenJobIDs)) {
-                               SparkStatsUtil.waitForJobAddStats(jobID, 
poStore,
-                                               jobMetricsListener, 
sparkContext, sparkStats, conf);
+                       if( isFail == false ) {
+                               for (int jobID : getJobIDs(seenJobIDs)) {
+                                       
SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+                                                       jobMetricsListener, 
sparkContext, sparkStats, conf);
+                               }
+                       }else{
+                               String failJobID 
=sparkOperator.name().concat("_fail");
+                               SparkStatsUtil.addFailJobStats(failJobID, 
poStore,sparkStats,
+                                               conf,exception);
                        }
                } else {
                        LOG.info(String

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1665753&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
 Wed Mar 11 01:27:06 2015
@@ -0,0 +1,26 @@
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to optimize plans that determines if a vertex plan can run in
+ * accumulative mode.
+ */
+public class AccumulatorOptimizer extends SparkOpPlanVisitor {
+
+    public AccumulatorOptimizer(SparkOperPlan plan) {
+               super(plan, new DepthFirstWalker<SparkOperator, 
SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws
+                       VisitorException {
+        AccumulatorOptimizerUtil.addAccumulatorSpark(sparkOperator
+                .physicalPlan);
+    }
+}
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
 Wed Mar 11 01:27:06 2015
@@ -37,11 +37,14 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
 
 public class AccumulatorOptimizerUtil {
     private static final Log LOG = 
LogFactory.getLog(AccumulatorOptimizerUtil.class);
@@ -286,4 +289,85 @@ public class AccumulatorOptimizerUtil {
 
         return false;
     }
+
+    public static void addAccumulatorSpark(PhysicalPlan plan) throws
+            VisitorException {
+        List<PhysicalOperator> pos = plan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            return;
+        }
+
+        // See if this is a POGlobalRearrange
+        PhysicalOperator po_globalRearrange = pos.get(0);
+        if (!po_globalRearrange.getClass().equals(POGlobalRearrange.class)) {
+            return;
+        }
+
+        List<PhysicalOperator> poPackages = 
plan.getSuccessors(po_globalRearrange);
+
+        if (poPackages == null || poPackages.size() == 0) {
+            return;
+        }
+        // See if this is a POPackage
+        PhysicalOperator po_package = poPackages.get(0);
+        if (!po_package.getClass().equals(POPackage.class)) {
+            return;
+        }
+
+        Packager pkgr = ((POPackage) po_package).getPkgr();
+        // Check that this is a standard package, not a subclass
+        if (!pkgr.getClass().equals(Packager.class)) {
+            return;
+        }
+
+        // if POPackage is for distinct, just return
+        if (pkgr.isDistinct()) {
+            return;
+        }
+
+        // if any input to POPackage is inner, just return
+        boolean[] isInner = pkgr.getInner();
+        for (boolean b : isInner) {
+            if (b) {
+                return;
+            }
+        }
+
+        List<PhysicalOperator> l = plan.getSuccessors(po_package);
+        // there should be only one POForEach
+        if (l == null || l.size() == 0 || l.size() > 1) {
+            return;
+        }
+
+        PhysicalOperator po_foreach = l.get(0);
+        if (!(po_foreach instanceof POForEach)) {
+            return;
+        }
+
+        boolean foundUDF = false;
+        List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans();
+        for (PhysicalPlan p : list) {
+            PhysicalOperator po = p.getLeaves().get(0);
+
+            // only expression operators are allowed
+            if (!(po instanceof ExpressionOperator)) {
+                return;
+            }
+
+            if (((ExpressionOperator) po).containUDF()) {
+                foundUDF = true;
+            }
+
+            if (!check(po)) {
+                return;
+            }
+        }
+
+        if (foundUDF) {
+            // if all tests are passed, reducer can run in accumulative mode
+            LOG.info("Reducer is to run in accumulative mode.");
+            po_package.setAccumulative();
+            po_foreach.setAccumulative();
+        }
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java 
Wed Mar 11 01:27:06 2015
@@ -66,14 +66,15 @@ public class SparkJobStats extends JobSt
       }
   }
 
-  public void collectStats(JobMetricsListener jobMetricsListener) {
-      Map<String, List<TaskMetrics>> taskMetrics = 
jobMetricsListener.getJobMetric(jobId);
-      if (taskMetrics == null) {
-        throw new RuntimeException("No task metrics available for jobId " + 
jobId);
-      }
-
-      stats = combineTaskMetrics(taskMetrics);
-  }
+       public void collectStats(JobMetricsListener jobMetricsListener) {
+               if (jobMetricsListener != null) {
+                       Map<String, List<TaskMetrics>> taskMetrics = 
jobMetricsListener.getJobMetric(jobId);
+                       if (taskMetrics == null) {
+                               throw new RuntimeException("No task metrics 
available for jobId " + jobId);
+                       }
+                       stats = combineTaskMetrics(taskMetrics);
+               }
+       }
 
   public Map<String, Long> getStats() {
       return stats;

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
Wed Mar 11 01:27:06 2015
@@ -36,6 +36,22 @@ public class SparkPigStats extends PigSt
         jobPlan.add(jobStats);
     }
 
+    public void addFailJobStats(POStore poStore, String jobId,
+                                JobMetricsListener jobMetricsListener,
+                                JavaSparkContext sparkContext,
+                                Configuration conf,
+                                Exception e) {
+        boolean isSuccess = false;
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+        jobStats.setSuccessful(isSuccess);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobMetricsListener);
+        jobPlan.add(jobStats);
+        if (e != null) {
+            jobStats.setBackendException(e);
+        }
+    }
+
     public void finish() {
         super.stop();
         display();

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
Wed Mar 11 01:27:06 2015
@@ -43,10 +43,20 @@ public class SparkStatsUtil {
       // To workaround this, we will wait for this job to "finish".
       jobMetricsListener.waitForJobToEnd(jobID);
       sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
-          sparkContext, jobConf);
+              sparkContext, jobConf);
       jobMetricsListener.cleanup(jobID);
   }
 
+    public static void addFailJobStats(String jobID,
+                                       POStore poStore,
+                                       SparkPigStats sparkPigStats,
+                                       JobConf jobConf, Exception e) {
+        JobMetricsListener jobMetricsListener = null;
+        JavaSparkContext sparkContext = null;
+        sparkPigStats.addFailJobStats(poStore, jobID, jobMetricsListener,
+                sparkContext, jobConf, e);
+    }
+
   public static boolean isJobSuccess(int jobID,
                                     JavaSparkContext sparkContext) {
       JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();


Reply via email to