Author: xuefu
Date: Fri Jul  3 12:48:05 2015
New Revision: 1689010

URL: http://svn.apache.org/r1689010
Log:
PIG-4619: Cleanup: change the indent size of some files of pig on spark project 
from 2 to 4 space (Liyun via Xuefu)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
 Fri Jul  3 12:48:05 2015
@@ -21,6 +21,7 @@ package org.apache.pig.backend.hadoop.ex
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.spark.executor.TaskMetrics;
@@ -49,150 +50,150 @@ import java.util.Set;
 
 public class JobMetricsListener implements SparkListener {
 
-  private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+    private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
 
-  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
-  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
-  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = 
Maps.newHashMap();
-  private final Set<Integer> finishedJobIds = Sets.newHashSet();
-
-  @Override
-  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
-  }
-
-  @Override
-  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
-
-  }
-
-  @Override
-  public void onTaskStart(SparkListenerTaskStart taskStart) {
-
-  }
-
-  @Override
-  public void onTaskGettingResult(SparkListenerTaskGettingResult 
taskGettingResult) {
-
-  }
-
-  @Override
-  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
-
-  }
-
-  @Override
-  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded){
-
-  }
-
-  @Override
-  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-      int stageId = taskEnd.stageId();
-      int stageAttemptId = taskEnd.stageAttemptId();
-      String stageIdentifier = stageId + "_" + stageAttemptId;
-      Integer jobId = stageIdToJobId.get(stageId);
-      if (jobId == null) {
-          LOG.warn("Cannot find job id for stage[" + stageId + "].");
-      } else {
-          Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
-          if (jobMetrics == null) {
-              jobMetrics = Maps.newHashMap();
-              allJobMetrics.put(jobId, jobMetrics);
-          }
-          List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
-          if (stageMetrics == null) {
-              stageMetrics = Lists.newLinkedList();
-              jobMetrics.put(stageIdentifier, stageMetrics);
-          }
-          stageMetrics.add(taskEnd.taskMetrics());
-      }
-  }
-
-  @Override
-  public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-      int jobId = jobStart.jobId();
-      int size = jobStart.stageIds().size();
-      int[] intStageIds = new int[size];
-      for (int i = 0; i < size; i++) {
-          Integer stageId = (Integer) jobStart.stageIds().apply(i);
-          intStageIds[i] = stageId;
-          stageIdToJobId.put(stageId, jobId);
-      }
-      jobIdToStageId.put(jobId, intStageIds);
-  }
-
-  @Override
-  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-      finishedJobIds.add(jobEnd.jobId());
-      notify();
-  }
-
-  @Override
-  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate 
environmentUpdate) {
-
-  }
-
-  @Override
-  public void onBlockManagerAdded(SparkListenerBlockManagerAdded 
blockManagerAdded) {
-
-  }
-
-  @Override
-  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved 
blockManagerRemoved) {
-
-  }
-
-  @Override
-  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
-  }
-
-  @Override
-  public void onApplicationStart(SparkListenerApplicationStart 
applicationStart) {
-
-  }
-
-  @Override
-  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
-
-  }
-
-  @Override
-  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate 
executorMetricsUpdate) {
-
-  }
-
-  public synchronized  Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
-      return allJobMetrics.get(jobId);
-  }
-
-  public synchronized boolean waitForJobToEnd(int jobId) throws 
InterruptedException {
-      if (finishedJobIds.contains(jobId)) {
-        finishedJobIds.remove(jobId);
-        return true;
-      }
-
-      wait();
-      return false;
-  }
-
-  public synchronized void cleanup(int jobId) {
-      allJobMetrics.remove(jobId);
-      jobIdToStageId.remove(jobId);
-      Iterator<Map.Entry<Integer, Integer>> iterator = 
stageIdToJobId.entrySet().iterator();
-      while (iterator.hasNext()) {
-          Map.Entry<Integer, Integer> entry = iterator.next();
-          if (entry.getValue() == jobId) {
-              iterator.remove();
-          }
-      }
-  }
-
-  public synchronized void reset() {
-      stageIdToJobId.clear();
-      jobIdToStageId.clear();
-      allJobMetrics.clear();
-      finishedJobIds.clear();
-  }
+    private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+    private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+    private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = 
Maps.newHashMap();
+    private final Set<Integer> finishedJobIds = Sets.newHashSet();
+
+    @Override
+    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+    }
+
+    @Override
+    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+    }
+
+    @Override
+    public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+    }
+
+    @Override
+    public void onTaskGettingResult(SparkListenerTaskGettingResult 
taskGettingResult) {
+
+    }
+
+    @Override
+    public void onExecutorRemoved(SparkListenerExecutorRemoved 
executorRemoved) {
+
+    }
+
+    @Override
+    public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+
+    }
+
+    @Override
+    public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+        int stageId = taskEnd.stageId();
+        int stageAttemptId = taskEnd.stageAttemptId();
+        String stageIdentifier = stageId + "_" + stageAttemptId;
+        Integer jobId = stageIdToJobId.get(stageId);
+        if (jobId == null) {
+            LOG.warn("Cannot find job id for stage[" + stageId + "].");
+        } else {
+            Map<String, List<TaskMetrics>> jobMetrics = 
allJobMetrics.get(jobId);
+            if (jobMetrics == null) {
+                jobMetrics = Maps.newHashMap();
+                allJobMetrics.put(jobId, jobMetrics);
+            }
+            List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+            if (stageMetrics == null) {
+                stageMetrics = Lists.newLinkedList();
+                jobMetrics.put(stageIdentifier, stageMetrics);
+            }
+            stageMetrics.add(taskEnd.taskMetrics());
+        }
+    }
+
+    @Override
+    public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+        int jobId = jobStart.jobId();
+        int size = jobStart.stageIds().size();
+        int[] intStageIds = new int[size];
+        for (int i = 0; i < size; i++) {
+            Integer stageId = (Integer) jobStart.stageIds().apply(i);
+            intStageIds[i] = stageId;
+            stageIdToJobId.put(stageId, jobId);
+        }
+        jobIdToStageId.put(jobId, intStageIds);
+    }
+
+    @Override
+    public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+        finishedJobIds.add(jobEnd.jobId());
+        notify();
+    }
+
+    @Override
+    public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate 
environmentUpdate) {
+
+    }
+
+    @Override
+    public void onBlockManagerAdded(SparkListenerBlockManagerAdded 
blockManagerAdded) {
+
+    }
+
+    @Override
+    public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved 
blockManagerRemoved) {
+
+    }
+
+    @Override
+    public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+    }
+
+    @Override
+    public void onApplicationStart(SparkListenerApplicationStart 
applicationStart) {
+
+    }
+
+    @Override
+    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+    }
+
+    @Override
+    public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate 
executorMetricsUpdate) {
+
+    }
+
+    public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) 
{
+        return allJobMetrics.get(jobId);
+    }
+
+    public synchronized boolean waitForJobToEnd(int jobId) throws 
InterruptedException {
+        if (finishedJobIds.contains(jobId)) {
+            finishedJobIds.remove(jobId);
+            return true;
+        }
+
+        wait();
+        return false;
+    }
+
+    public synchronized void cleanup(int jobId) {
+        allJobMetrics.remove(jobId);
+        jobIdToStageId.remove(jobId);
+        Iterator<Map.Entry<Integer, Integer>> iterator = 
stageIdToJobId.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Integer> entry = iterator.next();
+            if (entry.getValue() == jobId) {
+                iterator.remove();
+            }
+        }
+    }
+
+    public synchronized void reset() {
+        stageIdToJobId.clear();
+        jobIdToStageId.clear();
+        allJobMetrics.clear();
+        finishedJobIds.clear();
+    }
 }
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Fri Jul  3 12:48:05 2015
@@ -104,89 +104,89 @@ import org.apache.spark.SparkException;
  */
 public class SparkLauncher extends Launcher {
 
-       private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
+    private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
 
-       // Our connection to Spark. It needs to be static so that it can be 
reused
-       // across jobs, because a
-       // new SparkLauncher gets created for each job.
-       private static JavaSparkContext sparkContext = null;
-       private static JobMetricsListener jobMetricsListener = new 
JobMetricsListener();
-       private String jobGroupID;
+    // Our connection to Spark. It needs to be static so that it can be reused
+    // across jobs, because a
+    // new SparkLauncher gets created for each job.
+    private static JavaSparkContext sparkContext = null;
+    private static JobMetricsListener jobMetricsListener = new 
JobMetricsListener();
+    private String jobGroupID;
     private PigContext pigContext = null;
     private JobConf jobConf = null;
     private String currentDirectoryPath = null;
 
-       @Override
-       public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
-                       PigContext pigContext) throws Exception {
-               if (LOG.isDebugEnabled())
-                   LOG.debug(physicalPlan);
+    @Override
+    public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
+                              PigContext pigContext) throws Exception {
+        if (LOG.isDebugEnabled())
+            LOG.debug(physicalPlan);
         this.pigContext = pigContext;
-               initialize();
-               SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-               if (LOG.isDebugEnabled())
-                         explain(sparkplan, System.out, "text", true);
-               SparkPigStats sparkStats = (SparkPigStats) pigContext
-                               .getExecutionEngine().instantiatePigStats();
+        initialize();
+        SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+        if (LOG.isDebugEnabled())
+            explain(sparkplan, System.out, "text", true);
+        SparkPigStats sparkStats = (SparkPigStats) pigContext
+                .getExecutionEngine().instantiatePigStats();
         sparkStats.initialize(sparkplan);
-               PigStats.start(sparkStats);
+        PigStats.start(sparkStats);
 
-               startSparkIfNeeded(pigContext);
+        startSparkIfNeeded(pigContext);
 
-               // Set a unique group id for this query, so we can lookup all 
Spark job
-               // ids
-               // related to this query.
-               jobGroupID = UUID.randomUUID().toString();
-               sparkContext.setJobGroup(jobGroupID, "Pig query to Spark 
cluster",
-                               false);
-               jobMetricsListener.reset();
-
-               this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
-                               .normalize().toString()
-                               + "/";
-               addFilesToSparkJob();
-               LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
-                               physicalPlan, POStore.class);
-               POStore firstStore = stores.getFirst();
-               if (firstStore != null) {
-                       MapRedUtil.setupStreamingDirsConfSingle(firstStore, 
pigContext,
-                                       jobConf);
-               }
-
-               new ParallelismSetter(sparkplan, jobConf).visit();
-
-               byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-
-               // Create conversion map, mapping between pig operator and 
spark convertor
-               Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
-                               = new HashMap<Class<? extends 
PhysicalOperator>, RDDConverter>();
-               convertMap.put(POLoad.class, new LoadConverter(pigContext,
-                               physicalPlan, sparkContext.sc()));
-               convertMap.put(POStore.class, new StoreConverter(pigContext));
-               convertMap.put(POForEach.class, new 
ForEachConverter(confBytes));
-               convertMap.put(POFilter.class, new FilterConverter());
-               convertMap.put(POPackage.class, new 
PackageConverter(confBytes));
-               convertMap.put(POLocalRearrange.class, new 
LocalRearrangeConverter());
+        // Set a unique group id for this query, so we can lookup all Spark job
+        // ids
+        // related to this query.
+        jobGroupID = UUID.randomUUID().toString();
+        sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
+                false);
+        jobMetricsListener.reset();
+
+        this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
+                .normalize().toString()
+                + "/";
+        addFilesToSparkJob();
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+                physicalPlan, POStore.class);
+        POStore firstStore = stores.getFirst();
+        if (firstStore != null) {
+            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
+                    jobConf);
+        }
+
+        new ParallelismSetter(sparkplan, jobConf).visit();
+
+        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+
+        // Create conversion map, mapping between pig operator and spark 
convertor
+        Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
+                = new HashMap<Class<? extends PhysicalOperator>, 
RDDConverter>();
+        convertMap.put(POLoad.class, new LoadConverter(pigContext,
+                physicalPlan, sparkContext.sc()));
+        convertMap.put(POStore.class, new StoreConverter(pigContext));
+        convertMap.put(POForEach.class, new ForEachConverter(confBytes));
+        convertMap.put(POFilter.class, new FilterConverter());
+        convertMap.put(POPackage.class, new PackageConverter(confBytes));
+        convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
         convertMap.put(POGlobalRearrangeSpark.class, new 
GlobalRearrangeConverter());
         convertMap.put(POLimit.class, new LimitConverter());
         convertMap.put(PODistinct.class, new DistinctConverter());
