Author: xuefu
Date: Fri Dec 18 05:28:22 2015
New Revision: 1720721

URL: http://svn.apache.org/viewvc?rev=1720721&view=rev
Log:
PIG-4293: Enable unit test TestNativeMapReduce for spark (Liyun via Xuefu)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Fri Dec 18 05:28:22 2015
@@ -88,6 +88,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
@@ -573,6 +574,10 @@ public class SparkLauncher extends Launc
             }
         }
 
+        if (sparkOperator instanceof NativeSparkOperator) {
+            ((NativeSparkOperator) sparkOperator).runJob();
+            return;
+        }
         List<PhysicalOperator> leafPOs = 
sparkOperator.physicalPlan.getLeaves();
         boolean isFail = false;
         Exception exception = null;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
 Fri Dec 18 05:28:22 2015
@@ -17,8 +17,13 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.operator;
 
+import org.apache.hadoop.util.RunJar;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.RunJarSecurityManager;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 
 /**
  * NativeSparkOperator:
@@ -41,4 +46,50 @@ public class NativeSparkOperator extends
         countJobs++;
         return countJobs;
     }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void runJob() throws JobCreationException {
+        RunJarSecurityManager secMan = new RunJarSecurityManager();
+        try {
+            RunJar.main(getNativeMRParams());
+            SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+        } catch (SecurityException se) {   
//java.lang.reflect.InvocationTargetException
+            if (secMan.getExitInvoked()) {
+                if (secMan.getExitCode() != 0) {
+                    throw new JobCreationException("Native job returned with 
non-zero return code");
+                } else {
+                    SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+                }
+            }
+        } catch (Throwable t) {
+            JobCreationException e = new JobCreationException(
+                    "Cannot run native spark job " + t.getMessage(), t);
+            SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
+            throw e;
+        } finally {
+            secMan.retire();
+        }
+    }
+
+    private String[] getNativeMRParams() {
+        String[] paramArr = new String[params.length + 1];
+        paramArr[0] = nativeSparkJar;
+        for (int i = 0; i < params.length; i++) {
+            paramArr[i + 1] = params[i];
+        }
+        return paramArr;
+    }
+
+    public String getCommandString() {
+        StringBuilder sb = new StringBuilder("hadoop jar ");
+        sb.append(nativeSparkJar);
+        for (String pr : params) {
+            sb.append(" ");
+            sb.append(pr);
+        }
+        return sb.toString();
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
 Fri Dec 18 05:28:22 2015
@@ -54,6 +54,7 @@ public class SparkPrinter extends SparkO
         mStream.println("");
         mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
         if (sparkOp instanceof NativeSparkOperator) {
+            mStream.println(((NativeSparkOperator)sparkOp).getCommandString());
             mStream.println("--------");
             mStream.println();
             return;

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
Fri Dec 18 05:28:22 2015
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.PigContext;
@@ -95,6 +96,16 @@ public class SparkPigStats extends PigSt
             jobStats.setBackendException(e);
         }
     }
+
+    public void addNativeJobStats(NativeSparkOperator sparkOperator, String 
jobId, boolean isSuccess, Exception e){
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+        jobStats.setSuccessful(isSuccess);
+        jobSparkOperatorMap.put(jobStats,sparkOperator);
+        jobPlan.add(jobStats);
+        if( e != null ){
+            jobStats.setBackendException(e);
+        }
+    }
 
     public void finish() {
         super.stop();

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
Fri Dec 18 05:28:22 2015
@@ -22,8 +22,10 @@ import org.apache.hadoop.mapred.JobConf;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -35,25 +37,25 @@ public class SparkStatsUtil {
     public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input 
Counters";
     public static final String SPARK_INPUT_RECORD_COUNTER = "Input records 
from ";
 
-  public static void waitForJobAddStats(int jobID,
-                                        POStore poStore, SparkOperator 
sparkOperator,
-                                        JobMetricsListener jobMetricsListener,
-                                        JavaSparkContext sparkContext,
-                                        SparkPigStats sparkPigStats,
-                                        JobConf jobConf)
-      throws InterruptedException {
-      // Even though we are not making any async calls to spark,
-      // the SparkStatusTracker can still return RUNNING status
-      // for a finished job.
-      // Looks like there is a race condition between spark
-      // "event bus" thread updating it's internal listener and
-      // this driver thread calling SparkStatusTracker.
-      // To workaround this, we will wait for this job to "finish".
-      jobMetricsListener.waitForJobToEnd(jobID);
-      sparkPigStats.addJobStats(poStore, sparkOperator, jobID, 
jobMetricsListener,
-              sparkContext, jobConf);
-      jobMetricsListener.cleanup(jobID);
-  }
+    public static void waitForJobAddStats(int jobID,
+                                          POStore poStore, SparkOperator 
sparkOperator,
+                                          JobMetricsListener 
jobMetricsListener,
+                                          JavaSparkContext sparkContext,
+                                          SparkPigStats sparkPigStats,
+                                          JobConf jobConf)
+            throws InterruptedException {
+        // Even though we are not making any async calls to spark,
+        // the SparkStatusTracker can still return RUNNING status
+        // for a finished job.
+        // Looks like there is a race condition between spark
+        // "event bus" thread updating it's internal listener and
+        // this driver thread calling SparkStatusTracker.
+        // To workaround this, we will wait for this job to "finish".
+        jobMetricsListener.waitForJobToEnd(jobID);
+        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, 
jobMetricsListener,
+                sparkContext, jobConf);
+        jobMetricsListener.cleanup(jobID);
+    }
 
     public static void addFailJobStats(String jobID,
                                        POStore poStore, SparkOperator 
sparkOperator,
@@ -100,26 +102,34 @@ public class SparkStatsUtil {
     }
 
     public static boolean isJobSuccess(int jobID,
-                                    JavaSparkContext sparkContext) {
-      JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
-      if (status == JobExecutionStatus.SUCCEEDED) {
-        return true;
-      } else if (status != JobExecutionStatus.FAILED) {
-        throw new RuntimeException("Unexpected job execution status " +
-            status);
-      }
-
-      return false;
-  }
-
-  private static SparkJobInfo getJobInfo(int jobID,
-                                         JavaSparkContext sparkContext) {
-      SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
-      if (jobInfo == null) {
-        throw new RuntimeException("No jobInfo available for jobID "
-            + jobID);
-      }
+                                       JavaSparkContext sparkContext) {
+        JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
+        if (status == JobExecutionStatus.SUCCEEDED) {
+            return true;
+        } else if (status != JobExecutionStatus.FAILED) {
+            throw new RuntimeException("Unexpected job execution status " +
+                    status);
+        }
+
+        return false;
+    }
 
-      return jobInfo;
-  }
+    private static SparkJobInfo getJobInfo(int jobID,
+                                           JavaSparkContext sparkContext) {
+        SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
+        if (jobInfo == null) {
+            throw new RuntimeException("No jobInfo available for jobID "
+                    + jobID);
+        }
+
+        return jobInfo;
+    }
+
+    public static void addNativeJobStats(PigStats ps, NativeSparkOperator 
nativeSparkOperator) {
+        ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, 
nativeSparkOperator.getJobId(), true, null);
+    }
+
+    public static void addFailedNativeJobStats(PigStats ps, 
NativeSparkOperator nativeSparkOperator, Exception e) {
+        ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, 
nativeSparkOperator.getJobId(), false, e);
+    }
 }
\ No newline at end of file


Reply via email to