Author: praveen
Date: Thu Mar 19 11:26:48 2015
New Revision: 1667706

URL: http://svn.apache.org/r1667706
Log:
PIG-4469: Remove redundant code, comments in SparkLauncher (Praveen)

Removed:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastClient.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastServer.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1667706&r1=1667705&r2=1667706&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java	Thu Mar 19 11:26:48 2015
@@ -120,21 +120,11 @@ public class SparkLauncher extends Launc
        // new SparkLauncher gets created for each job.
        private static JavaSparkContext sparkContext = null;
        private static JobMetricsListener jobMetricsListener = new 
JobMetricsListener();
-
-       public static BroadCastServer bcaster;
-       private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
-                       .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
-       // An object that handle cache calls in the operator graph. This is 
again
-       // static because we want
-       // it to be shared across SparkLaunchers. It gets cleared whenever we 
close
-       // the SparkContext.
-       // private static CacheConverter cacheConverter = null;
        private String jobGroupID;
 
        @Override
        public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
                        PigContext pigContext) throws Exception {
-               LOG.info("!!!!!!!!!!  Launching Spark (woot) !!!!!!!!!!!!");
                LOG.debug(physicalPlan);
                JobConf jobConf = SparkUtil.newJobConf(pigContext);
                jobConf.set(PigConstants.LOCAL_CODE_DIR,
@@ -142,23 +132,12 @@ public class SparkLauncher extends Launc
 
                SchemaTupleBackend.initialize(jobConf, pigContext);
                SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-
-               if (System.getenv("BROADCAST_PORT") == null
-                               && System.getenv("BROADCAST_MASTER_IP") == 
null) {
-                       LOG.warn("Missing BROADCAST_POST/BROADCAST_HOST in the 
environment.");
-               } else {
-                       if (bcaster == null) {
-                               bcaster = new BroadCastServer();
-                               
bcaster.startBroadcastServer(Integer.parseInt(System
-                                               .getenv("BROADCAST_PORT")));
-                       }
-               }
-
                SparkPigStats sparkStats = (SparkPigStats) pigContext
                                .getExecutionEngine().instantiatePigStats();
                PigStats.start(sparkStats);
 
                startSparkIfNeeded();
+
                // Set a unique group id for this query, so we can lookup all 
Spark job
                // ids
                // related to this query.
@@ -179,18 +158,16 @@ public class SparkLauncher extends Launc
                                        jobConf);
                }
 
-               // ObjectSerializer.serialize(c);
                byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-               // initialize the supported converters
-               Map<Class<? extends PhysicalOperator>, POConverter> convertMap 
= new HashMap<Class<? extends PhysicalOperator>, POConverter>();
 
+               // Create conversion map, mapping between pig operator and 
spark convertor
+               Map<Class<? extends PhysicalOperator>, POConverter> convertMap 
= new HashMap<Class<? extends PhysicalOperator>, POConverter>();
                convertMap.put(POLoad.class, new LoadConverter(pigContext,
                                physicalPlan, sparkContext.sc()));
                convertMap.put(POStore.class, new StoreConverter(pigContext));
                convertMap.put(POForEach.class, new 
ForEachConverter(confBytes));
                convertMap.put(POFilter.class, new FilterConverter());
                convertMap.put(POPackage.class, new 
PackageConverter(confBytes));
-               // convertMap.put(POCache.class, cacheConverter);
                convertMap.put(POLocalRearrange.class, new 
LocalRearrangeConverter());
                convertMap.put(POGlobalRearrange.class, new 
GlobalRearrangeConverter());
                convertMap.put(POLimit.class, new LimitConverter());
@@ -205,18 +182,18 @@ public class SparkLauncher extends Launc
                convertMap.put(POStreamSpark.class, new 
StreamConverter(confBytes));
 
                sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
-
                cleanUpSparkJob(pigContext, currentDirectoryPath);
                sparkStats.finish();
-               return sparkStats;
+
+        return sparkStats;
        }
 
        private void optimize(PigContext pc, SparkOperPlan plan) throws 
VisitorException {
-               boolean isAccum =
+               boolean isAccumulator =
                                
Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
-               if (isAccum) {
-                       AccumulatorOptimizer accum = new 
AccumulatorOptimizer(plan);
-                       accum.visit();
+               if (isAccumulator) {
+                       AccumulatorOptimizer accumulatorOptimizer = new 
AccumulatorOptimizer(plan);
+                       accumulatorOptimizer.visit();
                }
        }
 
@@ -419,15 +396,11 @@ public class SparkLauncher extends Launc
                                master = "local";
                        }
 
-                       String sparkHome = System.getenv("SPARK_HOME"); // It's 
okay if this
-                       // is null for local
-                       // mode
+                       String sparkHome = System.getenv("SPARK_HOME");
                        String sparkJarsSetting = System.getenv("SPARK_JARS");
                        String pigJar = System.getenv("SPARK_PIG_JAR");
                        String[] sparkJars = sparkJarsSetting == null ? new 