-               convertMap.put(POUnion.class, new 
UnionConverter(sparkContext.sc()));
-               convertMap.put(POSort.class, new SortConverter());
-               convertMap.put(POSplit.class, new SplitConverter());
-               convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
-               convertMap.put(POMergeJoin.class, new MergeJoinConverter());
-               convertMap.put(POCollectedGroup.class, new 
CollectedGroupConverter());
-               convertMap.put(POCounter.class, new CounterConverter());
-               convertMap.put(PORank.class, new RankConverter());
-               convertMap.put(POStream.class, new StreamConverter(confBytes));
-                convertMap.put(POFRJoin.class, new FRJoinConverter());
-
-               sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
-               cleanUpSparkJob();
-               sparkStats.finish();
+        convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
+        convertMap.put(POSort.class, new SortConverter());
+        convertMap.put(POSplit.class, new SplitConverter());
+        convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
+        convertMap.put(POMergeJoin.class, new MergeJoinConverter());
+        convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+        convertMap.put(POCounter.class, new CounterConverter());
+        convertMap.put(PORank.class, new RankConverter());
+        convertMap.put(POStream.class, new StreamConverter(confBytes));
+        convertMap.put(POFRJoin.class, new FRJoinConverter());
+
+        sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
+        cleanUpSparkJob();
+        sparkStats.finish();
 
