Author: xuefu
Date: Tue Mar 10 04:37:36 2015
New Revision: 1665404

URL: http://svn.apache.org/r1665404
Log:
PIG-4374: Add SparkPlan in spark package (Liyun via Xuefu)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Removed:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java
Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Tue Mar 10 04:37:36 2015
@@ -39,7 +39,6 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -87,8 +86,8 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
-import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -112,457 +111,529 @@ import org.apache.spark.scheduler.StatsR
  */
 public class SparkLauncher extends Launcher {
 
-    private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
+       private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
 
-    // Our connection to Spark. It needs to be static so that it can be reused
-    // across jobs, because a
-    // new SparkLauncher gets created for each job.
-    private static JavaSparkContext sparkContext = null;
-    private static JobMetricsListener jobMetricsListener = new 
JobMetricsListener();
-
-    public static BroadCastServer bcaster;
-    private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
-            .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
-    // An object that handle cache calls in the operator graph. This is again
-    // static because we want
-    // it to be shared across SparkLaunchers. It gets cleared whenever we close
-    // the SparkContext.
-    // private static CacheConverter cacheConverter = null;
-    private String jobGroupID;
-
-    @Override
-    public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
-            PigContext pigContext) throws Exception {
-        LOG.info("!!!!!!!!!!  Launching Spark (woot) !!!!!!!!!!!!");
-        LOG.debug(physicalPlan);
-        JobConf c = SparkUtil.newJobConf(pigContext);
-        c.set(PigConstants.LOCAL_CODE_DIR, 
System.getProperty("java.io.tmpdir"));
-
-        SchemaTupleBackend.initialize(c, pigContext);
-        SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-
-        if (System.getenv("BROADCAST_PORT") == null
-                && System.getenv("BROADCAST_MASTER_IP") == null) {
-            LOG.warn("Missing BROADCAST_POST/BROADCAST_HOST in the 
environment.");
-        } else {
-            if (bcaster == null) {
-                bcaster = new BroadCastServer();
-                bcaster.startBroadcastServer(Integer.parseInt(System
-                        .getenv("BROADCAST_PORT")));
-            }
-        }
-
-        SparkPigStats sparkStats = (SparkPigStats)
-            pigContext.getExecutionEngine().instantiatePigStats();
-        PigStats.start(sparkStats);
-
-        startSparkIfNeeded();
-        // Set a unique group id for this query, so we can lookup all Spark 
job ids
-        // related to this query.
-        jobGroupID = UUID.randomUUID().toString();
-        sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", 
false);
-        jobMetricsListener.reset();
-
-        String currentDirectoryPath = 
Paths.get(".").toAbsolutePath().normalize().toString() + "/";
-        startSparkJob(pigContext, currentDirectoryPath);
-        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
-                physicalPlan, POStore.class);
-        POStore firstStore = stores.getFirst();
-        if( firstStore != null ){
-            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext, c);
-        }
-
-        //        ObjectSerializer.serialize(c);
-        byte[] confBytes = KryoSerializer.serializeJobConf(c);
-        // initialize the supported converters
-        Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new 
HashMap<Class<? extends PhysicalOperator>, POConverter>();
-
-        convertMap.put(POLoad.class, new LoadConverter(pigContext,
-                physicalPlan, sparkContext.sc()));
-        convertMap.put(POStore.class, new StoreConverter(pigContext));
-        convertMap.put(POForEach.class, new ForEachConverter(confBytes));
-        convertMap.put(POFilter.class, new FilterConverter());
-        convertMap.put(POPackage.class, new PackageConverter(confBytes
-        ));
-        // convertMap.put(POCache.class, cacheConverter);
-        convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
-        convertMap.put(POGlobalRearrange.class, new 
GlobalRearrangeConverter());
-        convertMap.put(POLimit.class, new LimitConverter());
-        convertMap.put(PODistinct.class, new DistinctConverter());
-        convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
-        convertMap.put(POSort.class, new SortConverter());
-        convertMap.put(POSplit.class, new SplitConverter());
-        convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
-        convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
-        convertMap.put(POCounter.class, new CounterConverter());
-        convertMap.put(PORank.class, new RankConverter());
-        convertMap.put(POStreamSpark.class,new StreamConverter(confBytes));
-
-        sparkPlanToRDD(sparkplan,convertMap, sparkStats, c);
-
-        cleanUpSparkJob(pigContext,currentDirectoryPath);
-        sparkStats.finish();
-        return sparkStats;
-    }
-
-
-    /**
-     * In Spark, currently only async actions return job id.
-     * There is no async equivalent of actions like saveAsNewAPIHadoopFile()
-     *
-     * The only other way to get a job id is to register a "job group ID" with 
the
-     * spark context and request all job ids corresponding to that job group 
via
-     * getJobIdsForGroup.
-     *
-     * However getJobIdsForGroup does not guarantee the order of the elements 
in
-     * it's result.
-     *
-     * This method simply returns the previously unseen job ids.
-     *
-     * @param seenJobIDs job ids in the job group that are already seen
-     * @return Spark job ids not seen before
-     */
-    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
-        Set<Integer> groupjobIDs = new HashSet<Integer>(Arrays.asList(
-            ArrayUtils.toObject(sparkContext.statusTracker()
-                .getJobIdsForGroup(jobGroupID))));
-        groupjobIDs.removeAll(seenJobIDs);
-        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
-        if (unseenJobIDs.size() == 0) {
-          throw new RuntimeException("Expected at least one unseen jobID " +
-              " in this call to getJobIdsForGroup, but got " + 
unseenJobIDs.size());
-        }
-
-        seenJobIDs.addAll(unseenJobIDs);
-        return unseenJobIDs;
-    }
-
-    private void cleanUpSparkJob(PigContext pigContext, String 
currentDirectoryPath) {
-        LOG.info("clean up Spark Job");
-        boolean isLocal = System.getenv("SPARK_MASTER")!= 
null?System.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL"): true;
-        if (isLocal) {
-            String shipFiles = 
pigContext.getProperties().getProperty("pig.streaming.ship.files");
-            if (shipFiles != null) {
-                for (String file : shipFiles.split(",")) {
-                    File shipFile = new File(file);
-                    File deleteFile = new File(currentDirectoryPath + "/" + 
shipFile.getName());
-                    if (deleteFile.exists()) {
-                        LOG.info(String.format("delete ship file result: %b", 
deleteFile.delete()));
-                    }
-                }
-            }
-            String cacheFiles = 
pigContext.getProperties().getProperty("pig.streaming.cache.files");
-            if (cacheFiles != null) {
-                for (String file : cacheFiles.split(",")) {
-                    String fileName = extractFileName(file.trim());
-                    File deleteFile = new File(currentDirectoryPath + "/" + 
fileName);
-                    if (deleteFile.exists()) {
-                        LOG.info(String.format("delete cache file result: %b", 
deleteFile.delete()));
-                    }
-                }
-            }
-        }
-    }
-
-    private void startSparkJob(PigContext pigContext, String 
currentDirectoryPath) throws IOException {
-        LOG.info("start Spark Job");
-        String shipFiles = 
pigContext.getProperties().getProperty("pig.streaming.ship.files");
-        shipFiles(shipFiles, currentDirectoryPath);
-        String cacheFiles = 
pigContext.getProperties().getProperty("pig.streaming.cache.files");
-        cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
-    }
-
-    private void shipFiles(String shipFiles, String currentDirectoryPath) 
throws IOException {
-        if (shipFiles != null) {
-            for (String file : shipFiles.split(",")) {
-                File shipFile = new File(file.trim());
-                if (shipFile.exists()) {
-                    LOG.info(String.format("shipFile:%s",shipFile));
-                    boolean isLocal = System.getenv("SPARK_MASTER")!= 
null?System.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL"): true;
-                    if (isLocal) {
-                        File localFile = new File(currentDirectoryPath+"/" + 
shipFile.getName());
-                        if( localFile.exists()){
-                            LOG.info(String.format("ship file %s exists, ready 
to delete",localFile.getAbsolutePath()));
-                            localFile.delete();
-                        } else{
-                            LOG.info(String.format("ship file %s  not 
exists,",localFile.getAbsolutePath()));
-                        }
-                        Files.copy(shipFile.toPath(), 
Paths.get(localFile.getAbsolutePath()));
-                    } else {
-                        
sparkContext.addFile(shipFile.toURI().toURL().toExternalForm());
-                    }
-                }
-            }
-        }
-    }
-
-    private void cacheFiles(String cacheFiles, String currentDirectoryPath, 
PigContext pigContext) throws IOException {
-        if (cacheFiles != null) {
-            Configuration conf = SparkUtil.newJobConf(pigContext);
-            boolean isLocal = System.getenv("SPARK_MASTER")!= 
null?System.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL"): true;
-            for (String file : cacheFiles.split(",")) {
-                String fileName = extractFileName(file.trim());
-                Path src = new Path(extractFileUrl(file.trim()));
-                File tmpFile = File.createTempFile(fileName,".tmp");
-                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
-                FileSystem fs = tmpFilePath.getFileSystem(conf);
-                fs.copyToLocalFile(src, tmpFilePath);
-                tmpFile.deleteOnExit();
-                if (isLocal) {
-                    File localFile = new File(currentDirectoryPath + "/" + 
fileName);
-                    if( localFile.exists()){
-                        LOG.info(String.format("cache file %s exists, ready to 
delete",localFile.getAbsolutePath()));
-                        localFile.delete();
-                    } else{
-                        LOG.info(String.format("cache file %s not 
exists,",localFile.getAbsolutePath()));
-                    }
-                    Files.copy( Paths.get(tmpFilePath.toString()), 
Paths.get(localFile.getAbsolutePath()));
-                } else {
-                    
sparkContext.addFile(tmpFile.toURI().toURL().toExternalForm());
-                }
-            }
-        }
-    }
-
-    private String extractFileName(String cacheFileUrl) {
-        String[] tmpAry = cacheFileUrl.split("#");
-        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] : 
null;
-        if (fileName == null) {
-            throw new RuntimeException("cache file is invalid format, file:" + 
cacheFileUrl);
-        } else {
-            LOG.debug("cache file name is valid:" + cacheFileUrl);
-            return fileName;
-        }
-    }
-
-    private String extractFileUrl(String cacheFileUrl) {
-        String[] tmpAry = cacheFileUrl.split("#");
-        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0] : 
null;
-        if (fileName == null) {
-            throw new RuntimeException("cache file is invalid format, file:" + 
cacheFileUrl);
-        } else {
-            LOG.debug("cache file name is valid:" + cacheFileUrl);
-            return fileName;
-        }
-    }
-
-    private SparkOperPlan compile(PhysicalPlan physicalPlan, PigContext 
pigContext) throws PlanException, IOException, VisitorException {
-        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, 
pigContext);
-        sparkCompiler.compile();
-        SparkOperPlan plan = sparkCompiler.getSparkPlan();
-
-        // optimize key - value handling in package
-        SparkPOPackageAnnotator pkgAnnotator = new 
SparkPOPackageAnnotator(plan);
-        pkgAnnotator.visit();
-        return plan;
-    }
-
-    private static void startSparkIfNeeded() throws PigException {
-        if (sparkContext == null) {
-            String master = System.getenv("SPARK_MASTER");
-            if (master == null) {
-                LOG.info("SPARK_MASTER not specified, using \"local\"");
-                master = "local";
-            }
-
-            String sparkHome = System.getenv("SPARK_HOME"); // It's okay if 
this
-            // is null for local
-            // mode
-            String sparkJarsSetting = System.getenv("SPARK_JARS");
-            String pigJar = System.getenv("SPARK_PIG_JAR");
-            String[] sparkJars = sparkJarsSetting == null ? new String[] {}
-                    : sparkJarsSetting.split(",");
-
-            // TODO: Don't hardcode this JAR
-            List<String> jars = Lists.asList(pigJar, sparkJars);
-
-            if (!master.startsWith("local") && !master.equals("yarn-client")) {
-                // Check that we have the Mesos native library and Spark home
-                // are set
-                if (sparkHome == null) {
-                    System.err
-                            .println("You need to set SPARK_HOME to run on a 
Mesos cluster!");
-                    throw new PigException("SPARK_HOME is not set");
-                }
-                /*
-                 * if (System.getenv("MESOS_NATIVE_LIBRARY") == null) {
-                 *
-                 * System.err.println(
-                 * "You need to set MESOS_NATIVE_LIBRARY to run on a Mesos 
cluster!"
-                 * ); throw new PigException("MESOS_NATIVE_LIBRARY is not 
set");
-                 * }
-                 *
-                 * // Tell Spark to use Mesos in coarse-grained mode (only
-                 * affects Spark 0.6+; no impact on others)
-                 * System.setProperty("spark.mesos.coarse", "true");
-                 */
-            }
-
-//            // For coarse-grained Mesos mode, tell it an upper bound on how 
many
-//            // cores to grab in total;
-//            // we conservatively set this to 32 unless the user set the
-//            // SPARK_MAX_CPUS environment variable.
-//            if (System.getenv("SPARK_MAX_CPUS") != null) {
-//                int maxCores = 32;
-//                maxCores = Integer.parseInt(System.getenv("SPARK_MAX_CPUS"));
-//                System.setProperty("spark.cores.max", "" + maxCores);
-//            }
-//            System.setProperty("spark.cores.max", "1");
-//            System.setProperty("spark.executor.memory", "" + "512m");
-//            System.setProperty("spark.shuffle.memoryFraction", "0.0");
-//            System.setProperty("spark.storage.memoryFraction", "0.0");
-
-            sparkContext = new JavaSparkContext(master,
-                    "Spork", sparkHome, jars.toArray(new String[jars.size()]));
-            sparkContext.sc().addSparkListener(new StatsReportListener());
-            sparkContext.sc().addSparkListener(new JobLogger());
-            sparkContext.sc().addSparkListener(jobMetricsListener);
-            // cacheConverter = new CacheConverter();
-        }
-    }
-
-    // You can use this in unit tests to stop the SparkContext between tests.
-    static void stopSpark() {
-        if (sparkContext != null) {
-            sparkContext.stop();
-            sparkContext = null;
-            // cacheConverter = null;
-        }
-    }
-
-    private void sparkPlanToRDD(SparkOperPlan sparkPlan, Map<Class<? extends 
PhysicalOperator>, POConverter> convertMap, SparkPigStats sparkStats, JobConf 
c) throws IOException , InterruptedException {
-        Set<Integer> seenJobIDs = new HashSet<Integer>();
-        if (sparkPlan != null) {
-            List<SparkOper> leaves = sparkPlan.getLeaves();
-            Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap();
-            for (SparkOper leaf : leaves) {
-                Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap();
-                sparkOperToRDD(sparkPlan, leaf, sparkOpRdds, physicalOpRdds, 
convertMap, seenJobIDs, sparkStats, c);
-
-            }
-        }
-    }
-
-    private void sparkOperToRDD(SparkOperPlan sparkPlan,
-                                SparkOper sparkOper,Map<OperatorKey, 
RDD<Tuple>>  sparkOpRdds, Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
-                                Map<Class<? extends PhysicalOperator>, 
POConverter> convertMap,
-                                Set<Integer> seenJobIDs, SparkPigStats 
sparkStats, JobConf c ) throws IOException, InterruptedException {
-
-        List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOper);
-        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-        if (predecessors != null) {
-            for (SparkOper prede : predecessors) {
-                if (sparkOpRdds.get(prede.getOperatorKey()) == null) {
-                    sparkOperToRDD(sparkPlan, prede, sparkOpRdds, 
physicalOpRdds, convertMap,seenJobIDs, sparkStats, c);
-                }
-                predecessorRDDs.add(sparkOpRdds.get(prede.getOperatorKey()));
-            }
-        }
-
-        List<PhysicalOperator> leafPOs = sparkOper.plan.getLeaves();
-        if (leafPOs != null && leafPOs.size() != 1) {
-            throw new IllegalArgumentException(String.format("sparkOper.plan 
should have 1 leaf, but sparkOper.plan.getLeaves() not equals 1, sparkOper:{}" 
+ sparkOper.name()));
-        } else {
-            PhysicalOperator leafPO = leafPOs.get(0);
-            physicalToRDD(sparkOper.plan, leafPO, physicalOpRdds, 
predecessorRDDs, convertMap);
-            
sparkOpRdds.put(sparkOper.getOperatorKey(),physicalOpRdds.get(leafPO.getOperatorKey()));
-        }
-
-        List<POStore> poStores =  PlanHelper.getPhysicalOperators(
-                sparkOper.plan, POStore.class);
-        if( poStores!=null && poStores.size() ==1){
-            POStore poStore = poStores.get(0);
-            for (int jobID : getJobIDs(seenJobIDs)) {
-                SparkStatsUtil.waitForJobAddStats(jobID, poStore,
-                        jobMetricsListener, sparkContext, sparkStats, c);
-            }
-        }  else{
-            LOG.info(String.format("sparkOper:{} does not have POStore or 
sparkOper has more than 1 POStore",sparkOper.name()));
-        }
-
-    }
-
-    private void physicalToRDD(PhysicalPlan plan,
-                               PhysicalOperator physicalOperator,
-                               Map<OperatorKey, RDD<Tuple>> rdds,  
List<RDD<Tuple>> rddsFromPredeSparkOper,
-                               Map<Class<? extends PhysicalOperator>, 
POConverter> convertMap)
-            throws IOException {
-        RDD<Tuple> nextRDD = null;
-        List<PhysicalOperator> predecessors = plan
-                .getPredecessors(physicalOperator);
-        List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
-        if (predecessors != null) {
-            for (PhysicalOperator predecessor : predecessors) {
-                physicalToRDD(plan, predecessor, rdds,rddsFromPredeSparkOper, 
convertMap);
-                predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
-            }
-        }else{
-            if( rddsFromPredeSparkOper!=null && 
rddsFromPredeSparkOper.size()>0 ){
-                predecessorRdds.addAll(rddsFromPredeSparkOper);
-            }
-        }
-
-        POConverter converter = convertMap.get(physicalOperator.getClass());
-        if (converter == null) {
-            throw new IllegalArgumentException(
-                    "Spork unsupported PhysicalOperator: " + physicalOperator);
-        }
-
-        LOG.info("Converting operator "
-                + physicalOperator.getClass().getSimpleName() + " "
-                + physicalOperator);
-        nextRDD = converter.convert(predecessorRdds, physicalOperator);
-
-        if (nextRDD == null) {
-            throw new IllegalArgumentException(
-                    "RDD should not be null after PhysicalOperator: "
-                            + physicalOperator);
-        }
-
-        rdds.put(physicalOperator.getOperatorKey(), nextRDD);
-    }
-
-    @Override
-    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
-                        String format, boolean verbose) throws IOException {
-        SparkOperPlan sparkPlan = compile(pp, pc);
-        ps.println("#-----------------------------------------------------#");
-        ps.println("# Spark plan is A DAG, the Spark node relations are:");
-        ps.println("#-----------------------------------------------------#");
-        Map<OperatorKey, SparkOper> allOperKeys= sparkPlan.getKeys();
-        List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
-        Collections.sort(operKeyList);
-        for(OperatorKey operatorKey: operKeyList){
-            SparkOper op = sparkPlan.getOperator(operatorKey);
-            ps.print(op.getOperatorKey());
-            List<SparkOper> successors = sparkPlan.getSuccessors(op);
-            if( successors!=null) {
-                ps.print("->");
-                for (SparkOper suc : successors) {
-                    ps.print(suc.getOperatorKey() + " ");
-                }
-            }
-            ps.println();
-        }
-        if (format.equals("text")) {
-            SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
-            printer.setVerbose(verbose);
-            printer.visit();
-        } else {  // TODO: add support for other file format
-            throw new IOException("Non-text    output of explain is not 
supported.");
-        }
-    }
-
-    @Override
-    public void kill() throws BackendException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void killJob(String jobID, Configuration conf)
-            throws BackendException {
-        // TODO Auto-generated method stub
+       // Our connection to Spark. It needs to be static so that it can be 
reused
+       // across jobs, because a
+       // new SparkLauncher gets created for each job.
+       private static JavaSparkContext sparkContext = null;
+       private static JobMetricsListener jobMetricsListener = new 
JobMetricsListener();
+
+       public static BroadCastServer bcaster;
+       private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
+                       .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
+       // An object that handle cache calls in the operator graph. This is 
again
+       // static because we want
+       // it to be shared across SparkLaunchers. It gets cleared whenever we 
close
+       // the SparkContext.
+       // private static CacheConverter cacheConverter = null;
+       private String jobGroupID;
+
+       @Override
+       public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
+                       PigContext pigContext) throws Exception {
+               LOG.info("!!!!!!!!!!  Launching Spark (woot) !!!!!!!!!!!!");
+               LOG.debug(physicalPlan);
+               JobConf jobConf = SparkUtil.newJobConf(pigContext);
+               jobConf.set(PigConstants.LOCAL_CODE_DIR,
+                               System.getProperty("java.io.tmpdir"));
+
+               SchemaTupleBackend.initialize(jobConf, pigContext);
+               SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+
+               if (System.getenv("BROADCAST_PORT") == null
+                               && System.getenv("BROADCAST_MASTER_IP") == 
null) {
+                       LOG.warn("Missing BROADCAST_POST/BROADCAST_HOST in the 
environment.");
+               } else {
+                       if (bcaster == null) {
+                               bcaster = new BroadCastServer();
+                               
bcaster.startBroadcastServer(Integer.parseInt(System
+                                               .getenv("BROADCAST_PORT")));
+                       }
+               }
+
+               SparkPigStats sparkStats = (SparkPigStats) pigContext
+                               .getExecutionEngine().instantiatePigStats();
+               PigStats.start(sparkStats);
+
+               startSparkIfNeeded();
+               // Set a unique group id for this query, so we can lookup all 
Spark job
+               // ids
+               // related to this query.
+               jobGroupID = UUID.randomUUID().toString();
+               sparkContext.setJobGroup(jobGroupID, "Pig query to Spark 
cluster",
+                               false);
+               jobMetricsListener.reset();
+
+               String currentDirectoryPath = Paths.get(".").toAbsolutePath()
+                               .normalize().toString()
+                               + "/";
+               startSparkJob(pigContext, currentDirectoryPath);
+               LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+                               physicalPlan, POStore.class);
+               POStore firstStore = stores.getFirst();
+               if (firstStore != null) {
+                       MapRedUtil.setupStreamingDirsConfSingle(firstStore, 
pigContext,
+                                       jobConf);
+               }
+
+               // ObjectSerializer.serialize(c);
+               byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+               // initialize the supported converters
+               Map<Class<? extends PhysicalOperator>, POConverter> convertMap 
= new HashMap<Class<? extends PhysicalOperator>, POConverter>();
+
+               convertMap.put(POLoad.class, new LoadConverter(pigContext,
+                               physicalPlan, sparkContext.sc()));
+               convertMap.put(POStore.class, new StoreConverter(pigContext));
+               convertMap.put(POForEach.class, new 
ForEachConverter(confBytes));
+               convertMap.put(POFilter.class, new FilterConverter());
+               convertMap.put(POPackage.class, new 
PackageConverter(confBytes));
+               // convertMap.put(POCache.class, cacheConverter);
+               convertMap.put(POLocalRearrange.class, new 
LocalRearrangeConverter());
+               convertMap.put(POGlobalRearrange.class, new 
GlobalRearrangeConverter());
+               convertMap.put(POLimit.class, new LimitConverter());
+               convertMap.put(PODistinct.class, new DistinctConverter());
+               convertMap.put(POUnion.class, new 
UnionConverter(sparkContext.sc()));
+               convertMap.put(POSort.class, new SortConverter());
+               convertMap.put(POSplit.class, new SplitConverter());
+               convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
+               convertMap.put(POCollectedGroup.class, new 
CollectedGroupConverter());
+               convertMap.put(POCounter.class, new CounterConverter());
+               convertMap.put(PORank.class, new RankConverter());
+               convertMap.put(POStreamSpark.class, new 
StreamConverter(confBytes));
+
+               sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
+
+               cleanUpSparkJob(pigContext, currentDirectoryPath);
+               sparkStats.finish();
+               return sparkStats;
+       }
+
+       /**
+        * In Spark, currently only async actions return job id. There is no 
async
+        * equivalent of actions like saveAsNewAPIHadoopFile()
+        * 
+        * The only other way to get a job id is to register a "job group ID" 
with
+        * the spark context and request all job ids corresponding to that job 
group
+        * via getJobIdsForGroup.
+        * 
+        * However getJobIdsForGroup does not guarantee the order of the 
elements in
+        * it's result.
+        * 
+        * This method simply returns the previously unseen job ids.
+        * 
+        * @param seenJobIDs
+        *            job ids in the job group that are already seen
+        * @return Spark job ids not seen before
+        */
+       private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+               Set<Integer> groupjobIDs = new HashSet<Integer>(
+                               
Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+                                               
.getJobIdsForGroup(jobGroupID))));
+               groupjobIDs.removeAll(seenJobIDs);
+               List<Integer> unseenJobIDs = new 
ArrayList<Integer>(groupjobIDs);
+               if (unseenJobIDs.size() == 0) {
+                       throw new RuntimeException("Expected at least one 
unseen jobID "
+                                       + " in this call to getJobIdsForGroup, 
but got "
+                                       + unseenJobIDs.size());
+               }
+
+               seenJobIDs.addAll(unseenJobIDs);
+               return unseenJobIDs;
+       }
+
+       private void cleanUpSparkJob(PigContext pigContext,
+                       String currentDirectoryPath) {
+               LOG.info("clean up Spark Job");
+               boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                               
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+               if (isLocal) {
+                       String shipFiles = 
pigContext.getProperties().getProperty(
+                                       "pig.streaming.ship.files");
+                       if (shipFiles != null) {
+                               for (String file : shipFiles.split(",")) {
+                                       File shipFile = new File(file);
+                                       File deleteFile = new 
File(currentDirectoryPath + "/"
+                                                       + shipFile.getName());
+                                       if (deleteFile.exists()) {
+                                               LOG.info(String.format("delete 
ship file result: %b",
+                                                               
deleteFile.delete()));
+                                       }
+                               }
+                       }
+                       String cacheFiles = 
pigContext.getProperties().getProperty(
+                                       "pig.streaming.cache.files");
+                       if (cacheFiles != null) {
+                               for (String file : cacheFiles.split(",")) {
+                                       String fileName = 
extractFileName(file.trim());
+                                       File deleteFile = new 
File(currentDirectoryPath + "/"
+                                                       + fileName);
+                                       if (deleteFile.exists()) {
+                                               LOG.info(String.format("delete 
cache file result: %b",
+                                                               
deleteFile.delete()));
+                                       }
+                               }
+                       }
+               }
+       }
+
+       private void startSparkJob(PigContext pigContext,
+                       String currentDirectoryPath) throws IOException {
+               LOG.info("start Spark Job");
+               String shipFiles = pigContext.getProperties().getProperty(
+                               "pig.streaming.ship.files");
+               shipFiles(shipFiles, currentDirectoryPath);
+               String cacheFiles = pigContext.getProperties().getProperty(
+                               "pig.streaming.cache.files");
+               cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
+       }
+
+       private void shipFiles(String shipFiles, String currentDirectoryPath)
+                       throws IOException {
+               if (shipFiles != null) {
+                       for (String file : shipFiles.split(",")) {
+                               File shipFile = new File(file.trim());
+                               if (shipFile.exists()) {
+                                       LOG.info(String.format("shipFile:%s", 
shipFile));
+                                       boolean isLocal = 
System.getenv("SPARK_MASTER") != null ? System
+                                                       
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL")
+                                                       : true;
+                                       if (isLocal) {
+                                               File localFile = new 
File(currentDirectoryPath + "/"
+                                                               + 
shipFile.getName());
+                                               if (localFile.exists()) {
+                                                       LOG.info(String.format(
+                                                                       "ship 
file %s exists, ready to delete",
+                                                                       
localFile.getAbsolutePath()));
+                                                       localFile.delete();
+                                               } else {
+                                                       
LOG.info(String.format("ship file %s  not exists,",
+                                                                       
localFile.getAbsolutePath()));
+                                               }
+                                               Files.copy(shipFile.toPath(),
+                                                               
Paths.get(localFile.getAbsolutePath()));
+                                       } else {
+                                               
sparkContext.addFile(shipFile.toURI().toURL()
+                                                               
.toExternalForm());
+                                       }
+                               }
+                       }
+               }
+       }
+
+       private void cacheFiles(String cacheFiles, String currentDirectoryPath,
+                       PigContext pigContext) throws IOException {
+               if (cacheFiles != null) {
+                       Configuration conf = SparkUtil.newJobConf(pigContext);
+                       boolean isLocal = System.getenv("SPARK_MASTER") != null 
? System
+                                       
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+                       for (String file : cacheFiles.split(",")) {
+                               String fileName = extractFileName(file.trim());
+                               Path src = new 
Path(extractFileUrl(file.trim()));
+                               File tmpFile = File.createTempFile(fileName, 
".tmp");
+                               Path tmpFilePath = new 
Path(tmpFile.getAbsolutePath());
+                               FileSystem fs = tmpFilePath.getFileSystem(conf);
+                               fs.copyToLocalFile(src, tmpFilePath);
+                               tmpFile.deleteOnExit();
+                               if (isLocal) {
+                                       File localFile = new 
File(currentDirectoryPath + "/"
+                                                       + fileName);
+                                       if (localFile.exists()) {
+                                               LOG.info(String.format(
+                                                               "cache file %s 
exists, ready to delete",
+                                                               
localFile.getAbsolutePath()));
+                                               localFile.delete();
+                                       } else {
+                                               LOG.info(String.format("cache 
file %s not exists,",
+                                                               
localFile.getAbsolutePath()));
+                                       }
+                                       
Files.copy(Paths.get(tmpFilePath.toString()),
+                                                       
Paths.get(localFile.getAbsolutePath()));
+                               } else {
+                                       
sparkContext.addFile(tmpFile.toURI().toURL()
+                                                       .toExternalForm());
+                               }
+                       }
+               }
+       }
+
+       private String extractFileName(String cacheFileUrl) {
+               String[] tmpAry = cacheFileUrl.split("#");
+               String fileName = tmpAry != null && tmpAry.length == 2 ? 
tmpAry[1]
+                               : null;
+               if (fileName == null) {
+                       throw new RuntimeException("cache file is invalid 
format, file:"
+                                       + cacheFileUrl);
+               } else {
+                       LOG.debug("cache file name is valid:" + cacheFileUrl);
+                       return fileName;
+               }
+       }
+
+       private String extractFileUrl(String cacheFileUrl) {
+               String[] tmpAry = cacheFileUrl.split("#");
+               String fileName = tmpAry != null && tmpAry.length == 2 ? 
tmpAry[0]
+                               : null;
+               if (fileName == null) {
+                       throw new RuntimeException("cache file is invalid 
format, file:"
+                                       + cacheFileUrl);
+               } else {
+                       LOG.debug("cache file name is valid:" + cacheFileUrl);
+                       return fileName;
+               }
+       }
+
+       private SparkOperPlan compile(PhysicalPlan physicalPlan,
+                       PigContext pigContext) throws PlanException, 
IOException,
+                       VisitorException {
+               SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
+                               pigContext);
+               sparkCompiler.compile();
+               SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
+
+               // optimize key - value handling in package
+               SparkPOPackageAnnotator pkgAnnotator = new 
SparkPOPackageAnnotator(
+                               sparkPlan);
+               pkgAnnotator.visit();
+               return sparkPlan;
+       }
+
+       private static void startSparkIfNeeded() throws PigException {
+               if (sparkContext == null) {
+                       String master = System.getenv("SPARK_MASTER");
+                       if (master == null) {
+                               LOG.info("SPARK_MASTER not specified, using 
\"local\"");
+                               master = "local";
+                       }
+
+                       String sparkHome = System.getenv("SPARK_HOME"); // It's 
okay if this
+                       // is null for local
+                       // mode
+                       String sparkJarsSetting = System.getenv("SPARK_JARS");
+                       String pigJar = System.getenv("SPARK_PIG_JAR");
+                       String[] sparkJars = sparkJarsSetting == null ? new 
String[] {}
+                                       : sparkJarsSetting.split(",");
+
+                       // TODO: Don't hardcode this JAR
+                       List<String> jars = Lists.asList(pigJar, sparkJars);
+
+                       if (!master.startsWith("local") && 
!master.equals("yarn-client")) {
+                               // Check that we have the Mesos native library 
and Spark home
+                               // are set
+                               if (sparkHome == null) {
+                                       System.err
+                                                       .println("You need to 
set SPARK_HOME to run on a Mesos cluster!");
+                                       throw new PigException("SPARK_HOME is 
not set");
+                               }
+                               /*
+                                * if (System.getenv("MESOS_NATIVE_LIBRARY") == 
null) {
+                                * 
+                                * System.err.println(
+                                * "You need to set MESOS_NATIVE_LIBRARY to run 
on a Mesos cluster!"
+                                * ); throw new 
PigException("MESOS_NATIVE_LIBRARY is not set");
+                                * }
+                                * 
+                                * // Tell Spark to use Mesos in coarse-grained 
mode (only
+                                * affects Spark 0.6+; no impact on others)
+                                * System.setProperty("spark.mesos.coarse", 
"true");
+                                */
+                       }
+
+                       // // For coarse-grained Mesos mode, tell it an upper 
bound on how
+                       // many
+                       // // cores to grab in total;
+                       // // we conservatively set this to 32 unless the user 
set the
+                       // // SPARK_MAX_CPUS environment variable.
+                       // if (System.getenv("SPARK_MAX_CPUS") != null) {
+                       // int maxCores = 32;
+                       // maxCores = 
Integer.parseInt(System.getenv("SPARK_MAX_CPUS"));
+                       // System.setProperty("spark.cores.max", "" + maxCores);
+                       // }
+                       // System.setProperty("spark.cores.max", "1");
+                       // System.setProperty("spark.executor.memory", "" + 
"512m");
+                       // System.setProperty("spark.shuffle.memoryFraction", 
"0.0");
+                       // System.setProperty("spark.storage.memoryFraction", 
"0.0");
+
+                       sparkContext = new JavaSparkContext(master, "Spork", 
sparkHome,
+                                       jars.toArray(new String[jars.size()]));
+                       sparkContext.sc().addSparkListener(new 
StatsReportListener());
+                       sparkContext.sc().addSparkListener(new JobLogger());
+                       sparkContext.sc().addSparkListener(jobMetricsListener);
+                       // cacheConverter = new CacheConverter();
+               }
+       }
+
+       // You can use this in unit tests to stop the SparkContext between 
tests.
+       static void stopSpark() {
+               if (sparkContext != null) {
+                       sparkContext.stop();
+                       sparkContext = null;
+                       // cacheConverter = null;
+               }
+       }
+
+       private void sparkPlanToRDD(SparkOperPlan sparkPlan,
+                       Map<Class<? extends PhysicalOperator>, POConverter> 
convertMap,
+                       SparkPigStats sparkStats, JobConf jobConf) throws 
IOException,
+                       InterruptedException {
+               Set<Integer> seenJobIDs = new HashSet<Integer>();
+               if (sparkPlan != null) {
+                       List<SparkOperator> leaves = sparkPlan.getLeaves();
+                       Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new 
HashMap();
+                       for (SparkOperator leaf : leaves) {
+                               Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = 
new HashMap();
+                               sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+                                               physicalOpToRdds, convertMap, 
seenJobIDs, sparkStats,
+                                               jobConf);
+
+                       }
+               } else {
+                       throw new RuntimeException("sparkPlan is null");
+               }
+       }
+
+       private void sparkOperToRDD(SparkOperPlan sparkPlan,
+                       SparkOperator sparkOperator,
+                       Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
+                       Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
+                       Map<Class<? extends PhysicalOperator>, POConverter> 
convertMap,
+                       Set<Integer> seenJobIDs, SparkPigStats sparkStats, 
JobConf conf)
+                       throws IOException, InterruptedException {
+
+               List<SparkOperator> predecessors = sparkPlan
+                               .getPredecessors(sparkOperator);
+               List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+               if (predecessors != null) {
+                       for (SparkOperator pred : predecessors) {
+                               if (sparkOpRdds.get(pred.getOperatorKey()) == 
null) {
+                                       sparkOperToRDD(sparkPlan, pred, 
sparkOpRdds,
+                                                       physicalOpRdds, 
convertMap, seenJobIDs, sparkStats,
+                                                       conf);
+                               }
+                               
predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+                       }
+               }
+
+               List<PhysicalOperator> leafPOs = 
sparkOperator.physicalPlan.getLeaves();
+               if (leafPOs != null && leafPOs.size() != 1) {
+                       throw new IllegalArgumentException(
+                                       String.format(
+                                                       "sparkOperator "
+                                                                       + 
".physicalPlan should have 1 leaf, but  sparkOperator"
+                                                                       + 
".physicalPlan.getLeaves():{} not equals 1, sparkOperator"
+                                                                       + 
"sparkOperator:{}",
+                                                       
sparkOperator.physicalPlan.getLeaves().size(),
+                                                       sparkOperator.name()));
+               } else {
+                       PhysicalOperator leafPO = leafPOs.get(0);
+                       physicalToRDD(sparkOperator.physicalPlan, leafPO, 
physicalOpRdds,
+                                       predecessorRDDs, convertMap);
+                       sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                                       
physicalOpRdds.get(leafPO.getOperatorKey()));
+               }
+
+               List<POStore> poStores = PlanHelper.getPhysicalOperators(
+                               sparkOperator.physicalPlan, POStore.class);
+               if (poStores != null && poStores.size() == 1) {
+                       POStore poStore = poStores.get(0);
+                       for (int jobID : getJobIDs(seenJobIDs)) {
+                               SparkStatsUtil.waitForJobAddStats(jobID, 
poStore,
+                                               jobMetricsListener, 
sparkContext, sparkStats, conf);
+                       }
+               } else {
+                       LOG.info(String
+                                       .format("sparkOperator:{} does not have 
POStore or "
+                                                       + "sparkOperator has 
more than 1 POStore, the size of POStore",
+                                                       sparkOperator.name(), 
poStores.size()));
+               }
+
+       }
+
+       private void physicalToRDD(PhysicalPlan plan,
+                       PhysicalOperator physicalOperator,
+                       Map<OperatorKey, RDD<Tuple>> rdds,
+                       List<RDD<Tuple>> rddsFromPredeSparkOper,
+                       Map<Class<? extends PhysicalOperator>, POConverter> 
convertMap)
+                       throws IOException {
+               RDD<Tuple> nextRDD = null;
+               List<PhysicalOperator> predecessors = plan
+                               .getPredecessors(physicalOperator);
+               List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
+               if (predecessors != null) {
+                       for (PhysicalOperator predecessor : predecessors) {
+                               physicalToRDD(plan, predecessor, rdds, 
rddsFromPredeSparkOper,
+                                               convertMap);
+                               
predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
+                       }
+               } else {
+                       if (rddsFromPredeSparkOper != null
+                                       && rddsFromPredeSparkOper.size() > 0) {
+                               predecessorRdds.addAll(rddsFromPredeSparkOper);
+                       }
+               }
+
+               POConverter converter = 
convertMap.get(physicalOperator.getClass());
+               if (converter == null) {
+                       throw new IllegalArgumentException(
+                                       "Spork unsupported PhysicalOperator: " 
+ physicalOperator);
+               }
+
+               LOG.info("Converting operator "
+                               + physicalOperator.getClass().getSimpleName() + 
" "
+                               + physicalOperator);
+               nextRDD = converter.convert(predecessorRdds, physicalOperator);
+
+               if (nextRDD == null) {
+                       throw new IllegalArgumentException(
+                                       "RDD should not be null after 
PhysicalOperator: "
+                                                       + physicalOperator);
+               }
+
+               rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+       }
+
+       @Override
+       public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+                       String format, boolean verbose) throws IOException {
+               SparkOperPlan sparkPlan = compile(pp, pc);
+               
ps.println("#-----------------------------------------------------#");
+               ps.println("#The Spark node relations are:");
+               
ps.println("#-----------------------------------------------------#");
+               Map<OperatorKey, SparkOperator> allOperKeys = 
sparkPlan.getKeys();
+               List<OperatorKey> operKeyList = new 
ArrayList(allOperKeys.keySet());
+               Collections.sort(operKeyList);
+               for (OperatorKey operatorKey : operKeyList) {
+                       SparkOperator op = sparkPlan.getOperator(operatorKey);
+                       ps.print(op.getOperatorKey());
+                       List<SparkOperator> successors = 
sparkPlan.getSuccessors(op);
+                       if (successors != null) {
+                               ps.print("->");
+                               for (SparkOperator suc : successors) {
+                                       ps.print(suc.getOperatorKey() + " ");
+                               }
+                       }
+                       ps.println();
+               }
+               if (format.equals("text")) {
+                       SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+                       printer.setVerbose(verbose);
+                       printer.visit();
+               } else { // TODO: add support for other file format
+                       throw new IOException(
+                                       "Non-text    output of explain is not 
supported.");
+               }
+       }
+
+       @Override
+       public void kill() throws BackendException {
+               // TODO Auto-generated method stub
+
+       }
+
+       @Override
+       public void killJob(String jobID, Configuration conf)
+                       throws BackendException {
+               // TODO Auto-generated method stub
 
-    }
-}
\ No newline at end of file
+       }
+}

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1665404&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
 Tue Mar 10 04:37:36 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * NativeSparkOperator:
+ */
+public class NativeSparkOperator extends SparkOperator {
+    private static final long serialVersionUID = 1L;
+    private static int countJobs = 0;
+    private String nativeSparkJar;
+    private String[] params;
+    private String jobId;
+
+    public NativeSparkOperator(OperatorKey k, String sparkJar, String[] 
parameters) {
+        super(k);
+        nativeSparkJar = sparkJar;
+        params = parameters;
+        jobId = sparkJar + "_" + getJobNumber();
+    }
+
+    private static int getJobNumber() {
+        countJobs++;
+        return countJobs;
+    }
+}


Reply via email to