Author: xuefu
Date: Wed Jul 1 13:34:06 2015
New Revision: 1688648
URL: http://svn.apache.org/r1688648
Log:
PIG-4614: Enable TestLocationInPhysicalPlan in spark mode (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
Wed Jul 1 13:34:06 2015
@@ -23,8 +23,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkScriptState;
public class SparkExecutionEngine extends HExecutionEngine {
@@ -35,7 +35,7 @@ public class SparkExecutionEngine extend
@Override
public ScriptState instantiateScriptState() {
- MRScriptState ss = new MRScriptState(UUID.randomUUID().toString());
+ SparkScriptState ss = new
SparkScriptState(UUID.randomUUID().toString());
ss.setPigContext(pigContext);
return ss;
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Wed Jul 1 13:34:06 2015
@@ -128,6 +128,7 @@ public class SparkLauncher extends Launc
explain(sparkplan, System.out, "text", true);
SparkPigStats sparkStats = (SparkPigStats) pigContext
.getExecutionEngine().instantiatePigStats();
+ sparkStats.initialize(sparkplan);
PigStats.start(sparkStats);
startSparkIfNeeded(pigContext);
@@ -523,12 +524,12 @@ public class SparkLauncher extends Launc
POStore poStore = poStores.get(0);
if (!isFail) {
for (int jobID : getJobIDs(seenJobIDs)) {
- SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+ SparkStatsUtil.waitForJobAddStats(jobID, poStore,
sparkOperator,
jobMetricsListener, sparkContext, sparkStats,
conf);
}
} else {
String failJobID = sparkOperator.name().concat("_fail");
- SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkStats,
+ SparkStatsUtil.addFailJobStats(failJobID, poStore,
sparkOperator, sparkStats,
conf, exception);
}
} else {
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Wed Jul 1 13:34:06 2015
@@ -376,6 +376,7 @@ public class SparkCompiler extends PhyPl
try {
SparkOperator nativesparkOpper = getNativeSparkOp(
op.getNativeMRjar(), op.getParams());
+ nativesparkOpper.markNative();
sparkPlan.add(nativesparkOpper);
sparkPlan.connect(curSparkOp, nativesparkOpper);
phyToSparkOpMap.put(op, nativesparkOpper);
@@ -459,6 +460,7 @@ public class SparkCompiler extends PhyPl
POLimit pLimit2 = new POLimit(new
OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit2.setLimit(limit);
curSparkOp.physicalPlan.addAsLeaf(pLimit2);
+ curSparkOp.markLimitAfterSort();
}
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
@@ -473,6 +475,7 @@ public class SparkCompiler extends PhyPl
public void visitLimit(POLimit op) throws VisitorException {
try {
addToPlan(op);
+ curSparkOp.markLimit();
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
@@ -640,6 +643,7 @@ public class SparkCompiler extends PhyPl
try {
addToPlan(op);
phyToSparkOpMap.put(op, curSparkOp);
+ curSparkOp.markUnion();
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Wed Jul 1 13:34:06 2015
@@ -45,7 +45,15 @@ public class SparkOperator extends Opera
// Indicate if this job is a cogroup job
COGROUP,
// Indicate if this job is a regular join job
- HASHJOIN;
+ HASHJOIN,
+ // Indicate if this job is a union job
+ UNION,
+ // Indicate if this job is a native job
+ NATIVE,
+ // Indicate if this job is a limit job
+ LIMIT,
+ // Indicate if this job is a limit job after sort
+ LIMIT_AFTER_SORT;
};
public PhysicalPlan physicalPlan;
@@ -206,4 +214,35 @@ public class SparkOperator extends Opera
public void markIndexer() {
feature.set(OPER_FEATURE.INDEXER.ordinal());
}
+ public boolean isUnion() {
+ return feature.get(OPER_FEATURE.UNION.ordinal());
+ }
+
+ public void markUnion() {
+ feature.set(OPER_FEATURE.UNION.ordinal());
+ }
+
+ public boolean isNative() {
+ return feature.get(OPER_FEATURE.NATIVE.ordinal());
+ }
+
+ public void markNative() {
+ feature.set(OPER_FEATURE.NATIVE.ordinal());
+ }
+
+ public boolean isLimit() {
+ return feature.get(OPER_FEATURE.LIMIT.ordinal());
+ }
+
+ public void markLimit() {
+ feature.set(OPER_FEATURE.LIMIT.ordinal());
+ }
+
+ public boolean isLimitAfterSort() {
+ return feature.get(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
+ }
+
+ public void markLimitAfterSort() {
+ feature.set(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
Wed Jul 1 13:34:06 2015
@@ -20,6 +20,7 @@ package org.apache.pig.tools.pigstats.sp
import java.util.List;
import java.util.Map;
+
import scala.Option;
import com.google.common.collect.Maps;
@@ -28,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapred.Counters;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.tools.pigstats.JobStats;
@@ -40,244 +42,252 @@ import org.apache.spark.executor.TaskMet
public class SparkJobStats extends JobStats {
- private int jobId;
- private Map<String, Long> stats = Maps.newLinkedHashMap();
+ private int jobId;
+ private Map<String, Long> stats = Maps.newLinkedHashMap();
+
+ protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
+ this(String.valueOf(jobId), plan);
+ this.jobId = jobId;
+ }
+
+ protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
+ super(jobId, plan);
+ }
+
+ public void addOutputInfo(POStore poStore, boolean success,
+ JobMetricsListener jobMetricsListener,
+ Configuration conf) {
+ // TODO: Compute #records
+ long bytes = getOutputSize(poStore, conf);
+ OutputStats outputStats = new
OutputStats(poStore.getSFile().getFileName(),
+ bytes, 1, success);
+ outputStats.setPOStore(poStore);
+ outputStats.setConf(conf);
+ if (!poStore.isTmpStore()) {
+ outputs.add(outputStats);
+ }
+ }
+
+ public void collectStats(JobMetricsListener jobMetricsListener) {
+ if (jobMetricsListener != null) {
+ Map<String, List<TaskMetrics>> taskMetrics =
jobMetricsListener.getJobMetric(jobId);
+ if (taskMetrics == null) {
+ throw new RuntimeException("No task metrics available for
jobId " + jobId);
+ }
+ stats = combineTaskMetrics(taskMetrics);
+ }
+ }
- protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
- this(String.valueOf(jobId), plan);
- this.jobId = jobId;
- }
-
- protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
- super(jobId, plan);
- }
-
- public void addOutputInfo(POStore poStore, boolean success,
- JobMetricsListener jobMetricsListener,
- Configuration conf) {
- // TODO: Compute #records
- long bytes = getOutputSize(poStore, conf);
- OutputStats outputStats = new
OutputStats(poStore.getSFile().getFileName(),
- bytes, 1, success);
- outputStats.setPOStore(poStore);
- outputStats.setConf(conf);
- if( !poStore.isTmpStore()) {
- outputs.add(outputStats);
- }
- }
-
- public void collectStats(JobMetricsListener jobMetricsListener) {
- if (jobMetricsListener != null) {
- Map<String, List<TaskMetrics>> taskMetrics =
jobMetricsListener.getJobMetric(jobId);
- if (taskMetrics == null) {
- throw new RuntimeException("No task metrics
available for jobId " + jobId);
- }
- stats = combineTaskMetrics(taskMetrics);
- }
- }
-
- public Map<String, Long> getStats() {
- return stats;
- }
-
- private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>>
jobMetric) {
- Map<String, Long> results = Maps.newLinkedHashMap();
-
- long executorDeserializeTime = 0;
- long executorRunTime = 0;
- long resultSize = 0;
- long jvmGCTime = 0;
- long resultSerializationTime = 0;
- long memoryBytesSpilled = 0;
- long diskBytesSpilled = 0;
- long bytesRead = 0;
- long bytesWritten = 0;
- long remoteBlocksFetched = 0;
- long localBlocksFetched = 0;
- long fetchWaitTime = 0;
- long remoteBytesRead = 0;
- long shuffleBytesWritten = 0;
- long shuffleWriteTime = 0;
- boolean inputMetricExist = false;
- boolean outputMetricExist = false;
- boolean shuffleReadMetricExist = false;
- boolean shuffleWriteMetricExist = false;
-
- for (List<TaskMetrics> stageMetric : jobMetric.values()) {
- if (stageMetric != null) {
- for (TaskMetrics taskMetrics : stageMetric) {
- if (taskMetrics != null) {
- executorDeserializeTime += taskMetrics.executorDeserializeTime();
- executorRunTime += taskMetrics.executorRunTime();
- resultSize += taskMetrics.resultSize();
- jvmGCTime += taskMetrics.jvmGCTime();
- resultSerializationTime += taskMetrics.resultSerializationTime();
- memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
- diskBytesSpilled += taskMetrics.diskBytesSpilled();
- if (!taskMetrics.inputMetrics().isEmpty()) {
- inputMetricExist = true;
- bytesRead += taskMetrics.inputMetrics().get().bytesRead();
- }
-
- if (!taskMetrics.outputMetrics().isEmpty()) {
- outputMetricExist = true;
- bytesWritten +=
taskMetrics.outputMetrics().get().bytesWritten();
- }
-
- Option<ShuffleReadMetrics> shuffleReadMetricsOption =
taskMetrics.shuffleReadMetrics();
- if (!shuffleReadMetricsOption.isEmpty()) {
- shuffleReadMetricExist = true;
- remoteBlocksFetched +=
shuffleReadMetricsOption.get().remoteBlocksFetched();
- localBlocksFetched +=
shuffleReadMetricsOption.get().localBlocksFetched();
- fetchWaitTime +=
shuffleReadMetricsOption.get().fetchWaitTime();
- remoteBytesRead +=
shuffleReadMetricsOption.get().remoteBytesRead();
- }
-
- Option<ShuffleWriteMetrics> shuffleWriteMetricsOption =
taskMetrics.shuffleWriteMetrics();
- if (!shuffleWriteMetricsOption.isEmpty()) {
- shuffleWriteMetricExist = true;
- shuffleBytesWritten +=
shuffleWriteMetricsOption.get().shuffleBytesWritten();
- shuffleWriteTime +=
shuffleWriteMetricsOption.get().shuffleWriteTime();
- }
+ public Map<String, Long> getStats() {
+ return stats;
+ }
+
+ private Map<String, Long> combineTaskMetrics(Map<String,
List<TaskMetrics>> jobMetric) {
+ Map<String, Long> results = Maps.newLinkedHashMap();
+
+ long executorDeserializeTime = 0;
+ long executorRunTime = 0;
+ long resultSize = 0;
+ long jvmGCTime = 0;
+ long resultSerializationTime = 0;
+ long memoryBytesSpilled = 0;
+ long diskBytesSpilled = 0;
+ long bytesRead = 0;
+ long bytesWritten = 0;
+ long remoteBlocksFetched = 0;
+ long localBlocksFetched = 0;
+ long fetchWaitTime = 0;
+ long remoteBytesRead = 0;
+ long shuffleBytesWritten = 0;
+ long shuffleWriteTime = 0;
+ boolean inputMetricExist = false;
+ boolean outputMetricExist = false;
+ boolean shuffleReadMetricExist = false;
+ boolean shuffleWriteMetricExist = false;
+
+ for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime +=
taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime +=
taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead +=
taskMetrics.inputMetrics().get().bytesRead();
+ }
+
+ if (!taskMetrics.outputMetrics().isEmpty()) {
+ outputMetricExist = true;
+ bytesWritten +=
taskMetrics.outputMetrics().get().bytesWritten();
+ }
+
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption =
taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched +=
shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched +=
shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime +=
shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead +=
shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption
= taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten +=
shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime +=
shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+ }
+ }
}
- }
}
- }
- results.put("EexcutorDeserializeTime", executorDeserializeTime);
- results.put("ExecutorRunTime", executorRunTime);
- results.put("ResultSize", resultSize);
- results.put("JvmGCTime", jvmGCTime);
- results.put("ResultSerializationTime", resultSerializationTime);
- results.put("MemoryBytesSpilled", memoryBytesSpilled);
- results.put("DiskBytesSpilled", diskBytesSpilled);
- if (inputMetricExist) {
- results.put("BytesRead", bytesRead);
- }
-
- if (outputMetricExist) {
- results.put("BytesWritten", bytesWritten);
- }
-
- if (shuffleReadMetricExist) {
- results.put("RemoteBlocksFetched", remoteBlocksFetched);
- results.put("LocalBlocksFetched", localBlocksFetched);
- results.put("TotalBlocksFetched", localBlocksFetched +
remoteBlocksFetched);
- results.put("FetchWaitTime", fetchWaitTime);
- results.put("RemoteBytesRead", remoteBytesRead);
- }
-
- if (shuffleWriteMetricExist) {
- results.put("ShuffleBytesWritten", shuffleBytesWritten);
- results.put("ShuffleWriteTime", shuffleWriteTime);
- }
-
- return results;
- }
-
- @Override
- public String getJobId() {
- return String.valueOf(jobId);
- }
-
- @Override
- public void accept(PlanVisitor v) throws FrontendException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getDisplayString() {
- return null;
- }
-
- @Override
- public int getNumberMaps() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getNumberReduces() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getMaxMapTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getMinMapTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getAvgMapTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getMaxReduceTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getMinReduceTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getAvgREduceTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getMapInputRecords() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getMapOutputRecords() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getReduceInputRecords() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getReduceOutputRecords() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getSMMSpillCount() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getProactiveSpillCountObjects() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getProactiveSpillCountRecs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Counters getHadoopCounters() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, Long> getMultiStoreCounters() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, Long> getMultiInputCounters() {
- throw new UnsupportedOperationException();
- }
+ results.put("EexcutorDeserializeTime", executorDeserializeTime);
+ results.put("ExecutorRunTime", executorRunTime);
+ results.put("ResultSize", resultSize);
+ results.put("JvmGCTime", jvmGCTime);
+ results.put("ResultSerializationTime", resultSerializationTime);
+ results.put("MemoryBytesSpilled", memoryBytesSpilled);
+ results.put("DiskBytesSpilled", diskBytesSpilled);
+ if (inputMetricExist) {
+ results.put("BytesRead", bytesRead);
+ }
+
+ if (outputMetricExist) {
+ results.put("BytesWritten", bytesWritten);
+ }
+
+ if (shuffleReadMetricExist) {
+ results.put("RemoteBlocksFetched", remoteBlocksFetched);
+ results.put("LocalBlocksFetched", localBlocksFetched);
+ results.put("TotalBlocksFetched", localBlocksFetched +
remoteBlocksFetched);
+ results.put("FetchWaitTime", fetchWaitTime);
+ results.put("RemoteBytesRead", remoteBytesRead);
+ }
+
+ if (shuffleWriteMetricExist) {
+ results.put("ShuffleBytesWritten", shuffleBytesWritten);
+ results.put("ShuffleWriteTime", shuffleWriteTime);
+ }
+
+ return results;
+ }
+
+ @Override
+ public String getJobId() {
+ return String.valueOf(jobId);
+ }
+
+ @Override
+ public void accept(PlanVisitor v) throws FrontendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getDisplayString() {
+ return null;
+ }
+
+ @Override
+ public int getNumberMaps() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberReduces() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxMapTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMinMapTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getAvgMapTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxReduceTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMinReduceTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getAvgREduceTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMapInputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMapOutputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getReduceInputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getReduceOutputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getSMMSpillCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getProactiveSpillCountObjects() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getProactiveSpillCountRecs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Counters getHadoopCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, Long> getMultiStoreCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, Long> getMultiInputCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setAlias(SparkOperator sparkOperator) {
+ SparkScriptState ss = (SparkScriptState) SparkScriptState.get();
+ SparkScriptState.SparkScriptInfo sparkScriptInfo = ss.getScriptInfo();
+ annotate(ALIAS, sparkScriptInfo.getAlias(sparkOperator));
+ annotate(ALIAS_LOCATION,
sparkScriptInfo.getAliasLocation(sparkOperator));
+ annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator));
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
Wed Jul 1 13:34:06 2015
@@ -1,30 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.tools.pigstats.spark;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.spark.api.java.JavaSparkContext;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
public class SparkPigStats extends PigStats {
+ private Map<SparkJobStats,SparkOperator> jobSparkOperatorMap = new
HashMap<SparkJobStats, SparkOperator>();
private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
+ private SparkScriptState sparkScriptState;
+
public SparkPigStats() {
- jobPlan = new JobGraph();
+ jobPlan = new JobGraph();
+ this.sparkScriptState = (SparkScriptState) ScriptState.get();
+ }
+
+ public void initialize(SparkOperPlan sparkPlan){
+ super.start();
+ sparkScriptState.setScriptInfo(sparkPlan);
}
- public void addJobStats(POStore poStore, int jobId,
+ public void addJobStats(POStore poStore, SparkOperator sparkOperator, int
jobId,
JobMetricsListener jobMetricsListener,
JavaSparkContext sparkContext,
Configuration conf) {
@@ -33,10 +63,11 @@ public class SparkPigStats extends PigSt
jobStats.setSuccessful(isSuccess);
jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
jobStats.collectStats(jobMetricsListener);
+ jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
}
- public void addFailJobStats(POStore poStore, String jobId,
+ public void addFailJobStats(POStore poStore, SparkOperator sparkOperator,
String jobId,
JobMetricsListener jobMetricsListener,
JavaSparkContext sparkContext,
Configuration conf,
@@ -46,6 +77,7 @@ public class SparkPigStats extends PigSt
jobStats.setSuccessful(isSuccess);
jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
jobStats.collectStats(jobMetricsListener);
+ jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
if (e != null) {
jobStats.setBackendException(e);
@@ -61,6 +93,10 @@ public class SparkPigStats extends PigSt
Iterator<JobStats> iter = jobPlan.iterator();
while (iter.hasNext()) {
SparkJobStats js = (SparkJobStats)iter.next();
+ if (jobSparkOperatorMap.containsKey(js)) {
+ SparkOperator sparkOperator = jobSparkOperatorMap.get(js);
+ js.setAlias(sparkOperator);
+ }
LOG.info( "Spark Job [" + js.getJobId() + "] Metrics");
Map<String, Long> stats = js.getStats();
if (stats == null) {
Added:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java?rev=1688648&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
(added)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
Wed Jul 1 13:34:06 2015
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.collect.Maps;
+
+/**
+ * SparkScriptState encapsulates settings for a Pig script that runs on a Hadoop
+ * cluster. These settings are added to all Spark jobs spawned by the script and
+ * in turn are persisted in the Hadoop job XML. With the properties already in
+ * the job XML, users who want to know the relations between the script and Spark
+ * jobs can derive them from the job XMLs.
+ */
+public class SparkScriptState extends ScriptState {
+ public SparkScriptState(String id) {
+ super(id);
+ }
+
+ private SparkScriptInfo scriptInfo = null;
+
+ public void setScriptInfo(SparkOperPlan plan) {
+ this.scriptInfo = new SparkScriptInfo(plan);
+ }
+
+ public SparkScriptInfo getScriptInfo() {
+ return scriptInfo;
+ }
+
+ public static class SparkScriptInfo {
+
+ private static final Log LOG =
LogFactory.getLog(SparkScriptInfo.class);
+ private SparkOperPlan sparkPlan;
+ private String alias;
+ private String aliasLocation;
+ private String features;
+
+ private Map<OperatorKey, String> featuresMap = Maps.newHashMap();
+ private Map<OperatorKey, String> aliasMap = Maps.newHashMap();
+ private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap();
+
+ public SparkScriptInfo(SparkOperPlan sparkPlan) {
+ this.sparkPlan = sparkPlan;
+ initialize();
+ }
+
+ private void initialize() {
+ try {
+ new DAGAliasVisitor(sparkPlan).visit();
+ } catch (VisitorException e) {
+ LOG.warn("Cannot calculate alias information for DAG", e);
+ }
+ }
+
+ public String getAlias(SparkOperator sparkOp) {
+ return aliasMap.get(sparkOp.getOperatorKey());
+ }
+
+ public String getAliasLocation(SparkOperator sparkOp) {
+ return aliasLocationMap.get(sparkOp.getOperatorKey());
+ }
+
+ public String getPigFeatures(SparkOperator sparkOp) {
+ return featuresMap.get(sparkOp.getOperatorKey());
+ }
+
+ class DAGAliasVisitor extends SparkOpPlanVisitor {
+
+ private Set<String> aliases;
+ private Set<String> aliasLocations;
+ private BitSet featureSet;
+
+ public DAGAliasVisitor(SparkOperPlan plan) {
+ super(plan, new DependencyOrderWalker<SparkOperator,
SparkOperPlan>(plan));
+ this.aliases = new HashSet<String>();
+ this.aliasLocations = new HashSet<String>();
+ this.featureSet = new BitSet();
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws
VisitorException {
+
+ ArrayList<String> aliasList = new ArrayList<String>();
+ String aliasLocationStr = "";
+ try {
+ ArrayList<String> aliasLocationList = new
ArrayList<String>();
+ new AliasVisitor(sparkOp.physicalPlan, aliasList,
aliasLocationList).visit();
+ aliasLocationStr += LoadFunc.join(aliasLocationList, ",");
+ if (!aliasList.isEmpty()) {
+ Collections.sort(aliasList);
+ aliases.addAll(aliasList);
+ aliasLocations.addAll(aliasLocationList);
+ }
+ } catch (VisitorException e) {
+ LOG.warn("unable to get alias", e);
+ }
+ aliasMap.put(sparkOp.getOperatorKey(),
LoadFunc.join(aliasList, ","));
+ aliasLocationMap.put(sparkOp.getOperatorKey(),
aliasLocationStr);
+
+
+ BitSet feature = new BitSet();
+ feature.clear();
+ if (sparkOp.isSampler()) {
+ feature.set(PIG_FEATURE.SAMPLER.ordinal());
+ }
+ if (sparkOp.isIndexer()) {
+ feature.set(PIG_FEATURE.INDEXER.ordinal());
+ }
+ if (sparkOp.isCogroup()) {
+ feature.set(PIG_FEATURE.COGROUP.ordinal());
+ }
+ if (sparkOp.isGroupBy()) {
+ feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+ }
+ if (sparkOp.isRegularJoin()) {
+ feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ }
+ if (sparkOp.isUnion()) {
+ feature.set(PIG_FEATURE.UNION.ordinal());
+ }
+ if (sparkOp.isNative()) {
+ feature.set(PIG_FEATURE.NATIVE.ordinal());
+ }
+ if (sparkOp.isLimit() || sparkOp.isLimitAfterSort()) {
+ feature.set(PIG_FEATURE.LIMIT.ordinal());
+ }
+ try {
+ new FeatureVisitor(sparkOp.physicalPlan, feature).visit();
+ } catch (VisitorException e) {
+ LOG.warn("Feature visitor failed", e);
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = feature.nextSetBit(0); i >= 0; i =
feature.nextSetBit(i + 1)) {
+ if (sb.length() > 0) sb.append(",");
+ sb.append(PIG_FEATURE.values()[i].name());
+ }
+ featuresMap.put(sparkOp.getOperatorKey(), sb.toString());
+ for (int i = 0; i < feature.length(); i++) {
+ if (feature.get(i)) {
+ featureSet.set(i);
+ }
+ }
+ }
+
+ @Override
+ public void visit() throws VisitorException {
+ super.visit();
+ if (!aliases.isEmpty()) {
+ ArrayList<String> aliasList = new
ArrayList<String>(aliases);
+ ArrayList<String> aliasLocationList = new
ArrayList<String>(aliasLocations);
+ Collections.sort(aliasList);
+ Collections.sort(aliasLocationList);
+ alias = LoadFunc.join(aliasList, ",");
+ aliasLocation = LoadFunc.join(aliasLocationList, ",");
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = featureSet.nextSetBit(0); i >= 0; i =
featureSet.nextSetBit(i + 1)) {
+ if (sb.length() > 0) sb.append(",");
+ sb.append(PIG_FEATURE.values()[i].name());
+ }
+ features = sb.toString();
+ }
+ }
+ }
+}
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Wed Jul 1 13:34:06 2015
@@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaSparkContext;
@@ -28,7 +29,7 @@ import org.apache.spark.api.java.JavaSpa
public class SparkStatsUtil {
public static void waitForJobAddStats(int jobID,
- POStore poStore,
+ POStore poStore, SparkOperator sparkOperator,
JobMetricsListener jobMetricsListener,
JavaSparkContext sparkContext,
SparkPigStats sparkPigStats,
@@ -42,18 +43,18 @@ public class SparkStatsUtil {
// this driver thread calling SparkStatusTracker.
// To workaround this, we will wait for this job to "finish".
jobMetricsListener.waitForJobToEnd(jobID);
- sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
+ sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
sparkContext, jobConf);
jobMetricsListener.cleanup(jobID);
}
public static void addFailJobStats(String jobID,
- POStore poStore,
+ POStore poStore, SparkOperator sparkOperator,
SparkPigStats sparkPigStats,
JobConf jobConf, Exception e) {
JobMetricsListener jobMetricsListener = null;
JavaSparkContext sparkContext = null;
- sparkPigStats.addFailJobStats(poStore, jobID, jobMetricsListener,
+ sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
sparkContext, jobConf, e);
}
Modified:
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1688648&r1=1688647&r2=1688648&view=diff
==============================================================================
---
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
(original)
+++
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
Wed Jul 1 13:34:06 2015
@@ -62,6 +62,8 @@ public class TestLocationInPhysicalPlan
JobStats jStats =
(JobStats)job.getStatistics().getJobGraph().getSinks().get(0);
if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) {
Assert.assertEquals("A[1,4],A[3,4],B[2,4]", jStats.getAliasLocation());
+ } else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) {
+ Assert.assertEquals("A[1,4],B[2,4],A[3,4]", jStats.getAliasLocation());
} else {
Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: A[3,4]", jStats.getAliasLocation());
}