-               return sparkStats;
-       }
+        return sparkStats;
+    }
 
     private void optimize(PigContext pc, SparkOperPlan plan) throws 
VisitorException {
         String prop = 
pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
@@ -203,122 +203,121 @@ public class SparkLauncher extends Launc
         }
     }
 
-       /**
-        * In Spark, currently only async actions return job id. There is no 
async
-        * equivalent of actions like saveAsNewAPIHadoopFile()
-        * 
-        * The only other way to get a job id is to register a "job group ID" 
with
-        * the spark context and request all job ids corresponding to that job 
group
-        * via getJobIdsForGroup.
-        * 
-        * However getJobIdsForGroup does not guarantee the order of the 
elements in
-        * it's result.
-        * 
-        * This method simply returns the previously unseen job ids.
-        * 
-        * @param seenJobIDs
-        *            job ids in the job group that are already seen
-        * @return Spark job ids not seen before
-        */
-       private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
-               Set<Integer> groupjobIDs = new HashSet<Integer>(
-                               
Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
-                                               
.getJobIdsForGroup(jobGroupID))));
-               groupjobIDs.removeAll(seenJobIDs);
-               List<Integer> unseenJobIDs = new 
ArrayList<Integer>(groupjobIDs);
-               if (unseenJobIDs.size() == 0) {
-                       throw new RuntimeException("Expected at least one 
unseen jobID "
-                                       + " in this call to getJobIdsForGroup, 
but got "
-                                       + unseenJobIDs.size());
-               }
-
-               seenJobIDs.addAll(unseenJobIDs);
-               return unseenJobIDs;
-       }
-
-       private void cleanUpSparkJob() {
-               LOG.info("clean up Spark Job");
-               boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
-                               
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
-               if (isLocal) {
-                       String shipFiles = 
pigContext.getProperties().getProperty(
-                                       "pig.streaming.ship.files");
-                       if (shipFiles != null) {
-                               for (String file : shipFiles.split(",")) {
-                                       File shipFile = new File(file);
-                                       File deleteFile = new 
File(currentDirectoryPath + "/"
-                                                       + shipFile.getName());
-                                       if (deleteFile.exists()) {
-                                               LOG.info(String.format("delete 
ship file result: %b",
-                                                               
deleteFile.delete()));
-                                       }
-                               }
-                       }
-                       String cacheFiles = 
pigContext.getProperties().getProperty(
-                                       "pig.streaming.cache.files");
-                       if (cacheFiles != null) {
-                               for (String file : cacheFiles.split(",")) {
-                                       String fileName = 
extractFileName(file.trim());
-                                       File deleteFile = new 
File(currentDirectoryPath + "/"
-                                                       + fileName);
-                                       if (deleteFile.exists()) {
-                                               LOG.info(String.format("delete 
cache file result: %b",
-                                                               
deleteFile.delete()));
-                                       }
-                               }
-                       }
-               }
-       }
-
-       private void addFilesToSparkJob() throws IOException {
-               LOG.info("add Files Spark Job");
-               String shipFiles = pigContext.getProperties().getProperty(
-                               "pig.streaming.ship.files");
-               shipFiles(shipFiles);
-               String cacheFiles = pigContext.getProperties().getProperty(
-                               "pig.streaming.cache.files");
-               cacheFiles(cacheFiles);
-       }
-
-
-       private void shipFiles(String shipFiles)
-                       throws IOException {
-               if (shipFiles != null) {
-                       for (String file : shipFiles.split(",")) {
-                               File shipFile = new File(file.trim());
-                               if (shipFile.exists()) {
-                                       LOG.info(String.format("shipFile:%s", 
shipFile));
-                    
addJarToSparkJobWorkingDirectory(shipFile,shipFile.getName());
-                               }
-                       }
-               }
-       }
-
-       private void cacheFiles(String cacheFiles) throws IOException {
-               if (cacheFiles != null) {
-                       Configuration conf = SparkUtil.newJobConf(pigContext);
-                       for (String file : cacheFiles.split(",")) {
-                               String fileName = extractFileName(file.trim());
-                               Path src = new 
Path(extractFileUrl(file.trim()));
-                               File tmpFile = File.createTempFile(fileName, 
".tmp");
-                               Path tmpFilePath = new 
Path(tmpFile.getAbsolutePath());
-                               FileSystem fs = tmpFilePath.getFileSystem(conf);
-                               fs.copyToLocalFile(src, tmpFilePath);
-                               tmpFile.deleteOnExit();
+    /**
+     * In Spark, currently only async actions return job id. There is no async
+     * equivalent of actions like saveAsNewAPIHadoopFile()
+     * <p/>
+     * The only other way to get a job id is to register a "job group ID" with
+     * the spark context and request all job ids corresponding to that job 
group
+     * via getJobIdsForGroup.
+     * <p/>
+     * However getJobIdsForGroup does not guarantee the order of the elements 
in
+     * it's result.
+     * <p/>
+     * This method simply returns the previously unseen job ids.
+     *
+     * @param seenJobIDs job ids in the job group that are already seen
+     * @return Spark job ids not seen before
+     */
+    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+        Set<Integer> groupjobIDs = new HashSet<Integer>(
+                Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+                        .getJobIdsForGroup(jobGroupID))));
+        groupjobIDs.removeAll(seenJobIDs);
+        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
+        if (unseenJobIDs.size() == 0) {
+            throw new RuntimeException("Expected at least one unseen jobID "
+                    + " in this call to getJobIdsForGroup, but got "
+                    + unseenJobIDs.size());
+        }
+
+        seenJobIDs.addAll(unseenJobIDs);
+        return unseenJobIDs;
+    }
+
+    private void cleanUpSparkJob() {
+        LOG.info("clean up Spark Job");
+        boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+        if (isLocal) {
+            String shipFiles = pigContext.getProperties().getProperty(
+                    "pig.streaming.ship.files");
+            if (shipFiles != null) {
+                for (String file : shipFiles.split(",")) {
+                    File shipFile = new File(file);
+                    File deleteFile = new File(currentDirectoryPath + "/"
+                            + shipFile.getName());
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete ship file result: %b",
+                                deleteFile.delete()));
+                    }
+                }
+            }
+            String cacheFiles = pigContext.getProperties().getProperty(
+                    "pig.streaming.cache.files");
+            if (cacheFiles != null) {
+                for (String file : cacheFiles.split(",")) {
+                    String fileName = extractFileName(file.trim());
+                    File deleteFile = new File(currentDirectoryPath + "/"
+                            + fileName);
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete cache file result: %b",
+                                deleteFile.delete()));
+                    }
+                }
+            }
+        }
+    }
+
+    private void addFilesToSparkJob() throws IOException {
+        LOG.info("add Files Spark Job");
+        String shipFiles = pigContext.getProperties().getProperty(
+                "pig.streaming.ship.files");
+        shipFiles(shipFiles);
+        String cacheFiles = pigContext.getProperties().getProperty(
+                "pig.streaming.cache.files");
+        cacheFiles(cacheFiles);
+    }
+
+
+    private void shipFiles(String shipFiles)
+            throws IOException {
+        if (shipFiles != null) {
+            for (String file : shipFiles.split(",")) {
+                File shipFile = new File(file.trim());
+                if (shipFile.exists()) {
+                    LOG.info(String.format("shipFile:%s", shipFile));
+                    addJarToSparkJobWorkingDirectory(shipFile, 
shipFile.getName());
+                }
+            }
+        }
+    }
+
+    private void cacheFiles(String cacheFiles) throws IOException {
+        if (cacheFiles != null) {
+            Configuration conf = SparkUtil.newJobConf(pigContext);
+            for (String file : cacheFiles.split(",")) {
+                String fileName = extractFileName(file.trim());
+                Path src = new Path(extractFileUrl(file.trim()));
+                File tmpFile = File.createTempFile(fileName, ".tmp");
+                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+                FileSystem fs = tmpFilePath.getFileSystem(conf);
+                fs.copyToLocalFile(src, tmpFilePath);
+                tmpFile.deleteOnExit();
                 LOG.info(String.format("cacheFile:%s", fileName));
-                           addJarToSparkJobWorkingDirectory(tmpFile, fileName);
-                       }
-               }
-       }
+                addJarToSparkJobWorkingDirectory(tmpFile, fileName);
+            }
+        }
+    }
 
     private void addJarToSparkJobWorkingDirectory(File jarFile, String 
jarName) throws IOException {
-        LOG.info("Added jar "+jarName);
+        LOG.info("Added jar " + jarName);
         boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
                 .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
         if (isLocal) {
             File localFile = new File(currentDirectoryPath + "/"
                     + jarName);
-            if (jarFile.getAbsolutePath().equals(localFile.getAbsolutePath()) 
+            if (jarFile.getAbsolutePath().equals(localFile.getAbsolutePath())
                     && jarFile.exists()) {
                 return;
             }
@@ -339,119 +338,119 @@ public class SparkLauncher extends Launc
         }
     }
 
-       private String extractFileName(String cacheFileUrl) {
-               String[] tmpAry = cacheFileUrl.split("#");
-               String fileName = tmpAry != null && tmpAry.length == 2 ? 
tmpAry[1]
-                               : null;
-               if (fileName == null) {
-                       throw new RuntimeException("cache file is invalid 
format, file:"
-                                       + cacheFileUrl);
-               } else {
-                       LOG.debug("cache file name is valid:" + cacheFileUrl);
-                       return fileName;
-               }
-       }
-
-       private String extractFileUrl(String cacheFileUrl) {
-               String[] tmpAry = cacheFileUrl.split("#");
-               String fileName = tmpAry != null && tmpAry.length == 2 ? 
tmpAry[0]
-                               : null;
-               if (fileName == null) {
-                       throw new RuntimeException("cache file is invalid 
format, file:"
-                                       + cacheFileUrl);
-               } else {
-                       LOG.debug("cache file name is valid:" + cacheFileUrl);
-                       return fileName;
-               }
-       }
-
-       private SparkOperPlan compile(PhysicalPlan physicalPlan,
-                       PigContext pigContext) throws PlanException, 
IOException,
-                       VisitorException {
-               SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
-                               pigContext);
-               sparkCompiler.compile();
-               SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
-
-               // optimize key - value handling in package
-               SparkPOPackageAnnotator pkgAnnotator = new 
SparkPOPackageAnnotator(
-                               sparkPlan);
-               pkgAnnotator.visit();
-
-               optimize(pigContext, sparkPlan);
-               return sparkPlan;
-       }
-
-       private static void startSparkIfNeeded(PigContext pc) throws 
PigException {
-               if (sparkContext == null) {
-                       String master = null;
-                       if (pc.getExecType().isLocal()) {
-                               master = "local";
-                       } else {
-                               master = System.getenv("SPARK_MASTER");
-                               if (master == null) {
-                                       LOG.info("SPARK_MASTER not specified, 
using \"local\"");
-                                       master = "local";
-                               }
-                       }
-
-                       String sparkHome = System.getenv("SPARK_HOME");
-                       String sparkJarsSetting = System.getenv("SPARK_JARS");
-                       String pigJar = System.getenv("SPARK_PIG_JAR");
-                       String[] sparkJars = sparkJarsSetting == null ? new 
String[] {}
-                                       : sparkJarsSetting.split(",");
-                       List<String> jars = Lists.asList(pigJar, sparkJars);
-
-                       if (!master.startsWith("local") && 
!master.equals("yarn-client")) {
-                               // Check that we have the Mesos native library 
and Spark home
-                               // are set
-                               if (sparkHome == null) {
-                                       System.err
-                                                       .println("You need to 
set SPARK_HOME to run on a Mesos cluster!");
-                                       throw new PigException("SPARK_HOME is 
not set");
-                               }
-                       }
-
-                       sparkContext = new JavaSparkContext(master, 
"PigOnSpark", sparkHome,
-                                       jars.toArray(new String[jars.size()]));
-                       sparkContext.sc().addSparkListener(new 
StatsReportListener());
-                       sparkContext.sc().addSparkListener(new JobLogger());
-                       sparkContext.sc().addSparkListener(jobMetricsListener);
-               }
-       }
-
-       // You can use this in unit tests to stop the SparkContext between 
tests.
-       static void stopSpark() {
-               if (sparkContext != null) {
-                       sparkContext.stop();
-                       sparkContext = null;
-               }
-       }
-
-       private void sparkPlanToRDD(SparkOperPlan sparkPlan,
-                       Map<Class<? extends PhysicalOperator>, RDDConverter> 
convertMap,
-                       SparkPigStats sparkStats, JobConf jobConf)
-                       throws IOException, InterruptedException {
-               Set<Integer> seenJobIDs = new HashSet<Integer>();
-               if (sparkPlan == null) {
-                       throw new RuntimeException("SparkPlan is null.");
-               }
-
-               List<SparkOperator> leaves = sparkPlan.getLeaves();
-               Collections.sort(leaves);
-               Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Converting " + leaves.size() + " Spark 
Operators to RDDs");
-               }
-
-               for (SparkOperator leaf : leaves) {
-                       new PhyPlanSetter(leaf.physicalPlan).visit();
-                       Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new 
HashMap();
-                       sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
-                                       physicalOpToRdds, convertMap, 
seenJobIDs, sparkStats,
-                                       jobConf);
-               }
-       }
+    private String extractFileName(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1]
+                : null;
+        if (fileName == null) {
+            throw new RuntimeException("cache file is invalid format, file:"
+                    + cacheFileUrl);
+        } else {
+            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            return fileName;
+        }
+    }
+
+    private String extractFileUrl(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0]
+                : null;
+        if (fileName == null) {
+            throw new RuntimeException("cache file is invalid format, file:"
+                    + cacheFileUrl);
+        } else {
+            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            return fileName;
+        }
+    }
+
+    private SparkOperPlan compile(PhysicalPlan physicalPlan,
+                                  PigContext pigContext) throws PlanException, 
IOException,
+            VisitorException {
+        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
+                pigContext);
+        sparkCompiler.compile();
+        SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
+
+        // optimize key - value handling in package
+        SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
+                sparkPlan);
+        pkgAnnotator.visit();
+
+        optimize(pigContext, sparkPlan);
+        return sparkPlan;
+    }
+
+    private static void startSparkIfNeeded(PigContext pc) throws PigException {
+        if (sparkContext == null) {
+            String master = null;
+            if (pc.getExecType().isLocal()) {
+                master = "local";
+            } else {
+                master = System.getenv("SPARK_MASTER");
+                if (master == null) {
+                    LOG.info("SPARK_MASTER not specified, using \"local\"");
+                    master = "local";
+                }
+            }
+
+            String sparkHome = System.getenv("SPARK_HOME");
+            String sparkJarsSetting = System.getenv("SPARK_JARS");
+            String pigJar = System.getenv("SPARK_PIG_JAR");
+            String[] sparkJars = sparkJarsSetting == null ? new String[]{}
+                    : sparkJarsSetting.split(",");
+            List<String> jars = Lists.asList(pigJar, sparkJars);
+
+            if (!master.startsWith("local") && !master.equals("yarn-client")) {
+                // Check that we have the Mesos native library and Spark home
+                // are set
+                if (sparkHome == null) {
+                    System.err
+                            .println("You need to set SPARK_HOME to run on a 
Mesos cluster!");
+                    throw new PigException("SPARK_HOME is not set");
+                }
+            }
+
+            sparkContext = new JavaSparkContext(master, "PigOnSpark", 
sparkHome,
+                    jars.toArray(new String[jars.size()]));
+            sparkContext.sc().addSparkListener(new StatsReportListener());
+            sparkContext.sc().addSparkListener(new JobLogger());
+            sparkContext.sc().addSparkListener(jobMetricsListener);
+        }
+    }
+
+    // You can use this in unit tests to stop the SparkContext between tests.
+    static void stopSpark() {
+        if (sparkContext != null) {
+            sparkContext.stop();
+            sparkContext = null;
+        }
+    }
+
+    private void sparkPlanToRDD(SparkOperPlan sparkPlan,
+                                Map<Class<? extends PhysicalOperator>, 
RDDConverter> convertMap,
+                                SparkPigStats sparkStats, JobConf jobConf)
+            throws IOException, InterruptedException {
+        Set<Integer> seenJobIDs = new HashSet<Integer>();
+        if (sparkPlan == null) {
+            throw new RuntimeException("SparkPlan is null.");
+        }
+
+        List<SparkOperator> leaves = sparkPlan.getLeaves();
+        Collections.sort(leaves);
+        Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Converting " + leaves.size() + " Spark Operators to 
RDDs");
+        }
+
+        for (SparkOperator leaf : leaves) {
+            new PhyPlanSetter(leaf.physicalPlan).visit();
+            Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
+            sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+                    physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
+                    jobConf);
+        }
+    }
 
     private void addUDFJarsToSparkJobWorkingDirectory(SparkOperator leaf) 
