Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Wed Jul 19 01:32:41 2017
@@ -25,11 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.util.Pair;
@@ -54,7 +55,6 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 public class SkewedJoinConverter implements
@@ -103,7 +103,7 @@ public class SkewedJoinConverter impleme
         // with partition id
         StreamPartitionIndexKeyFunction streamFun = new StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
-        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(streamFun);
+        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(SparkShims.getInstance().flatMapFunction(streamFun));
 
         // Tuple2 RDD to Pair RDD
         JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>(
@@ -146,7 +146,7 @@ public class SkewedJoinConverter impleme
      * @param <R> be generic because it can be Optional<Tuple> or Tuple
      */
     private static class ToValueFunction<L, R> implements
-            FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
+            FlatMapFunctionAdapter<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
 
         private boolean[] innerFlags;
         private int[] schemaSize;
@@ -188,7 +188,7 @@ public class SkewedJoinConverter impleme
                         Tuple leftTuple = tf.newTuple();
                         if (!innerFlags[0]) {
                             // left should be Optional<Tuple>
-                            Optional<Tuple> leftOption = (Optional<Tuple>) left;
+                            SparkShims.OptionalWrapper<L> leftOption = SparkShims.getInstance().wrapOptional(left);
                             if (!leftOption.isPresent()) {
                                 // Add an empty left record for RIGHT OUTER JOIN.
                                 // Notice: if it is a skewed, only join the first reduce key
@@ -200,7 +200,7 @@ public class SkewedJoinConverter impleme
                                     return this.next();
                                 }
                             } else {
-                                leftTuple = leftOption.get();
+                                leftTuple = (Tuple) leftOption.get();
                             }
                         } else {
                             leftTuple = (Tuple) left;
@@ -212,13 +212,13 @@ public class SkewedJoinConverter impleme
                         Tuple rightTuple = tf.newTuple();
                         if (!innerFlags[1]) {
                             // right should be Optional<Tuple>
-                            Optional<Tuple> rightOption = (Optional<Tuple>) right;
+                            SparkShims.OptionalWrapper<R> rightOption = SparkShims.getInstance().wrapOptional(right);
                             if (!rightOption.isPresent()) {
                                 for (int i = 0; i < schemaSize[1]; i++) {
                                     rightTuple.append(null);
                                 }
                             } else {
-                                rightTuple = rightOption.get();
+                                rightTuple = (Tuple) rightOption.get();
                             }
                         } else {
                             rightTuple = (Tuple) right;
@@ -234,17 +234,17 @@ public class SkewedJoinConverter impleme
                         return result;
                     } catch (Exception e) {
                         log.warn(e);
+                        return null;
                     }
-                    return null;
                 }
             };
         }
     }
 
         @Override
-        public Iterable<Tuple> call(
-                Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
-            return new Tuple2TransformIterable(input);
+        public Iterator<Tuple> call(
+                Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) throws Exception {
+            return new Tuple2TransformIterable(input).iterator();
         }
 
         private boolean isFirstReduceKey(PartitionIndexedKey pKey) {
@@ -413,7 +413,7 @@ public class SkewedJoinConverter impleme
                 return tuple_KeyValue;
             } catch (Exception e) {
-                System.out.print(e);
+                log.warn(e);
                 return null;
             }
         }
@@ -469,7 +469,7 @@ public class SkewedJoinConverter impleme
      * <p>
      * see: https://wiki.apache.org/pig/PigSkewedJoinSpec
      */
-    private static class StreamPartitionIndexKeyFunction implements FlatMapFunction<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
+    private static class StreamPartitionIndexKeyFunction implements FlatMapFunctionAdapter<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
 
         private SkewedJoinConverter poSkewedJoin;
         private final Broadcast<List<Tuple>> keyDist;
@@ -487,7 +487,8 @@ public class SkewedJoinConverter impleme
             this.defaultParallelism = defaultParallelism;
         }
 
-        public Iterable<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception {
+        @Override
+        public Iterator<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception {
             if (!initialized) {
                 Integer[] reducers = new Integer[1];
                 reducerMap = loadKeyDistribution(keyDist, reducers);
@@ -526,12 +527,12 @@ public class SkewedJoinConverter impleme
                 l.add(new Tuple2(pIndexKey, tuple));
             }
 
-            return l;
+            return l.iterator();
         }
     }
 
     /**
-     * user defined spark partitioner for skewed join
+     * User defined spark partitioner for skewed join
      */
     private static class SkewedJoinPartitioner extends Partitioner {
         private int numPartitions;
@@ -568,12 +569,8 @@ public class SkewedJoinConverter impleme
     }
 
     /**
-     * use parallelism from keyDist or the default parallelism to
+     * Use parallelism from keyDist or the default parallelism to
      * create user defined partitioner
-     *
-     * @param keyDist
-     * @param defaultParallelism
-     * @return
      */
     private SkewedJoinPartitioner buildPartitioner(Broadcast<List<Tuple>> keyDist, Integer defaultParallelism) {
         Integer parallelism = -1;
@@ -588,12 +585,7 @@ public class SkewedJoinConverter impleme
     }
 
     /**
-     * do all kinds of Join (inner, left outer, right outer, full outer)
-     *
-     * @param skewIndexedJavaPairRDD
-     * @param streamIndexedJavaPairRDD
-     * @param partitioner
-     * @return
+     * Do all kinds of Join (inner, left outer, right outer, full outer)
      */
     private JavaRDD<Tuple> doJoin(
             JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD,
@@ -616,25 +608,22 @@ public class SkewedJoinConverter impleme
             JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
                     join(streamIndexedJavaPairRDD, partitioner);
 
-            return resultKeyValue.mapPartitions(toValueFun);
+            return resultKeyValue.mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         } else if (innerFlags[0] && !innerFlags[1]) {
             // left outer join
-            JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
-                    leftOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
-            return resultKeyValue.mapPartitions(toValueFun);
+            return skewIndexedJavaPairRDD
+                    .leftOuterJoin(streamIndexedJavaPairRDD, partitioner)
+                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         } else if (!innerFlags[0] && innerFlags[1]) {
             // right outer join
-            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
-                    rightOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
-            return resultKeyValue.mapPartitions(toValueFun);
+            return skewIndexedJavaPairRDD
+                    .rightOuterJoin(streamIndexedJavaPairRDD, partitioner)
+                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         } else {
             // full outer join
-            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
-                    fullOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
-            return resultKeyValue.mapPartitions(toValueFun);
+            return skewIndexedJavaPairRDD
+                    .fullOuterJoin(streamIndexedJavaPairRDD, partitioner)
+                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         }
     }
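Note on the FlatMapFunctionAdapter/SparkShims indirection introduced above: the Spark Java API changed shape between major versions. In Spark 1.x, FlatMapFunction#call(T) returns Iterable<R>, while in Spark 2.x it returns Iterator<R>. The converters are therefore rewritten against an Iterator-based adapter, and a version-specific shim wraps that adapter into whichever FlatMapFunction the running Spark expects. The real FlatMapFunctionAdapter and SparkShims bodies are not part of this diff, so the following is only a minimal sketch of the Spark 2.x side of such a bridge, with names mirroring the call sites:

    import java.io.Serializable;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.FlatMapFunction;

    // Version-neutral contract implemented by the converters: always Iterator-based.
    interface FlatMapFunctionAdapter<T, R> extends Serializable {
        Iterator<R> call(T t) throws Exception;
    }

    // Hypothetical Spark 2.x shim: FlatMapFunction#call already returns
    // Iterator<R> in 2.x, so the adapter is forwarded directly. A Spark 1.x
    // shim would instead wrap the same adapter in an Iterable-returning
    // function. Compiles only against Spark 2.x.
    class Spark2Shims {
        public <T, R> FlatMapFunction<T, R> flatMapFunction(final FlatMapFunctionAdapter<T, R> fn) {
            return new FlatMapFunction<T, R>() {
                @Override
                public Iterator<R> call(T t) throws Exception {
                    return fn.call(t);
                }
            };
        }
    }

This is also why the call() implementations now end in "return new Tuple2TransformIterable(input).iterator();" rather than returning the Iterable itself.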
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Wed Jul 19 01:32:41 2017
@@ -22,25 +22,26 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 @SuppressWarnings("serial")
 public class SortConverter implements RDDConverter<Tuple, Tuple, POSort> {
     private static final Log LOG = LogFactory.getLog(SortConverter.class);
 
-    private static final FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
+    private static final FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSort sortOperator)
@@ -57,13 +58,13 @@ public class SortConverter implements RD
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
                 sortOperator.getMComparator(), true, parallelism);
 
-        JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
+        JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(TO_VALUE_FUNCTION));
 
         return mapped.rdd();
     }
 
     private static class ToValueFunction implements
-            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+            FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 
@@ -84,8 +85,8 @@ public class SortConverter implements RD
         }
 
         @Override
-        public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
-            return new Tuple2TransformIterable(input);
+        public Iterator<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+            return new Tuple2TransformIterable(input).iterator();
         }
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java Wed Jul 19 01:32:41 2017
@@ -19,18 +19,18 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.spark.PairFlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.spark.api.java.function.Function2;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -40,12 +40,11 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
- /*
-  sort the sample data and convert the sample data to the format (all,{(sampleEle1),(sampleEle2),...})
- */
+/*
+  sort the sample data and convert the sample data to the format (all,{(sampleEle1),(sampleEle2),...})
+ */
 @SuppressWarnings("serial")
 public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple, POSampleSortSpark> {
     private static final Log LOG = LogFactory.getLog(SparkSampleSortConverter.class);
@@ -66,14 +65,14 @@ public class SparkSampleSortConverter im
         //sort sample data
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
         //convert every element in sample data from element to (all, element) format
-        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new AggregateFunction());
+        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkShims.getInstance().pairFlatMapFunction(new AggregateFunction()));
         //use groupByKey to aggregate all values( the format will be ((all),{(sampleEle1),(sampleEle2),...} )
         JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new ToValueFunction());
         return groupByKey.rdd();
     }
 
-    private static class MergeFunction implements org.apache.spark.api.java.function.Function2<Tuple, Tuple, Tuple>
+    private static class MergeFunction implements Function2<Tuple, Tuple, Tuple>
             , Serializable {
 
         @Override
@@ -89,7 +88,7 @@ public class SparkSampleSortConverter im
     // input: Tuple2<Tuple,Object>
    // output: Tuple2("all", Tuple)
     private static class AggregateFunction implements
-            PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>, String,Tuple>, Serializable {
+            PairFlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, String,Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple2<String,Tuple>> {
 
@@ -111,8 +110,8 @@ public class SparkSampleSortConverter im
         }
 
         @Override
-        public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
-            return new Tuple2TransformIterable(input);
+        public Iterator<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
+            return new Tuple2TransformIterable(input).iterator();
         }
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Jul 19 01:32:41 2017
@@ -25,9 +25,8 @@ import java.util.List;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.*;
 import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 public class StreamConverter implements
@@ -35,44 +34,40 @@ public class StreamConverter implements
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-            POStream poStream) throws IOException {
+                              POStream poStream) throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         StreamFunction streamFunction = new StreamFunction(poStream);
-        return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(streamFunction), true).rdd();
     }
 
     private static class StreamFunction implements
-            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+            FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
         private POStream poStream;
 
         private StreamFunction(POStream poStream) {
             this.poStream = poStream;
         }
 
-        public Iterable<Tuple> call(final Iterator<Tuple> input) {
-            return new Iterable<Tuple>() {
+        @Override
+        public Iterator<Tuple> call(final Iterator<Tuple> input) {
+            return new OutputConsumerIterator(input) {
+
+                @Override
+                protected void attach(Tuple tuple) {
+                    poStream.setInputs(null);
+                    poStream.attachInput(tuple);
+                }
+
                 @Override
-                public Iterator<Tuple> iterator() {
-                    return new OutputConsumerIterator(input) {
+                protected Result getNextResult() throws ExecException {
+                    Result result = poStream.getNextTuple();
+                    return result;
+                }
 
-                        @Override
-                        protected void attach(Tuple tuple) {
-                            poStream.setInputs(null);
-                            poStream.attachInput(tuple);
-                        }
-
-                        @Override
-                        protected Result getNextResult() throws ExecException {
-                            Result result = poStream.getNextTuple();
-                            return result;
-                        }
-
-                        @Override
-                        protected void endOfInput() {
-                            poStream.setFetchable(true);
-                        }
-                    };
+                @Override
+                protected void endOfInput() {
+                    poStream.setFetchable(true);
                 }
             };
         }
     }
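SparkSampleSortConverter above relies on the pair-RDD analogue of the same adapter trick: Spark 1.x PairFlatMapFunction#call returns Iterable<Tuple2<K, V>>, whereas Spark 2.x returns Iterator<Tuple2<K, V>>. Only the call site, SparkShims.getInstance().pairFlatMapFunction(...), appears in this diff; a sketch of the 2.x bridge under the same assumptions as before:

    import java.io.Serializable;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import scala.Tuple2;

    // Iterator-based contract implemented by AggregateFunction above.
    interface PairFlatMapFunctionAdapter<T, K, V> extends Serializable {
        Iterator<Tuple2<K, V>> call(T t) throws Exception;
    }

    // Hypothetical Spark 2.x shim; a 1.x shim would materialize the
    // iterator behind an Iterable instead. Compiles only against Spark 2.x.
    class Spark2PairShims {
        public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(
                final PairFlatMapFunctionAdapter<T, K, V> fn) {
            return new PairFlatMapFunction<T, K, V>() {
                @Override
                public Iterator<Tuple2<K, V>> call(T t) throws Exception {
                    return fn.call(t);
                }
            };
        }
    }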
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+import scala.Option;
+
+import java.util.List;
+import java.util.Map;
+
+public class Spark1JobStats extends SparkJobStats {
+    public Spark1JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan, conf);
+    }
+
+    public Spark1JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan, conf);
+    }
+
+    @Override
+    protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+        boolean inputMetricExist = false;
+        boolean outputMetricExist = false;
+        boolean shuffleReadMetricExist = false;
+        boolean shuffleWriteMetricExist = false;
+
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        if (!taskMetrics.inputMetrics().isEmpty()) {
+                            inputMetricExist = true;
+                            bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+                        }
+
+                        if (!taskMetrics.outputMetrics().isEmpty()) {
+                            outputMetricExist = true;
+                            bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
+                        }
+
+                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+                        if (!shuffleReadMetricsOption.isEmpty()) {
+                            shuffleReadMetricExist = true;
+                            remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+                            localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+                            fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+                            remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+                        }
+
+                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+                        if (!shuffleWriteMetricsOption.isEmpty()) {
+                            shuffleWriteMetricExist = true;
+                            shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+                            shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+                        }
+
+                    }
+                }
+            }
+        }
+
+        results.put("ExcutorDeserializeTime", executorDeserializeTime);
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+        if (inputMetricExist) {
+            results.put("BytesRead", bytesRead);
+            hdfsBytesRead = bytesRead;
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+        }
+
+        if (outputMetricExist) {
+            results.put("BytesWritten", bytesWritten);
+            hdfsBytesWritten = bytesWritten;
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+        }
+
+        if (shuffleReadMetricExist) {
+            results.put("RemoteBlocksFetched", remoteBlocksFetched);
+            results.put("LocalBlocksFetched", localBlocksFetched);
+            results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+            results.put("FetchWaitTime", fetchWaitTime);
+            results.put("RemoteBytesRead", remoteBytesRead);
+        }
+
+        if (shuffleWriteMetricExist) {
+            results.put("ShuffleBytesWritten", shuffleBytesWritten);
+            results.put("ShuffleWriteTime", shuffleWriteTime);
+        }
+
+        return results;
+    }
+}

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import java.util.List;
+import java.util.Map;
+
+public class Spark2JobStats extends SparkJobStats {
+    public Spark2JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan, conf);
+    }
+
+    public Spark2JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan, conf);
+    }
+
+    @Override
+    protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        bytesRead += taskMetrics.inputMetrics().bytesRead();
+
+                        bytesWritten += taskMetrics.outputMetrics().bytesWritten();
+
+                        ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+                        remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched();
+                        localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched();
+                        fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime();
+                        remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead();
+
+                        ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+                        shuffleBytesWritten += shuffleWriteMetricsOption.shuffleBytesWritten();
+                        shuffleWriteTime += shuffleWriteMetricsOption.shuffleWriteTime();
+                    }
+                }
+            }
+        }
+
+        results.put("ExcutorDeserializeTime", executorDeserializeTime);
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+
+        results.put("BytesRead", bytesRead);
+        hdfsBytesRead = bytesRead;
+        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+
+        results.put("BytesWritten", bytesWritten);
+        hdfsBytesWritten = bytesWritten;
+        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+
+        results.put("RemoteBlocksFetched", remoteBlocksFetched);
+        results.put("LocalBlocksFetched", localBlocksFetched);
+        results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+        results.put("FetchWaitTime", fetchWaitTime);
+        results.put("RemoteBytesRead", remoteBytesRead);
+
+        results.put("ShuffleBytesWritten", shuffleBytesWritten);
+        results.put("ShuffleWriteTime", shuffleWriteTime);
+
+        return results;
+    }
+}
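The two new subclasses differ only in how they read TaskMetrics. The Spark 1.x accessors used in Spark1JobStats (inputMetrics(), outputMetrics(), shuffleReadMetrics(), shuffleWriteMetrics()) return scala.Option values that may be absent, hence all of the *MetricExist bookkeeping; in Spark 2.x the same accessors return concrete, zero-valued metric objects, so that bookkeeping disappears. For instance, aggregating a single counter the 2.x way reduces to the following sketch (compiles only against Spark 2.x):

    import java.util.List;
    import java.util.Map;

    import org.apache.spark.executor.TaskMetrics;

    final class BytesReadExample {
        // Sums bytesRead across all tasks of a job, Spark 2.x style: the
        // accessor is total, so no Option/isEmpty() check is needed.
        static long totalBytesRead(Map<String, List<TaskMetrics>> jobMetric) {
            long bytesRead = 0;
            for (List<TaskMetrics> stageMetrics : jobMetric.values()) {
                if (stageMetrics == null) {
                    continue;
                }
                for (TaskMetrics taskMetrics : stageMetrics) {
                    if (taskMetrics != null) {
                        // Zero when the task read no input, never null/None.
                        bytesRead += taskMetrics.inputMetrics().bytesRead();
                    }
                }
            }
            return bytesRead;
        }
    }

The two implementations cannot compile against the same Spark classpath, which is the reason for splitting them into Spark1JobStats and Spark2JobStats rather than branching inside one method.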
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Jul 19 01:32:41 2017
@@ -21,30 +21,30 @@ package org.apache.pig.tools.pigstats.sp
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.tools.pigstats.*;
-import scala.Option;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.spark.executor.ShuffleReadMetrics;
-import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.spark.executor.TaskMetrics;
 
 import com.google.common.collect.Maps;
 
-public class SparkJobStats extends JobStats {
+public abstract class SparkJobStats extends JobStats {
 
     private int jobId;
     private Map<String, Long> stats = Maps.newLinkedHashMap();
     private boolean disableCounter;
-    private Counters counters = null;
+    protected Counters counters = null;
     public static String FS_COUNTER_GROUP = "FS_GROUP";
     private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null;
 
@@ -58,6 +58,7 @@ public class SparkJobStats extends JobSt
         setConf(conf);
     }
 
+    @Override
     public void setConf(Configuration conf) {
         super.setConf(conf);
         disableCounter = conf.getBoolean("pig.disable.counter", false);
@@ -65,7 +66,7 @@ public class SparkJobStats extends JobSt
     }
 
     public void addOutputInfo(POStore poStore, boolean success,
-                              JobMetricsListener jobMetricsListener) {
+                              JobStatisticCollector jobStatisticCollector) {
         if (!poStore.isTmpStore()) {
             long bytes = getOutputSize(poStore, conf);
             long recordsCount = -1;
@@ -99,9 +100,9 @@ public class SparkJobStats extends JobSt
         inputs.add(inputStats);
     }
 
-    public void collectStats(JobMetricsListener jobMetricsListener) {
-        if (jobMetricsListener != null) {
-            Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+    public void collectStats(JobStatisticCollector jobStatisticCollector) {
+        if (jobStatisticCollector != null) {
+            Map<String, List<TaskMetrics>> taskMetrics = jobStatisticCollector.getJobMetric(jobId);
             if (taskMetrics == null) {
                 throw new RuntimeException("No task metrics available for jobId " + jobId);
             }
@@ -109,110 +110,12 @@ public class SparkJobStats extends JobSt
         }
     }
 
+    protected abstract Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric);
+
     public Map<String, Long> getStats() {
         return stats;
     }
 
-    private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
-        Map<String, Long> results = Maps.newLinkedHashMap();
-
-        long executorDeserializeTime = 0;
-        long executorRunTime = 0;
-        long resultSize = 0;
-        long jvmGCTime = 0;
-        long resultSerializationTime = 0;
-        long memoryBytesSpilled = 0;
-        long diskBytesSpilled = 0;
-        long bytesRead = 0;
-        long bytesWritten = 0;
-        long remoteBlocksFetched = 0;
-        long localBlocksFetched = 0;
-        long fetchWaitTime = 0;
-        long remoteBytesRead = 0;
-        long shuffleBytesWritten = 0;
-        long shuffleWriteTime = 0;
-        boolean inputMetricExist = false;
-        boolean outputMetricExist = false;
-        boolean shuffleReadMetricExist = false;
-        boolean shuffleWriteMetricExist = false;
-
-        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-            if (stageMetric != null) {
-                for (TaskMetrics taskMetrics : stageMetric) {
-                    if (taskMetrics != null) {
-                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
-                        executorRunTime += taskMetrics.executorRunTime();
-                        resultSize += taskMetrics.resultSize();
-                        jvmGCTime += taskMetrics.jvmGCTime();
-                        resultSerializationTime += taskMetrics.resultSerializationTime();
-                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
-                        if (!taskMetrics.inputMetrics().isEmpty()) {
-                            inputMetricExist = true;
-                            bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-                        }
-
-                        if (!taskMetrics.outputMetrics().isEmpty()) {
-                            outputMetricExist = true;
-                            bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
-                        }
-
-                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-                        if (!shuffleReadMetricsOption.isEmpty()) {
-                            shuffleReadMetricExist = true;
-                            remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-                            localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-                            fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-                            remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-                        }
-
-                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-                        if (!shuffleWriteMetricsOption.isEmpty()) {
-                            shuffleWriteMetricExist = true;
-                            shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-                            shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
-                        }
-
-                    }
-                }
-            }
-        }
-
-        results.put("EexcutorDeserializeTime", executorDeserializeTime);
-        results.put("ExecutorRunTime", executorRunTime);
-        results.put("ResultSize", resultSize);
-        results.put("JvmGCTime", jvmGCTime);
-        results.put("ResultSerializationTime", resultSerializationTime);
-        results.put("MemoryBytesSpilled", memoryBytesSpilled);
-        results.put("DiskBytesSpilled", diskBytesSpilled);
-        if (inputMetricExist) {
-            results.put("BytesRead", bytesRead);
-            hdfsBytesRead = bytesRead;
-            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
-        }
-
-        if (outputMetricExist) {
-            results.put("BytesWritten", bytesWritten);
-            hdfsBytesWritten = bytesWritten;
-            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
-        }
-
-        if (shuffleReadMetricExist) {
-            results.put("RemoteBlocksFetched", remoteBlocksFetched);
-            results.put("LocalBlocksFetched", localBlocksFetched);
-            results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
-            results.put("FetchWaitTime", fetchWaitTime);
-            results.put("RemoteBytesRead", remoteBytesRead);
-        }
-
-        if (shuffleWriteMetricExist) {
-            results.put("ShuffleBytesWritten", shuffleBytesWritten);
-            results.put("ShuffleWriteTime", shuffleWriteTime);
-        }
-
-        return results;
-    }
-
     @Override
     public String getJobId() {
         return String.valueOf(jobId);

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Jul 19 01:32:41 2017
@@ -32,7 +32,8 @@ import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -69,14 +70,14 @@ public class SparkPigStats extends PigSt
     }
 
     public void addJobStats(POStore poStore, SparkOperator sparkOperator, int jobId,
-                            JobMetricsListener jobMetricsListener,
+                            JobStatisticCollector jobStatisticCollector,
                             JavaSparkContext sparkContext) {
         boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
-        jobStats.collectStats(jobMetricsListener);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
-        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobStatisticCollector);
+        jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobStatisticCollector, conf);
         jobStats.initWarningCounters();
         jobSparkOperatorMap.put(jobStats, sparkOperator);
 
@@ -85,22 +86,22 @@ public class SparkPigStats extends PigSt
     public void addFailJobStats(POStore poStore,
                                 SparkOperator sparkOperator,
                                 String jobId,
-                                JobMetricsListener jobMetricsListener,
+                                JobStatisticCollector jobStatisticCollector,
                                 JavaSparkContext sparkContext,
                                 Exception e) {
         boolean isSuccess = false;
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
-        jobStats.collectStats(jobMetricsListener);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
-        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobStatisticCollector);
+        jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobStatisticCollector, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
         jobStats.setBackendException(e);
     }
 
     public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
@@ -229,7 +230,7 @@ public class SparkPigStats extends PigSt
     private void addInputInfoForSparkOper(SparkOperator sparkOperator,
                                           SparkJobStats jobStats,
                                           boolean isSuccess,
-                                          JobMetricsListener jobMetricsListener,
+                                          JobStatisticCollector jobStatisticCollector,
                                           Configuration conf) {
         //to avoid repetition
         if (sparkOperatorsSet.contains(sparkOperator)) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Jul 19 01:32:41 2017
@@ -26,7 +26,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
@@ -44,7 +44,7 @@ public class SparkStatsUtil {
     public static void waitForJobAddStats(int jobID,
                                           POStore poStore,
                                           SparkOperator sparkOperator,
-                                          JobMetricsListener jobMetricsListener,
+                                          JobStatisticCollector jobStatisticCollector,
                                           JavaSparkContext sparkContext,
                                           SparkPigStats sparkPigStats)
             throws InterruptedException {
@@ -55,20 +55,17 @@ public class SparkStatsUtil {
         // "event bus" thread updating it's internal listener and
         // this driver thread calling SparkStatusTracker.
         // To workaround this, we will wait for this job to "finish".
-        jobMetricsListener.waitForJobToEnd(jobID);
-        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
+        jobStatisticCollector.waitForJobToEnd(jobID);
+        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobStatisticCollector,
                 sparkContext);
-        jobMetricsListener.cleanup(jobID);
+        jobStatisticCollector.cleanup(jobID);
    }
 
     public static void addFailJobStats(String jobID,
                                        POStore poStore,
                                        SparkOperator sparkOperator,
                                        SparkPigStats sparkPigStats,
                                        Exception e) {
-        JobMetricsListener jobMetricsListener = null;
-        JavaSparkContext sparkContext = null;
-        sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
-                sparkContext, e);
+        sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, null, null, e);
     }
 
     public static String getCounterName(POStore store) {

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jul 19 01:32:41 2017
@@ -60,7 +60,6 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
-import org.apache.pig.tools.pigstats.spark.SparkJobStats;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
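With SparkJobStats now abstract, SparkPigStats obtains the concrete instance through SparkShims.getInstance().sparkJobStats(...), keeping the version split in one place. The SparkShims implementation itself is not part of this diff; the following is a plausible sketch of the dispatch, assuming the runtime version is probed from Spark's package object (in the real build the two subclasses would live in version-specific shim modules, since neither compiles against the other's Spark):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.tools.pigstats.PigStats;
    import org.apache.pig.tools.pigstats.spark.Spark1JobStats;
    import org.apache.pig.tools.pigstats.spark.Spark2JobStats;
    import org.apache.pig.tools.pigstats.spark.SparkJobStats;

    final class SparkJobStatsFactory {
        // Hypothetical dispatch: pick the stats implementation matching
        // the Spark major version found on the classpath.
        static SparkJobStats create(int jobId, PigStats.JobGraph plan, Configuration conf) {
            String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION();
            if (sparkVersion.startsWith("1.")) {
                return new Spark1JobStats(jobId, plan, conf);
            }
            return new Spark2JobStats(jobId, plan, conf);
        }
    }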
