Author: xuefu
Date: Fri Dec 18 05:28:22 2015
New Revision: 1720721
URL: http://svn.apache.org/viewvc?rev=1720721&view=rev
Log:
PIG-4293: Enable unit test TestNativeMapReduce for spark (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Fri Dec 18 05:28:22 2015
@@ -88,6 +88,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
@@ -573,6 +574,10 @@ public class SparkLauncher extends Launc
}
}
+ if (sparkOperator instanceof NativeSparkOperator) {
+ ((NativeSparkOperator) sparkOperator).runJob();
+ return;
+ }
List<PhysicalOperator> leafPOs =
sparkOperator.physicalPlan.getLeaves();
boolean isFail = false;
Exception exception = null;
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
Fri Dec 18 05:28:22 2015
@@ -17,8 +17,13 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+import org.apache.hadoop.util.RunJar;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.RunJarSecurityManager;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
/**
* NativeSparkOperator:
@@ -41,4 +46,50 @@ public class NativeSparkOperator extends
countJobs++;
return countJobs;
}
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void runJob() throws JobCreationException {
+ RunJarSecurityManager secMan = new RunJarSecurityManager();
+ try {
+ RunJar.main(getNativeMRParams());
+ SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+        } catch (SecurityException se) { //java.lang.reflect.InvocationTargetException
+ if (secMan.getExitInvoked()) {
+ if (secMan.getExitCode() != 0) {
+                    throw new JobCreationException("Native job returned with non-zero return code");
+ } else {
+ SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+ }
+ }
+ } catch (Throwable t) {
+ JobCreationException e = new JobCreationException(
+ "Cannot run native spark job " + t.getMessage(), t);
+ SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
+ throw e;
+ } finally {
+ secMan.retire();
+ }
+ }
+
+ private String[] getNativeMRParams() {
+ String[] paramArr = new String[params.length + 1];
+ paramArr[0] = nativeSparkJar;
+ for (int i = 0; i < params.length; i++) {
+ paramArr[i + 1] = params[i];
+ }
+ return paramArr;
+ }
+
+ public String getCommandString() {
+ StringBuilder sb = new StringBuilder("hadoop jar ");
+ sb.append(nativeSparkJar);
+ for (String pr : params) {
+ sb.append(" ");
+ sb.append(pr);
+ }
+ return sb.toString();
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
Fri Dec 18 05:28:22 2015
@@ -54,6 +54,7 @@ public class SparkPrinter extends SparkO
mStream.println("");
mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
if (sparkOp instanceof NativeSparkOperator) {
+ mStream.println(((NativeSparkOperator)sparkOp).getCommandString());
mStream.println("--------");
mStream.println();
return;
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
Fri Dec 18 05:28:22 2015
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.PigContext;
@@ -95,6 +96,16 @@ public class SparkPigStats extends PigSt
jobStats.setBackendException(e);
}
}
+
+    public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e){
+ SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+ jobStats.setSuccessful(isSuccess);
+ jobSparkOperatorMap.put(jobStats,sparkOperator);
+ jobPlan.add(jobStats);
+ if( e != null ){
+ jobStats.setBackendException(e);
+ }
+ }
public void finish() {
super.stop();
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1720721&r1=1720720&r2=1720721&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Fri Dec 18 05:28:22 2015
@@ -22,8 +22,10 @@ import org.apache.hadoop.mapred.JobConf;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStats;
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaSparkContext;
@@ -35,25 +37,25 @@ public class SparkStatsUtil {
    public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input Counters";
    public static final String SPARK_INPUT_RECORD_COUNTER = "Input records from ";
- public static void waitForJobAddStats(int jobID,
- POStore poStore, SparkOperator
sparkOperator,
- JobMetricsListener jobMetricsListener,
- JavaSparkContext sparkContext,
- SparkPigStats sparkPigStats,
- JobConf jobConf)
- throws InterruptedException {
- // Even though we are not making any async calls to spark,
- // the SparkStatusTracker can still return RUNNING status
- // for a finished job.
- // Looks like there is a race condition between spark
- // "event bus" thread updating it's internal listener and
- // this driver thread calling SparkStatusTracker.
- // To workaround this, we will wait for this job to "finish".
- jobMetricsListener.waitForJobToEnd(jobID);
- sparkPigStats.addJobStats(poStore, sparkOperator, jobID,
jobMetricsListener,
- sparkContext, jobConf);
- jobMetricsListener.cleanup(jobID);
- }
+ public static void waitForJobAddStats(int jobID,
+                                          POStore poStore, SparkOperator sparkOperator,
+                                          JobMetricsListener jobMetricsListener,
+ JavaSparkContext sparkContext,
+ SparkPigStats sparkPigStats,
+ JobConf jobConf)
+ throws InterruptedException {
+ // Even though we are not making any async calls to spark,
+ // the SparkStatusTracker can still return RUNNING status
+ // for a finished job.
+ // Looks like there is a race condition between spark
+ // "event bus" thread updating it's internal listener and
+ // this driver thread calling SparkStatusTracker.
+ // To workaround this, we will wait for this job to "finish".
+ jobMetricsListener.waitForJobToEnd(jobID);
+        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
+ sparkContext, jobConf);
+ jobMetricsListener.cleanup(jobID);
+ }
public static void addFailJobStats(String jobID,
POStore poStore, SparkOperator
sparkOperator,
@@ -100,26 +102,34 @@ public class SparkStatsUtil {
}
public static boolean isJobSuccess(int jobID,
- JavaSparkContext sparkContext) {
- JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
- if (status == JobExecutionStatus.SUCCEEDED) {
- return true;
- } else if (status != JobExecutionStatus.FAILED) {
- throw new RuntimeException("Unexpected job execution status " +
- status);
- }
-
- return false;
- }
-
- private static SparkJobInfo getJobInfo(int jobID,
- JavaSparkContext sparkContext) {
- SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
- if (jobInfo == null) {
- throw new RuntimeException("No jobInfo available for jobID "
- + jobID);
- }
+ JavaSparkContext sparkContext) {
+ JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
+ if (status == JobExecutionStatus.SUCCEEDED) {
+ return true;
+ } else if (status != JobExecutionStatus.FAILED) {
+ throw new RuntimeException("Unexpected job execution status " +
+ status);
+ }
+
+ return false;
+ }
- return jobInfo;
- }
+ private static SparkJobInfo getJobInfo(int jobID,
+ JavaSparkContext sparkContext) {
+ SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
+ if (jobInfo == null) {
+ throw new RuntimeException("No jobInfo available for jobID "
+ + jobID);
+ }
+
+ return jobInfo;
+ }
+
+    public static void addNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator) {
+        ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), true, null);
+    }
+
+    public static void addFailedNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator, Exception e) {
+        ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), false, e);
+    }
}
\ No newline at end of file