throws IOException {
 
@@ -459,7 +458,7 @@ public class SparkLauncher extends Launc
             Class clazz = pigContext.getClassForAlias(udf);
             if (clazz != null) {
                 String jar = JarManager.findContainingJar(clazz);
-                if( jar != null) {
+                if (jar != null) {
                     File jarFile = new File(jar);
                     addJarToSparkJobWorkingDirectory(jarFile, 
jarFile.getName());
                 }
@@ -468,60 +467,60 @@ public class SparkLauncher extends Launc
     }
 
     private void sparkOperToRDD(SparkOperPlan sparkPlan,
-                       SparkOperator sparkOperator,
-                       Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
-                       Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
-                       Map<Class<? extends PhysicalOperator>, RDDConverter> 
convertMap,
-                       Set<Integer> seenJobIDs, SparkPigStats sparkStats, 
JobConf conf)
-                       throws IOException, InterruptedException {
+                                SparkOperator sparkOperator,
+                                Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
+                                Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
+                                Map<Class<? extends PhysicalOperator>, 
RDDConverter> convertMap,
+                                Set<Integer> seenJobIDs, SparkPigStats 
sparkStats, JobConf conf)
+            throws IOException, InterruptedException {
         addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
-               List<SparkOperator> predecessors = sparkPlan
-                               .getPredecessors(sparkOperator);
-               List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-               if (predecessors != null) {
-                       for (SparkOperator pred : predecessors) {
-                               if (sparkOpRdds.get(pred.getOperatorKey()) == 
null) {
-                                       sparkOperToRDD(sparkPlan, pred, 
sparkOpRdds,
-                                                       physicalOpRdds, 
convertMap, seenJobIDs, sparkStats,
-                                                       conf);
-                               }
-                               
predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
-                       }
-               }
-
-               List<PhysicalOperator> leafPOs = 
sparkOperator.physicalPlan.getLeaves();
-               boolean isFail = false;
-               Exception exception = null;
-               if (leafPOs != null && leafPOs.size() != 1) {
-                       throw new IllegalArgumentException(
-                                       String.format(
-                                                       "sparkOperator "
-                                                                       + 
".physicalPlan should have 1 leaf, but  sparkOperator"
-                                                                       + 
".physicalPlan.getLeaves():{} not equals 1, sparkOperator"
-                                                                       + 
"sparkOperator:{}",
-                                                       
sparkOperator.physicalPlan.getLeaves().size(),
-                                                       sparkOperator.name()));
-               }
-
-               PhysicalOperator leafPO = leafPOs.get(0);
-               try {
-                       physicalToRDD(sparkOperator.physicalPlan, leafPO, 
physicalOpRdds,
-                                       predecessorRDDs, convertMap);
-                       sparkOpRdds.put(sparkOperator.getOperatorKey(),
-                                       
physicalOpRdds.get(leafPO.getOperatorKey()));
-               } catch(Exception e) {
-                       if( e instanceof  SparkException) {
-                               LOG.info("throw SparkException, error founds 
when running " +
-                                               "rdds in spark");
-                       }
-                       exception = e;
-                       isFail = true;
-               }
-
-               List<POStore> poStores = PlanHelper.getPhysicalOperators(
-                               sparkOperator.physicalPlan, POStore.class);
-               if (poStores != null && poStores.size() == 1) {
-                         POStore poStore = poStores.get(0);
+        List<SparkOperator> predecessors = sparkPlan
+                .getPredecessors(sparkOperator);
+        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        if (predecessors != null) {
+            for (SparkOperator pred : predecessors) {
+                if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
+                    sparkOperToRDD(sparkPlan, pred, sparkOpRdds,
+                            physicalOpRdds, convertMap, seenJobIDs, sparkStats,
+                            conf);
+                }
+                predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+            }
+        }
+
+        List<PhysicalOperator> leafPOs = 
sparkOperator.physicalPlan.getLeaves();
+        boolean isFail = false;
+        Exception exception = null;
+        if (leafPOs != null && leafPOs.size() != 1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "sparkOperator "
+                                    + ".physicalPlan should have 1 leaf, but  
sparkOperator"
+                                    + ".physicalPlan.getLeaves():{} not equals 
1, sparkOperator"
+                                    + "sparkOperator:{}",
+                            sparkOperator.physicalPlan.getLeaves().size(),
+                            sparkOperator.name()));
+        }
+
+        PhysicalOperator leafPO = leafPOs.get(0);
+        try {
+            physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+                    predecessorRDDs, convertMap);
+            sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                    physicalOpRdds.get(leafPO.getOperatorKey()));
+        } catch (Exception e) {
+            if (e instanceof SparkException) {
+                LOG.info("throw SparkException, error founds when running " +
+                        "rdds in spark");
+            }
+            exception = e;
+            isFail = true;
+        }
+
+        List<POStore> poStores = PlanHelper.getPhysicalOperators(
+                sparkOperator.physicalPlan, POStore.class);
+        if (poStores != null && poStores.size() == 1) {
+            POStore poStore = poStores.get(0);
             if (!isFail) {
                 for (int jobID : getJobIDs(seenJobIDs)) {
                     SparkStatsUtil.waitForJobAddStats(jobID, poStore, 
sparkOperator,
@@ -533,19 +532,19 @@ public class SparkLauncher extends Launc
                         conf, exception);
             }
         } else {
-                       LOG.info(String
-                                       .format(String.format("sparkOperator:{} 
does not have POStore or" +
-                                                                       " 
sparkOperator has more than 1 POStore. {} is the size of POStore."),
-                                                       sparkOperator.name(), 
poStores.size()));
-               }
-       }
-
-       private void physicalToRDD(PhysicalPlan plan,
-                       PhysicalOperator physicalOperator,
-                       Map<OperatorKey, RDD<Tuple>> rdds,
-                       List<RDD<Tuple>> rddsFromPredeSparkOper,
-                       Map<Class<? extends PhysicalOperator>, RDDConverter> 
convertMap)
-                       throws IOException {
+            LOG.info(String
+                    .format(String.format("sparkOperator:{} does not have 
POStore or" +
+                                    " sparkOperator has more than 1 POStore. 
{} is the size of POStore."),
+                            sparkOperator.name(), poStores.size()));
+        }
+    }
+
+    private void physicalToRDD(PhysicalPlan plan,
+                               PhysicalOperator physicalOperator,
+                               Map<OperatorKey, RDD<Tuple>> rdds,
+                               List<RDD<Tuple>> rddsFromPredeSparkOper,
+                               Map<Class<? extends PhysicalOperator>, 
RDDConverter> convertMap)
+            throws IOException {
         RDD<Tuple> nextRDD = null;
         List<PhysicalOperator> predecessors = plan
                 .getPredecessors(physicalOperator);
@@ -554,88 +553,88 @@ public class SparkLauncher extends Launc
         }
 
         List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
-               if (predecessors != null) {
-                       for (PhysicalOperator predecessor : predecessors) {
-                               physicalToRDD(plan, predecessor, rdds, 
rddsFromPredeSparkOper,
-                                               convertMap);
-                               
predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
-                       }
-
-               } else {
-                       if (rddsFromPredeSparkOper != null
-                                       && rddsFromPredeSparkOper.size() > 0) {
-                               predecessorRdds.addAll(rddsFromPredeSparkOper);
-                       }
-               }
-
-               RDDConverter converter = 
convertMap.get(physicalOperator.getClass());
-               if (converter == null) {
-                       throw new IllegalArgumentException(
-                                       "Pig on Spark does not support Physical 
Operator: " + physicalOperator);
-               }
-
-               LOG.info("Converting operator "
-                               + physicalOperator.getClass().getSimpleName() + 
" "
-                               + physicalOperator);
-               nextRDD = converter.convert(predecessorRdds, physicalOperator);
-
-               if (nextRDD == null) {
-                       throw new IllegalArgumentException(
-                                       "RDD should not be null after 
PhysicalOperator: "
-                                                       + physicalOperator);
-               }
-
-               rdds.put(physicalOperator.getOperatorKey(), nextRDD);
-       }
-
-       @Override
-       public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
-                       String format, boolean verbose) throws IOException {
-               SparkOperPlan sparkPlan = compile(pp, pc);
-               explain(sparkPlan, ps, format, verbose);
-       }
-
-       private void explain(SparkOperPlan sparkPlan, PrintStream ps,
-           String format, boolean verbose)
-                 throws IOException {
-               Map<OperatorKey, SparkOperator> allOperKeys = 
sparkPlan.getKeys();
-               List<OperatorKey> operKeyList = new 
ArrayList(allOperKeys.keySet());
-               Collections.sort(operKeyList);
-               for (OperatorKey operatorKey : operKeyList) {
-                       SparkOperator op = sparkPlan.getOperator(operatorKey);
-                       ps.print(op.getOperatorKey());
-                       List<SparkOperator> successors = 
sparkPlan.getSuccessors(op);
-                       if (successors != null) {
-                               ps.print("->");
-                               for (SparkOperator suc : successors) {
-                                       ps.print(suc.getOperatorKey() + " ");
-                               }
-                       }
-                       ps.println();
-               }
-
-               if (format.equals("text")) {
-                       SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
-                       printer.setVerbose(verbose);
-                       printer.visit();
-               } else { // TODO: add support for other file format
-                       throw new IOException(
-                                       "Non-text output of explain is not 
supported.");
-               }
-       }
-
-       @Override
-       public void kill() throws BackendException {
-               // TODO Auto-generated method stub
-
-       }
-
-       @Override
-       public void killJob(String jobID, Configuration conf)
-                       throws BackendException {
-               // TODO Auto-generated method stub
+        if (predecessors != null) {
+            for (PhysicalOperator predecessor : predecessors) {
+                physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
+                        convertMap);
+                predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
+            }
 
-       }
+        } else {
+            if (rddsFromPredeSparkOper != null
+                    && rddsFromPredeSparkOper.size() > 0) {
+                predecessorRdds.addAll(rddsFromPredeSparkOper);
+            }
+        }
+
+        RDDConverter converter = convertMap.get(physicalOperator.getClass());
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                    "Pig on Spark does not support Physical Operator: " + 
physicalOperator);
+        }
+
+        LOG.info("Converting operator "
+                + physicalOperator.getClass().getSimpleName() + " "
+                + physicalOperator);
+        nextRDD = converter.convert(predecessorRdds, physicalOperator);
+
+        if (nextRDD == null) {
+            throw new IllegalArgumentException(
+                    "RDD should not be null after PhysicalOperator: "
+                            + physicalOperator);
+        }
+
+        rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+    }
+
+    @Override
+    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+                        String format, boolean verbose) throws IOException {
+        SparkOperPlan sparkPlan = compile(pp, pc);
+        explain(sparkPlan, ps, format, verbose);
+    }
+
+    private void explain(SparkOperPlan sparkPlan, PrintStream ps,
+                         String format, boolean verbose)
+            throws IOException {
+        Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
+        List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
+        Collections.sort(operKeyList);
+        for (OperatorKey operatorKey : operKeyList) {
+            SparkOperator op = sparkPlan.getOperator(operatorKey);
+            ps.print(op.getOperatorKey());
+            List<SparkOperator> successors = sparkPlan.getSuccessors(op);
+            if (successors != null) {
+                ps.print("->");
+                for (SparkOperator suc : successors) {
+                    ps.print(suc.getOperatorKey() + " ");
+                }
+            }
+            ps.println();
+        }
+
+        if (format.equals("text")) {
+            SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else { // TODO: add support for other file format
+            throw new IOException(
+                    "Non-text output of explain is not supported.");
+        }
+    }
+
+    @Override
+    public void kill() throws BackendException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void killJob(String jobID, Configuration conf)
+            throws BackendException {
+        // TODO Auto-generated method stub
+
+    }
 
     /**
      * We store the value of udf.import.list in 
PigContext#properties.getProperty("spark.udf.import.list") in spark mode.

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
 Fri Jul  3 12:48:05 2015
@@ -25,25 +25,25 @@ import java.util.Properties;
  */
 public class SparkLocalExecType extends SparkExecType {
 
-  private static final long serialVersionUID = 1L;
-  private static final String mode ="SPARK_LOCAL";
+    private static final long serialVersionUID = 1L;
+    private static final String mode = "SPARK_LOCAL";
 
-  @Override
-  public boolean accepts(Properties properties) {
-    String execTypeSpecified = properties.getProperty("exectype", "")
-        .toUpperCase();
-    if (execTypeSpecified.equals(mode))
-      return true;
-    return false;
-  }
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "")
+                .toUpperCase();
+        if (execTypeSpecified.equals(mode))
+            return true;
+        return false;
+    }
 
