Author: xuefu
Date: Wed Jul  1 13:34:06 2015
New Revision: 1688648

URL: http://svn.apache.org/r1688648
Log:
PIG-4614: Enable TestLocationInPhysicalPlan in spark mode (Liyun via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Wed Jul  1 13:34:06 2015
@@ -23,8 +23,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkScriptState;
 
 public class SparkExecutionEngine extends HExecutionEngine {
 
@@ -35,7 +35,7 @@ public class SparkExecutionEngine extend
 
     @Override
     public ScriptState instantiateScriptState() {
-        MRScriptState ss = new MRScriptState(UUID.randomUUID().toString());
+        SparkScriptState ss = new 
SparkScriptState(UUID.randomUUID().toString());
         ss.setPigContext(pigContext);
         return ss;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Jul  1 13:34:06 2015
@@ -128,6 +128,7 @@ public class SparkLauncher extends Launc
                          explain(sparkplan, System.out, "text", true);
                SparkPigStats sparkStats = (SparkPigStats) pigContext
                                .getExecutionEngine().instantiatePigStats();
+        sparkStats.initialize(sparkplan);
                PigStats.start(sparkStats);
 
                startSparkIfNeeded(pigContext);
@@ -523,12 +524,12 @@ public class SparkLauncher extends Launc
                          POStore poStore = poStores.get(0);
             if (!isFail) {
                 for (int jobID : getJobIDs(seenJobIDs)) {
-                    SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+                    SparkStatsUtil.waitForJobAddStats(jobID, poStore, 
sparkOperator,
                             jobMetricsListener, sparkContext, sparkStats, 
conf);
                 }
             } else {
                 String failJobID = sparkOperator.name().concat("_fail");
-                SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkStats,
+                SparkStatsUtil.addFailJobStats(failJobID, poStore, 
sparkOperator, sparkStats,
                         conf, exception);
             }
         } else {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Jul  1 13:34:06 2015
@@ -376,6 +376,7 @@ public class SparkCompiler extends PhyPl
                try {
                        SparkOperator nativesparkOpper = getNativeSparkOp(
                                        op.getNativeMRjar(), op.getParams());
+            nativesparkOpper.markNative();
                        sparkPlan.add(nativesparkOpper);
                        sparkPlan.connect(curSparkOp, nativesparkOpper);
                        phyToSparkOpMap.put(op, nativesparkOpper);
@@ -459,6 +460,7 @@ public class SparkCompiler extends PhyPl
                 POLimit pLimit2 = new POLimit(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit2.setLimit(limit);
                 curSparkOp.physicalPlan.addAsLeaf(pLimit2);
+                curSparkOp.markLimitAfterSort();
             }
             phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
@@ -473,6 +475,7 @@ public class SparkCompiler extends PhyPl
        public void visitLimit(POLimit op) throws VisitorException {
                try {
                        addToPlan(op);
+            curSparkOp.markLimit();
                } catch (Exception e) {
                        int errCode = 2034;
                        String msg = "Error compiling operator "
@@ -640,6 +643,7 @@ public class SparkCompiler extends PhyPl
                try {
                        addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
+            curSparkOp.markUnion();
                } catch (Exception e) {
                        int errCode = 2034;
                        String msg = "Error compiling operator "

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Wed Jul  1 13:34:06 2015
@@ -45,7 +45,15 @@ public class SparkOperator extends Opera
                // Indicate if this job is a cogroup job
                COGROUP,
                // Indicate if this job is a regular join job
-               HASHJOIN;
+               HASHJOIN,
+        // Indicate if this job is a union job
+        UNION,
+        // Indicate if this job is a native job
+        NATIVE,
+        // Indicate if this job is a limit job
+        LIMIT,
+        // Indicate if this job is a limit job after sort
+        LIMIT_AFTER_SORT;
        };
 
        public PhysicalPlan physicalPlan;
@@ -206,4 +214,35 @@ public class SparkOperator extends Opera
     public void markIndexer() {
         feature.set(OPER_FEATURE.INDEXER.ordinal());
     }
+    public boolean isUnion() {
+        return feature.get(OPER_FEATURE.UNION.ordinal());
+    }
+
+    public void markUnion() {
+        feature.set(OPER_FEATURE.UNION.ordinal());
+    }
+
+    public boolean isNative() {
+        return feature.get(OPER_FEATURE.NATIVE.ordinal());
+    }
+
+    public void markNative() {
+        feature.set(OPER_FEATURE.NATIVE.ordinal());
+    }
+
+    public boolean isLimit() {
+        return feature.get(OPER_FEATURE.LIMIT.ordinal());
+    }
+
+    public void markLimit() {
+        feature.set(OPER_FEATURE.LIMIT.ordinal());
+    }
+
+    public boolean isLimitAfterSort() {
+        return feature.get(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
+    }
+
+    public void markLimitAfterSort() {
+        feature.set(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Jul  1 13:34:06 2015
@@ -20,6 +20,7 @@ package org.apache.pig.tools.pigstats.sp
 
 import java.util.List;
 import java.util.Map;
+
 import scala.Option;
 
 import com.google.common.collect.Maps;
@@ -28,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.Counters;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -40,244 +42,252 @@ import org.apache.spark.executor.TaskMet
 
 public class SparkJobStats extends JobStats {
 
-  private int jobId;
-  private Map<String, Long> stats = Maps.newLinkedHashMap();
+    private int jobId;
+    private Map<String, Long> stats = Maps.newLinkedHashMap();
+
+    protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
+        this(String.valueOf(jobId), plan);
+        this.jobId = jobId;
+    }
+
+    protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
+        super(jobId, plan);
+    }
+
+    public void addOutputInfo(POStore poStore, boolean success,
+                              JobMetricsListener jobMetricsListener,
+                              Configuration conf) {
+        // TODO: Compute #records
+        long bytes = getOutputSize(poStore, conf);
+        OutputStats outputStats = new 
OutputStats(poStore.getSFile().getFileName(),
+                bytes, 1, success);
+        outputStats.setPOStore(poStore);
+        outputStats.setConf(conf);
+        if (!poStore.isTmpStore()) {
+            outputs.add(outputStats);
+        }
+    }
+
+    public void collectStats(JobMetricsListener jobMetricsListener) {
+        if (jobMetricsListener != null) {
+            Map<String, List<TaskMetrics>> taskMetrics = 
jobMetricsListener.getJobMetric(jobId);
+            if (taskMetrics == null) {
+                throw new RuntimeException("No task metrics available for 
jobId " + jobId);
+            }
+            stats = combineTaskMetrics(taskMetrics);
+        }
+    }
 
-  protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
-      this(String.valueOf(jobId), plan);
-      this.jobId = jobId;
-  }
-
-  protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
-      super(jobId, plan);
-  }
-
-  public void addOutputInfo(POStore poStore,  boolean success,
-                            JobMetricsListener jobMetricsListener,
-                            Configuration conf) {
-      // TODO: Compute #records
-      long bytes = getOutputSize(poStore, conf);
-      OutputStats outputStats = new 
OutputStats(poStore.getSFile().getFileName(),
-          bytes, 1, success);
-      outputStats.setPOStore(poStore);
-      outputStats.setConf(conf);
-      if( !poStore.isTmpStore()) {
-          outputs.add(outputStats);
-      }
-  }
-
-       public void collectStats(JobMetricsListener jobMetricsListener) {
-               if (jobMetricsListener != null) {
-                       Map<String, List<TaskMetrics>> taskMetrics = 
jobMetricsListener.getJobMetric(jobId);
-                       if (taskMetrics == null) {
-                               throw new RuntimeException("No task metrics 
available for jobId " + jobId);
-                       }
-                       stats = combineTaskMetrics(taskMetrics);
-               }
-       }
-
-  public Map<String, Long> getStats() {
-      return stats;
-  }
-
-  private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> 
jobMetric) {
-      Map<String, Long> results = Maps.newLinkedHashMap();
-
-      long executorDeserializeTime = 0;
-      long executorRunTime = 0;
-      long resultSize = 0;
-      long jvmGCTime = 0;
-      long resultSerializationTime = 0;
-      long memoryBytesSpilled = 0;
-      long diskBytesSpilled = 0;
-      long bytesRead = 0;
-      long bytesWritten = 0;
-      long remoteBlocksFetched = 0;
-      long localBlocksFetched = 0;
-      long fetchWaitTime = 0;
-      long remoteBytesRead = 0;
-      long shuffleBytesWritten = 0;
-      long shuffleWriteTime = 0;
-      boolean inputMetricExist = false;
-      boolean outputMetricExist = false;
-      boolean shuffleReadMetricExist = false;
-      boolean shuffleWriteMetricExist = false;
-
-      for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-        if (stageMetric != null) {
-          for (TaskMetrics taskMetrics : stageMetric) {
-            if (taskMetrics != null) {
-              executorDeserializeTime += taskMetrics.executorDeserializeTime();
-              executorRunTime += taskMetrics.executorRunTime();
-              resultSize += taskMetrics.resultSize();
-              jvmGCTime += taskMetrics.jvmGCTime();
-              resultSerializationTime += taskMetrics.resultSerializationTime();
-              memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-              diskBytesSpilled += taskMetrics.diskBytesSpilled();
-              if (!taskMetrics.inputMetrics().isEmpty()) {
-                inputMetricExist = true;
-                bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-              }
-
-              if (!taskMetrics.outputMetrics().isEmpty()) {
-                outputMetricExist = true;
-                bytesWritten += 
taskMetrics.outputMetrics().get().bytesWritten();
-              }
-
-              Option<ShuffleReadMetrics> shuffleReadMetricsOption = 
taskMetrics.shuffleReadMetrics();
-              if (!shuffleReadMetricsOption.isEmpty()) {
-                shuffleReadMetricExist = true;
-                remoteBlocksFetched += 
shuffleReadMetricsOption.get().remoteBlocksFetched();
-                localBlocksFetched += 
shuffleReadMetricsOption.get().localBlocksFetched();
-                fetchWaitTime += 
shuffleReadMetricsOption.get().fetchWaitTime();
-                remoteBytesRead += 
shuffleReadMetricsOption.get().remoteBytesRead();
-              }
-
-              Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = 
taskMetrics.shuffleWriteMetrics();
-              if (!shuffleWriteMetricsOption.isEmpty()) {
-                shuffleWriteMetricExist = true;
-                shuffleBytesWritten += 
shuffleWriteMetricsOption.get().shuffleBytesWritten();
-                shuffleWriteTime += 
shuffleWriteMetricsOption.get().shuffleWriteTime();
-              }
+    public Map<String, Long> getStats() {
+        return stats;
+    }
+
+    private Map<String, Long> combineTaskMetrics(Map<String, 
List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+        boolean inputMetricExist = false;
+        boolean outputMetricExist = false;
+        boolean shuffleReadMetricExist = false;
+        boolean shuffleWriteMetricExist = false;
+
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += 
taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += 
taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        if (!taskMetrics.inputMetrics().isEmpty()) {
+                            inputMetricExist = true;
+                            bytesRead += 
taskMetrics.inputMetrics().get().bytesRead();
+                        }
+
+                        if (!taskMetrics.outputMetrics().isEmpty()) {
+                            outputMetricExist = true;
+                            bytesWritten += 
taskMetrics.outputMetrics().get().bytesWritten();
+                        }
+
+                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = 
taskMetrics.shuffleReadMetrics();
+                        if (!shuffleReadMetricsOption.isEmpty()) {
+                            shuffleReadMetricExist = true;
+                            remoteBlocksFetched += 
shuffleReadMetricsOption.get().remoteBlocksFetched();
+                            localBlocksFetched += 
shuffleReadMetricsOption.get().localBlocksFetched();
+                            fetchWaitTime += 
shuffleReadMetricsOption.get().fetchWaitTime();
+                            remoteBytesRead += 
shuffleReadMetricsOption.get().remoteBytesRead();
+                        }
+
+                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption 
= taskMetrics.shuffleWriteMetrics();
+                        if (!shuffleWriteMetricsOption.isEmpty()) {
+                            shuffleWriteMetricExist = true;
+                            shuffleBytesWritten += 
shuffleWriteMetricsOption.get().shuffleBytesWritten();
+                            shuffleWriteTime += 
shuffleWriteMetricsOption.get().shuffleWriteTime();
+                        }
 
+                    }
+                }
             }
-          }
         }
-      }
 
-      results.put("EexcutorDeserializeTime", executorDeserializeTime);
-      results.put("ExecutorRunTime", executorRunTime);
-      results.put("ResultSize", resultSize);
-      results.put("JvmGCTime", jvmGCTime);
-      results.put("ResultSerializationTime", resultSerializationTime);
-      results.put("MemoryBytesSpilled", memoryBytesSpilled);
-      results.put("DiskBytesSpilled", diskBytesSpilled);
-      if (inputMetricExist) {
-        results.put("BytesRead", bytesRead);
-      }
-
-      if (outputMetricExist) {
-        results.put("BytesWritten", bytesWritten);
-      }
-
-      if (shuffleReadMetricExist) {
-        results.put("RemoteBlocksFetched", remoteBlocksFetched);
-        results.put("LocalBlocksFetched", localBlocksFetched);
-        results.put("TotalBlocksFetched", localBlocksFetched + 
remoteBlocksFetched);
-        results.put("FetchWaitTime", fetchWaitTime);
-        results.put("RemoteBytesRead", remoteBytesRead);
-      }
-
-      if (shuffleWriteMetricExist) {
-        results.put("ShuffleBytesWritten", shuffleBytesWritten);
-        results.put("ShuffleWriteTime", shuffleWriteTime);
-      }
-
-      return results;
-  }
-
-  @Override
-  public String getJobId() {
-      return String.valueOf(jobId);
-  }
-
-  @Override
-  public void accept(PlanVisitor v) throws FrontendException {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String getDisplayString() {
-      return null;
-  }
-
-  @Override
-  public int getNumberMaps() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getNumberReduces() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getMaxMapTime() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getMinMapTime() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getAvgMapTime() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getMaxReduceTime() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getMinReduceTime() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getAvgREduceTime() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getMapInputRecords() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getMapOutputRecords() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getReduceInputRecords() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getReduceOutputRecords() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getSMMSpillCount() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getProactiveSpillCountObjects() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getProactiveSpillCountRecs() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Counters getHadoopCounters() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Map<String, Long> getMultiStoreCounters() {
-      throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Map<String, Long> getMultiInputCounters() {
-      throw new UnsupportedOperationException();
-  }
+        results.put("EexcutorDeserializeTime", executorDeserializeTime);
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+        if (inputMetricExist) {
+            results.put("BytesRead", bytesRead);
+        }
+
+        if (outputMetricExist) {
+            results.put("BytesWritten", bytesWritten);
+        }
+
+        if (shuffleReadMetricExist) {
+            results.put("RemoteBlocksFetched", remoteBlocksFetched);
+            results.put("LocalBlocksFetched", localBlocksFetched);
+            results.put("TotalBlocksFetched", localBlocksFetched + 
remoteBlocksFetched);
+            results.put("FetchWaitTime", fetchWaitTime);
+            results.put("RemoteBytesRead", remoteBytesRead);
+        }
+
+        if (shuffleWriteMetricExist) {
+            results.put("ShuffleBytesWritten", shuffleBytesWritten);
+            results.put("ShuffleWriteTime", shuffleWriteTime);
+        }
+
+        return results;
+    }
+
+    @Override
+    public String getJobId() {
+        return String.valueOf(jobId);
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getDisplayString() {
+        return null;
+    }
+
+    @Override
+    public int getNumberMaps() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberReduces() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxMapTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMinMapTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getAvgMapTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxReduceTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMinReduceTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getAvgREduceTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMapInputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMapOutputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getReduceInputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getReduceOutputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountRecs() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Counters getHadoopCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getMultiStoreCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getMultiInputCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setAlias(SparkOperator sparkOperator) {
+        SparkScriptState ss = (SparkScriptState) SparkScriptState.get();
+        SparkScriptState.SparkScriptInfo sparkScriptInfo = ss.getScriptInfo();
+        annotate(ALIAS, sparkScriptInfo.getAlias(sparkOperator));
+        annotate(ALIAS_LOCATION, 
sparkScriptInfo.getAliasLocation(sparkOperator));
+        annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator));
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Jul  1 13:34:06 2015
@@ -1,30 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.tools.pigstats.spark;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 public class SparkPigStats extends PigStats {
 
+    private Map<SparkJobStats,SparkOperator> jobSparkOperatorMap = new 
HashMap<SparkJobStats, SparkOperator>();
     private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
 
+    private SparkScriptState sparkScriptState;
+
     public SparkPigStats() {
-      jobPlan = new JobGraph();
+        jobPlan = new JobGraph();
+        this.sparkScriptState = (SparkScriptState) ScriptState.get();
+    }
+
+    public void initialize(SparkOperPlan sparkPlan){
+        super.start();
+        sparkScriptState.setScriptInfo(sparkPlan);
     }
 
-    public void addJobStats(POStore poStore, int jobId,
+    public void addJobStats(POStore poStore, SparkOperator sparkOperator, int 
jobId,
                             JobMetricsListener jobMetricsListener,
                             JavaSparkContext sparkContext,
                             Configuration conf) {
@@ -33,10 +63,11 @@ public class SparkPigStats extends PigSt
         jobStats.setSuccessful(isSuccess);
         jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
         jobStats.collectStats(jobMetricsListener);
+        jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
     }
 
-    public void addFailJobStats(POStore poStore, String jobId,
+    public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, 
String jobId,
                                 JobMetricsListener jobMetricsListener,
                                 JavaSparkContext sparkContext,
                                 Configuration conf,
@@ -46,6 +77,7 @@ public class SparkPigStats extends PigSt
         jobStats.setSuccessful(isSuccess);
         jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
         jobStats.collectStats(jobMetricsListener);
+        jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
         if (e != null) {
             jobStats.setBackendException(e);
@@ -61,6 +93,10 @@ public class SparkPigStats extends PigSt
         Iterator<JobStats> iter = jobPlan.iterator();
         while (iter.hasNext()) {
             SparkJobStats js = (SparkJobStats)iter.next();
+            if (jobSparkOperatorMap.containsKey(js)) {
+                SparkOperator sparkOperator = jobSparkOperatorMap.get(js);
+                js.setAlias(sparkOperator);
+            }
             LOG.info( "Spark Job [" + js.getJobId() + "] Metrics");
             Map<String, Long> stats = js.getStats();
             if (stats == null) {

Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java?rev=1688648&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java Wed Jul  1 13:34:06 2015
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.collect.Maps;
+
+/**
+ * SparkScriptState encapsulates settings for a Pig script that runs on a hadoop
+ * cluster. These settings are added to all Spark jobs spawned by the script and
+ * in turn are persisted in the hadoop job xml. With the properties already in
+ * the job xml, users who want to know the relations between the script and Spark
+ * jobs can derive them from the job xmls.
+ * <p>
+ * NOTE(review): the job-xml behaviour described above is not demonstrated by the
+ * code in this class itself; confirm that it actually holds for the Spark backend.
+ * <p>
+ * What this class adds on top of {@link ScriptState} is a {@link SparkScriptInfo}.
+ * It is built from a {@link SparkOperPlan} and records, for every
+ * {@link SparkOperator} in that plan (keyed by its {@link OperatorKey}), three
+ * strings: the aliases, the alias locations, and the {@code PIG_FEATURE} names
+ * found for that operator.
+ */
+public class SparkScriptState extends ScriptState {
+    public SparkScriptState(String id) {
+        super(id);
+    }
+
+    private SparkScriptInfo scriptInfo = null; // stays null until setScriptInfo(...) is called
+
+    public void setScriptInfo(SparkOperPlan plan) { // replaces any previous info; the plan is walked immediately by the SparkScriptInfo constructor
+        this.scriptInfo = new SparkScriptInfo(plan);
+    }
+
+    public SparkScriptInfo getScriptInfo() { // returns null if setScriptInfo(...) has not been called
+        return scriptInfo;
+    }
+
+    public static class SparkScriptInfo { // per-operator alias / alias-location / feature strings for one SparkOperPlan
+
+        private static final Log LOG = 
LogFactory.getLog(SparkScriptInfo.class);
+        private SparkOperPlan sparkPlan;
+        private String alias; // plan-wide value assigned in DAGAliasVisitor.visit(); no reader is visible in this class
+        private String aliasLocation; // plan-wide value assigned in DAGAliasVisitor.visit(); no reader is visible in this class
+        private String features; // plan-wide value assigned in DAGAliasVisitor.visit(); no reader is visible in this class
+
+        private Map<OperatorKey, String> featuresMap = Maps.newHashMap(); // filled by DAGAliasVisitor.visitSparkOp, keyed by sparkOp.getOperatorKey()
+        private Map<OperatorKey, String> aliasMap = Maps.newHashMap(); // filled by DAGAliasVisitor.visitSparkOp, keyed by sparkOp.getOperatorKey()
+        private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap(); // filled by DAGAliasVisitor.visitSparkOp, keyed by sparkOp.getOperatorKey()
+
+        public SparkScriptInfo(SparkOperPlan sparkPlan) {
+            this.sparkPlan = sparkPlan;
+            initialize(); // populate the maps eagerly, at construction time
+        }
+
+        private void initialize() { // best-effort: a VisitorException is logged as a warning and not rethrown
+            try {
+                new DAGAliasVisitor(sparkPlan).visit();
+            } catch (VisitorException e) {
+                LOG.warn("Cannot calculate alias information for DAG", e);
+            }
+        }
+
+        public String getAlias(SparkOperator sparkOp) { // aliases joined with ","; null if no entry was recorded for this operator's key
+            return aliasMap.get(sparkOp.getOperatorKey());
+        }
+
+        public String getAliasLocation(SparkOperator sparkOp) { // alias locations joined with "," in the order AliasVisitor produced them (not sorted); null if no entry
+            return aliasLocationMap.get(sparkOp.getOperatorKey());
+        }
+
+        public String getPigFeatures(SparkOperator sparkOp) { // PIG_FEATURE names joined with "," in ascending ordinal order; null if no entry
+            return featuresMap.get(sparkOp.getOperatorKey());
+        }
+
+        class DAGAliasVisitor extends SparkOpPlanVisitor { // inner (non-static) class: writes into the enclosing SparkScriptInfo's fields and maps
+
+            private Set<String> aliases; // union of aliases over all visited operators
+            private Set<String> aliasLocations; // union of alias locations over all visited operators
+            private BitSet featureSet; // union of feature bits over all visited operators, indexed by PIG_FEATURE ordinal
+
+            public DAGAliasVisitor(SparkOperPlan plan) {
+                super(plan, new DependencyOrderWalker<SparkOperator, 
SparkOperPlan>(plan));
+                this.aliases = new HashSet<String>();
+                this.aliasLocations = new HashSet<String>();
+                this.featureSet = new BitSet();
+            }
+
+            @Override
+            public void visitSparkOp(SparkOperator sparkOp) throws 
VisitorException {
+
+                ArrayList<String> aliasList = new ArrayList<String>();
+                String aliasLocationStr = "";
+                try {
+                    ArrayList<String> aliasLocationList = new 
ArrayList<String>();
+                    new AliasVisitor(sparkOp.physicalPlan, aliasList, 
aliasLocationList).visit();
+                    aliasLocationStr += LoadFunc.join(aliasLocationList, ","); // joined before any sorting, so the visitor's own order is kept
+                    if (!aliasList.isEmpty()) {
+                        Collections.sort(aliasList); // only the alias list is sorted; aliasLocationList is left as collected
+                        aliases.addAll(aliasList);
+                        aliasLocations.addAll(aliasLocationList);
+                    }
+                } catch (VisitorException e) { // best-effort: log and continue with whatever aliasList / aliasLocationStr hold at this point
+                    LOG.warn("unable to get alias", e);
+                }
+                aliasMap.put(sparkOp.getOperatorKey(), 
LoadFunc.join(aliasList, ","));
+                aliasLocationMap.put(sparkOp.getOperatorKey(), 
aliasLocationStr);
+
+
+                BitSet feature = new BitSet(); // feature bits for this operator only, indexed by PIG_FEATURE ordinal
+                feature.clear(); // redundant: a newly constructed BitSet already has every bit false
+                if (sparkOp.isSampler()) {
+                    feature.set(PIG_FEATURE.SAMPLER.ordinal());
+                }
+                if (sparkOp.isIndexer()) {
+                    feature.set(PIG_FEATURE.INDEXER.ordinal());
+                }
+                if (sparkOp.isCogroup()) {
+                    feature.set(PIG_FEATURE.COGROUP.ordinal());
+                }
+                if (sparkOp.isGroupBy()) {
+                    feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+                }
+                if (sparkOp.isRegularJoin()) {
+                    feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                }
+                if (sparkOp.isUnion()) {
+                    feature.set(PIG_FEATURE.UNION.ordinal());
+                }
+                if (sparkOp.isNative()) {
+                    feature.set(PIG_FEATURE.NATIVE.ordinal());
+                }
+                if (sparkOp.isLimit() || sparkOp.isLimitAfterSort()) {
+                    feature.set(PIG_FEATURE.LIMIT.ordinal());
+                }
+                try {
+                    new FeatureVisitor(sparkOp.physicalPlan, feature).visit(); // receives the same BitSet; presumably sets further bits from the physical plan - confirm in FeatureVisitor
+                } catch (VisitorException e) { // best-effort: the bits set so far are still recorded below
+                    LOG.warn("Feature visitor failed", e);
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = feature.nextSetBit(0); i >= 0; i = 
feature.nextSetBit(i + 1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name()); // maps the bit index back to the enum constant with that ordinal
+                }
+                featuresMap.put(sparkOp.getOperatorKey(), sb.toString());
+                for (int i = 0; i < feature.length(); i++) { // merges this operator's bits into the plan-wide set; same effect as featureSet.or(feature)
+                    if (feature.get(i)) {
+                        featureSet.set(i);
+                    }
+                }
+            }
+
+            @Override
+            public void visit() throws VisitorException { // runs the walk, then folds the accumulated sets into the plan-wide alias / aliasLocation / features strings
+                super.visit();
+                if (!aliases.isEmpty()) { // when no alias was collected, alias and aliasLocation keep their previous value (null after construction)
+                    ArrayList<String> aliasList = new 
ArrayList<String>(aliases);
+                    ArrayList<String> aliasLocationList = new 
ArrayList<String>(aliasLocations);
+                    Collections.sort(aliasList);
+                    Collections.sort(aliasLocationList); // unlike the per-operator string, the plan-wide location string is sorted
+                    alias = LoadFunc.join(aliasList, ",");
+                    aliasLocation = LoadFunc.join(aliasLocationList, ",");
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = featureSet.nextSetBit(0); i >= 0; i = 
featureSet.nextSetBit(i + 1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                features = sb.toString(); // always assigned; empty string when no feature bit is set
+            }
+        }
+    }
+}

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
Wed Jul  1 13:34:06 2015
@@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp
 import org.apache.hadoop.mapred.JobConf;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -28,7 +29,7 @@ import org.apache.spark.api.java.JavaSpa
 public class SparkStatsUtil {
 
   public static void waitForJobAddStats(int jobID,
-                                        POStore poStore,
+                                        POStore poStore, SparkOperator 
sparkOperator,
                                         JobMetricsListener jobMetricsListener,
                                         JavaSparkContext sparkContext,
                                         SparkPigStats sparkPigStats,
@@ -42,18 +43,18 @@ public class SparkStatsUtil {
       // this driver thread calling SparkStatusTracker.
       // To workaround this, we will wait for this job to "finish".
       jobMetricsListener.waitForJobToEnd(jobID);
-      sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
+      sparkPigStats.addJobStats(poStore, sparkOperator, jobID, 
jobMetricsListener,
               sparkContext, jobConf);
       jobMetricsListener.cleanup(jobID);
   }
 
     public static void addFailJobStats(String jobID,
-                                       POStore poStore,
+                                       POStore poStore, SparkOperator 
sparkOperator,
                                        SparkPigStats sparkPigStats,
                                        JobConf jobConf, Exception e) {
         JobMetricsListener jobMetricsListener = null;
         JavaSparkContext sparkContext = null;
-        sparkPigStats.addFailJobStats(poStore, jobID, jobMetricsListener,
+        sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, 
jobMetricsListener,
                 sparkContext, jobConf, e);
     }
 

Modified: 
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
--- 
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
 (original)
+++ 
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
 Wed Jul  1 13:34:06 2015
@@ -62,6 +62,8 @@ public class TestLocationInPhysicalPlan
         JobStats jStats = 
(JobStats)job.getStatistics().getJobGraph().getSinks().get(0);
         if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) {
             Assert.assertEquals("A[1,4],A[3,4],B[2,4]", 
jStats.getAliasLocation());
+        } else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) {
+            Assert.assertEquals("A[1,4],B[2,4],A[3,4]", 
jStats.getAliasLocation());
         } else {
             Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: 
A[3,4]", jStats.getAliasLocation());
         }


Reply via email to