Author: praveen
Date: Thu Mar 19 11:26:48 2015
New Revision: 1667706
URL: http://svn.apache.org/r1667706
Log:
PIG-4469: Remove redundant code, comments in SparkLauncher (Praveen)
Removed:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastClient.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastServer.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1667706&r1=1667705&r2=1667706&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Mar 19 11:26:48 2015
@@ -120,21 +120,11 @@ public class SparkLauncher extends Launc
// new SparkLauncher gets created for each job.
private static JavaSparkContext sparkContext = null;
private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
-
- public static BroadCastServer bcaster;
- private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
- .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
- // An object that handle cache calls in the operator graph. This is again
- // static because we want
- // it to be shared across SparkLaunchers. It gets cleared whenever we close
- // the SparkContext.
- // private static CacheConverter cacheConverter = null;
private String jobGroupID;
@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
PigContext pigContext) throws Exception {
- LOG.info("!!!!!!!!!! Launching Spark (woot) !!!!!!!!!!!!");
LOG.debug(physicalPlan);
JobConf jobConf = SparkUtil.newJobConf(pigContext);
jobConf.set(PigConstants.LOCAL_CODE_DIR,
@@ -142,23 +132,12 @@ public class SparkLauncher extends Launc
SchemaTupleBackend.initialize(jobConf, pigContext);
SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-
- if (System.getenv("BROADCAST_PORT") == null
- && System.getenv("BROADCAST_MASTER_IP") == null) {
- LOG.warn("Missing BROADCAST_POST/BROADCAST_HOST in the environment.");
- } else {
- if (bcaster == null) {
- bcaster = new BroadCastServer();
- bcaster.startBroadcastServer(Integer.parseInt(System
- .getenv("BROADCAST_PORT")));
- }
- }
-
SparkPigStats sparkStats = (SparkPigStats) pigContext
.getExecutionEngine().instantiatePigStats();
PigStats.start(sparkStats);
startSparkIfNeeded();
+
// Set a unique group id for this query, so we can lookup all Spark job
// ids
// related to this query.
@@ -179,18 +158,16 @@ public class SparkLauncher extends Launc
jobConf);
}
- // ObjectSerializer.serialize(c);
byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
- // initialize the supported converters
- Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
+ // Create conversion map, mapping between pig operator and spark convertor
+ Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
convertMap.put(POLoad.class, new LoadConverter(pigContext,
physicalPlan, sparkContext.sc()));
convertMap.put(POStore.class, new StoreConverter(pigContext));
convertMap.put(POForEach.class, new ForEachConverter(confBytes));
convertMap.put(POFilter.class, new FilterConverter());
convertMap.put(POPackage.class, new PackageConverter(confBytes));
- // convertMap.put(POCache.class, cacheConverter);
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
convertMap.put(POLimit.class, new LimitConverter());
@@ -205,18 +182,18 @@ public class SparkLauncher extends Launc
convertMap.put(POStreamSpark.class, new StreamConverter(confBytes));
sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
-
cleanUpSparkJob(pigContext, currentDirectoryPath);
sparkStats.finish();
- return sparkStats;
+
+ return sparkStats;
}
private void optimize(PigContext pc, SparkOperPlan plan) throws
VisitorException {
- boolean isAccum =
+ boolean isAccumulator =
Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
- if (isAccum) {
- AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
- accum.visit();
+ if (isAccumulator) {
+ AccumulatorOptimizer accumulatorOptimizer = new AccumulatorOptimizer(plan);
+ accumulatorOptimizer.visit();
}
}
@@ -419,15 +396,11 @@ public class SparkLauncher extends Launc
master = "local";
}
- String sparkHome = System.getenv("SPARK_HOME"); // It's okay if this
- // is null for local
- // mode
+ String sparkHome = System.getenv("SPARK_HOME");
String sparkJarsSetting = System.getenv("SPARK_JARS");
String pigJar = System.getenv("SPARK_PIG_JAR");
String[] sparkJars = sparkJarsSetting == null ? new String[] {}
: sparkJarsSetting.split(",");
-
- // TODO: Don't hardcode this JAR
List<String> jars = Lists.asList(pigJar, sparkJars);
if (!master.startsWith("local") &&
!master.equals("yarn-client")) {
@@ -438,41 +411,13 @@ public class SparkLauncher extends Launc
.println("You need to set SPARK_HOME to run on a Mesos cluster!");
throw new PigException("SPARK_HOME is not set");
}
- /*
- * if (System.getenv("MESOS_NATIVE_LIBRARY") == null) {
- *
- * System.err.println(
- * "You need to set MESOS_NATIVE_LIBRARY to run on a Mesos cluster!"
- * ); throw new PigException("MESOS_NATIVE_LIBRARY is not set");
- * }
- *
- * // Tell Spark to use Mesos in coarse-grained mode (only
- * affects Spark 0.6+; no impact on others)
- * System.setProperty("spark.mesos.coarse", "true");
- */
}
- // // For coarse-grained Mesos mode, tell it an upper bound on how
- // many
- // // cores to grab in total;
- // // we conservatively set this to 32 unless the user set the
- // // SPARK_MAX_CPUS environment variable.
- // if (System.getenv("SPARK_MAX_CPUS") != null) {
- // int maxCores = 32;
- // maxCores = Integer.parseInt(System.getenv("SPARK_MAX_CPUS"));
- // System.setProperty("spark.cores.max", "" + maxCores);
- // }
- // System.setProperty("spark.cores.max", "1");
- // System.setProperty("spark.executor.memory", "" + "512m");
- // System.setProperty("spark.shuffle.memoryFraction", "0.0");
- // System.setProperty("spark.storage.memoryFraction", "0.0");
-
- sparkContext = new JavaSparkContext(master, "Spork", sparkHome,
+ sparkContext = new JavaSparkContext(master, "PigOnSpark", sparkHome,
jars.toArray(new String[jars.size()]));
sparkContext.sc().addSparkListener(new
StatsReportListener());
sparkContext.sc().addSparkListener(new JobLogger());
sparkContext.sc().addSparkListener(jobMetricsListener);
- // cacheConverter = new CacheConverter();
}
}
@@ -481,7 +426,6 @@ public class SparkLauncher extends Launc
if (sparkContext != null) {
sparkContext.stop();
sparkContext = null;
- // cacheConverter = null;
}
}
@@ -499,7 +443,6 @@ public class SparkLauncher extends Launc
sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
physicalOpToRdds, convertMap,
seenJobIDs, sparkStats,
jobConf);
-
}
} else {
throw new RuntimeException("sparkPlan is null");
@@ -560,23 +503,22 @@ public class SparkLauncher extends Launc
sparkOperator.physicalPlan, POStore.class);
if (poStores != null && poStores.size() == 1) {
POStore poStore = poStores.get(0);
- if( isFail == false ) {
- for (int jobID : getJobIDs(seenJobIDs)) {
- SparkStatsUtil.waitForJobAddStats(jobID, poStore,
- jobMetricsListener, sparkContext, sparkStats, conf);
- }
- }else{
- String failJobID =sparkOperator.name().concat("_fail");
- SparkStatsUtil.addFailJobStats(failJobID, poStore,sparkStats,
- conf,exception);
- }
- } else {
+ if (!isFail) {
+ for (int jobID : getJobIDs(seenJobIDs)) {
+ SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+ jobMetricsListener, sparkContext, sparkStats, conf);
+ }
+ } else {
+ String failJobID = sparkOperator.name().concat("_fail");
+ SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkStats,
+ conf, exception);
+ }
+ } else {
LOG.info(String
- .format("sparkOperator:{} does not have POStore or "
- + "sparkOperator has more than 1 POStore, the size of POStore",
+ .format(String.format("sparkOperator:{} does not have POStore or" +
+ " sparkOperator has more than 1 POStore, the size of POStore"),
sparkOperator.name(),
poStores.size()));
}
-
}
private void physicalToRDD(PhysicalPlan plan,