String[] {}
                                        : sparkJarsSetting.split(",");
-
-                       // TODO: Don't hardcode this JAR
                        List<String> jars = Lists.asList(pigJar, sparkJars);
 
                        if (!master.startsWith("local") && 
!master.equals("yarn-client")) {
@@ -438,41 +411,13 @@ public class SparkLauncher extends Launc
                                                        .println("You need to 
set SPARK_HOME to run on a Mesos cluster!");
                                        throw new PigException("SPARK_HOME is 
not set");
                                }
-                               /*
-                                * if (System.getenv("MESOS_NATIVE_LIBRARY") == 
null) {
-                                * 
-                                * System.err.println(
-                                * "You need to set MESOS_NATIVE_LIBRARY to run 
on a Mesos cluster!"
-                                * ); throw new 
PigException("MESOS_NATIVE_LIBRARY is not set");
-                                * }
-                                * 
-                                * // Tell Spark to use Mesos in coarse-grained 
mode (only
-                                * affects Spark 0.6+; no impact on others)
-                                * System.setProperty("spark.mesos.coarse", 
"true");
-                                */
                        }
 
-                       // // For coarse-grained Mesos mode, tell it an upper 
bound on how
-                       // many
-                       // // cores to grab in total;
-                       // // we conservatively set this to 32 unless the user 
set the
-                       // // SPARK_MAX_CPUS environment variable.
-                       // if (System.getenv("SPARK_MAX_CPUS") != null) {
-                       // int maxCores = 32;
-                       // maxCores = 
Integer.parseInt(System.getenv("SPARK_MAX_CPUS"));
-                       // System.setProperty("spark.cores.max", "" + maxCores);
-                       // }
-                       // System.setProperty("spark.cores.max", "1");
-                       // System.setProperty("spark.executor.memory", "" + 
"512m");
-                       // System.setProperty("spark.shuffle.memoryFraction", 
"0.0");
-                       // System.setProperty("spark.storage.memoryFraction", 
"0.0");
-
-                       sparkContext = new JavaSparkContext(master, "Spork", 
sparkHome,
+                       sparkContext = new JavaSparkContext(master, 
"PigOnSpark", sparkHome,
                                        jars.toArray(new String[jars.size()]));
                        sparkContext.sc().addSparkListener(new 
StatsReportListener());
                        sparkContext.sc().addSparkListener(new JobLogger());
                        sparkContext.sc().addSparkListener(jobMetricsListener);
-                       // cacheConverter = new CacheConverter();
                }
        }
 
@@ -481,7 +426,6 @@ public class SparkLauncher extends Launc
                if (sparkContext != null) {
                        sparkContext.stop();
                        sparkContext = null;
-                       // cacheConverter = null;
                }
        }
 
@@ -499,7 +443,6 @@ public class SparkLauncher extends Launc
                                sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
                                                physicalOpToRdds, convertMap, 
seenJobIDs, sparkStats,
                                                jobConf);
-
                        }
                } else {
                        throw new RuntimeException("sparkPlan is null");
@@ -560,23 +503,22 @@ public class SparkLauncher extends Launc
                                sparkOperator.physicalPlan, POStore.class);
                if (poStores != null && poStores.size() == 1) {
                        POStore poStore = poStores.get(0);
-                       if( isFail == false ) {
-                               for (int jobID : getJobIDs(seenJobIDs)) {
-                                       
SparkStatsUtil.waitForJobAddStats(jobID, poStore,
-                                                       jobMetricsListener, 
sparkContext, sparkStats, conf);
-                               }
-                       }else{
-                               String failJobID 
=sparkOperator.name().concat("_fail");
-                               SparkStatsUtil.addFailJobStats(failJobID, 
poStore,sparkStats,
-                                               conf,exception);
-                       }
-               } else {
+            if (!isFail) {
+                for (int jobID : getJobIDs(seenJobIDs)) {
+                    SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+                            jobMetricsListener, sparkContext, sparkStats, 
conf);
+                }
+            } else {
+                String failJobID = sparkOperator.name().concat("_fail");
+                SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkStats,
+                        conf, exception);
+            }
+        } else {
                        LOG.info(String
-                                       .format("sparkOperator:{} does not have 
POStore or "
-                                                       + "sparkOperator has 
more than 1 POStore, the size of POStore",
+                                       .format(String.format("sparkOperator:{} 
does not have POStore or" +
+                                    " sparkOperator has more than 1 POStore, 
the size of POStore"),
                                                        sparkOperator.name(), 
poStores.size()));
                }
-
        }
 
        private void physicalToRDD(PhysicalPlan plan,


Reply via email to