-  @Override
-  public boolean isLocal() {
-    return true;
-  }
+    @Override
+    public boolean isLocal() {
+        return true;
+    }
 
-  @Override
-  public String name() {
-    return "SPARK_LOCAL";
-  }
+    @Override
+    public String name() {
+        return "SPARK_LOCAL";
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
 Fri Jul  3 12:48:05 2015
@@ -29,60 +29,60 @@ import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
-@SuppressWarnings({ "serial"})
+@SuppressWarnings({"serial"})
 public class CollectedGroupConverter implements RDDConverter<Tuple, Tuple, 
POCollectedGroup> {
 
-       @Override
-  public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-      POCollectedGroup physicalOperator) throws IOException {
-    SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
-    RDD<Tuple> rdd = predecessors.get(0);
-    CollectedGroupFunction collectedGroupFunction
-        = new CollectedGroupFunction(physicalOperator);
-    return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
-                       .rdd();
-  }
-
-       private static class CollectedGroupFunction
-                       implements FlatMapFunction<Iterator<Tuple>, Tuple> {
-
-               private POCollectedGroup poCollectedGroup;
-
-               public long current_val;
-               public boolean proceed;
-
-               private CollectedGroupFunction(POCollectedGroup 
poCollectedGroup) {
-                       this.poCollectedGroup = poCollectedGroup;
-                       this.current_val = 0;
-               }
-
-               public Iterable<Tuple> call(final Iterator<Tuple> input) {
-
-                         return new Iterable<Tuple>() {
-
-                               @Override
-                               public Iterator<Tuple> iterator() {
-
-                                 return new OutputConsumerIterator(input) {
-
-                                         @Override
-                                         protected void attach(Tuple tuple) {
-                                                 
poCollectedGroup.setInputs(null);
-                                                 
poCollectedGroup.attachInput(tuple);
-                                         }
-
-                                         @Override
-                                         protected Result getNextResult() 
throws ExecException {
-                                             return 
poCollectedGroup.getNextTuple();
-                                         }
-
-                                         @Override
-                                         protected void endOfInput() {
-                                                 
poCollectedGroup.getParentPlan().endOfAllInput = true;
-                                         }
-                                 };
-                               }
-                 };
-               }
-       }
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POCollectedGroup physicalOperator) throws 
IOException {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        CollectedGroupFunction collectedGroupFunction
+                = new CollectedGroupFunction(physicalOperator);
+        return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
+                .rdd();
+    }
+
+    private static class CollectedGroupFunction
+            implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+        private POCollectedGroup poCollectedGroup;
+
+        public long current_val;
+        public boolean proceed;
+
+        private CollectedGroupFunction(POCollectedGroup poCollectedGroup) {
+            this.poCollectedGroup = poCollectedGroup;
+            this.current_val = 0;
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poCollectedGroup.setInputs(null);
+                            poCollectedGroup.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poCollectedGroup.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poCollectedGroup.getParentPlan().endOfAllInput = 
true;
+                        }
+                    };
+                }
+            };
+        }
+    }
 }


Reply via email to