Author: rohini Date: Wed May 17 20:48:21 2017 New Revision: 1795451 URL: http://svn.apache.org/viewvc?rev=1795451&view=rev Log: PIG-5186: Support aggregate warnings with Spark engine (szita via rohini)
Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java Modified: pig/branches/spark/src/org/apache/pig/PigWarning.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java pig/branches/spark/test/e2e/pig/tests/nightly.conf Modified: pig/branches/spark/src/org/apache/pig/PigWarning.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigWarning.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/PigWarning.java (original) +++ pig/branches/spark/src/org/apache/pig/PigWarning.java Wed May 17 20:48:21 2017 @@ -68,6 +68,8 @@ public enum PigWarning { DELETE_FAILED, PROJECTION_INVALID_RANGE, NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY, - SKIP_UDF_CALL_FOR_NULL + SKIP_UDF_CALL_FOR_NULL, + SPARK_WARN, //bulk collection of warnings under Spark exec engine + SPARK_CUSTOM_WARN // same as above but for custom UDF warnings only, see PIG-2207 ; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed May 17 20:48:21 2017 @@ -23,7 +23,7 @@ import org.apache.pig.EvalFunc; import org.apache.pig.LoadFunc; import org.apache.pig.StoreFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; -import org.apache.pig.tools.pigstats.PigStatusReporter; +import org.apache.pig.tools.pigstats.PigWarnCounter; /** * @@ -36,7 +36,7 @@ public final class PigHadoopLogger imple private static Log log = LogFactory.getLog(PigHadoopLogger.class); private static PigHadoopLogger logger = null; - private PigStatusReporter reporter = null; + private PigWarnCounter reporter = null; private boolean aggregate = false; private PigHadoopLogger() { @@ -52,7 +52,7 @@ public final class PigHadoopLogger imple return logger; } - public void setReporter(PigStatusReporter reporter) { + public void setReporter(PigWarnCounter reporter) { this.reporter = reporter; } @@ -65,10 +65,10 @@ public final class PigHadoopLogger imple if (getAggregate()) { if (reporter != null) { if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) { - reporter.incrCounter(className, warningEnum.name(), 1); + reporter.incrWarnCounter(className, warningEnum.name(), 1L); } // For backwards compatibility, always report with warningEnum, see PIG-3739 - reporter.incrCounter(warningEnum, 1); + reporter.incrWarnCounter(warningEnum, 1L); } else { //TODO: //in local mode of execution if the PigHadoopLogger is used initially, Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed May 17 20:48:21 2017 @@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; +import org.apache.pig.PigWarning; import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; @@ -123,10 +124,12 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.JarManager; +import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.spark.SparkCounterGroup; import org.apache.pig.tools.pigstats.spark.SparkCounters; import org.apache.pig.tools.pigstats.spark.SparkPigStats; import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; @@ -154,6 +157,7 @@ public class SparkLauncher extends Launc private JobConf jobConf = null; private String currentDirectoryPath = null; private SparkEngineConf sparkEngineConf = new SparkEngineConf(); + private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName(); @Override public PigStats launchPig(PhysicalPlan physicalPlan, String grpName, @@ -185,7 +189,7 @@ public class SparkLauncher extends Launc new ParallelismSetter(sparkplan, jobConf).visit(); - SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext)); + prepareSparkCounters(jobConf); // Create conversion map, mapping between pig operator and spark convertor Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap @@ -699,4 +703,25 @@ public class SparkLauncher extends Launc SparkPigContext.get().setPigDefaultParallelism(Integer.parseInt(parallelism)); } } + + /** + * Creates new SparkCounters instance for the job, initializes aggregate warning counters if required + * @param jobConf + * @throws IOException + */ + private static void prepareSparkCounters(JobConf jobConf) throws IOException { + SparkPigStatusReporter statusReporter = SparkPigStatusReporter.getInstance(); + SparkCounters counters = new SparkCounters(sparkContext); + + if ("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))) { + SparkCounterGroup pigWarningGroup = new SparkCounterGroup.MapSparkCounterGroup( + PIG_WARNING_FQCN, PIG_WARNING_FQCN,sparkContext + ); + pigWarningGroup.createCounter(PigWarning.SPARK_WARN.name(), new HashMap<String,Long>()); + pigWarningGroup.createCounter(PigWarning.SPARK_CUSTOM_WARN.name(), new HashMap<String,Long>()); + counters.getSparkCounterGroups().put(PIG_WARNING_FQCN, pigWarningGroup); + } + statusReporter.setCounters(counters); + jobConf.set("pig.spark.counters", ObjectSerializer.serialize(counters)); + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed May 17 20:48:21 2017 @@ -38,47 +38,44 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; -import org.apache.pig.tools.pigstats.PigStatusReporter; +import org.apache.pig.tools.pigstats.spark.SparkCounters; public class PigInputFormatSpark extends PigInputFormat { -@Override -public RecordReader<Text, Tuple> createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, - InterruptedException { - initLogger(); - resetUDFContext(); - //PigSplit#conf is the default hadoop configuration, we need get the configuration - //from context.getConfigration() to retrieve pig properties - PigSplit pigSplit = (PigSplit) split; - Configuration conf = context.getConfiguration(); - pigSplit.setConf(conf); - //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX) - //which will be used in POMergeCogroup#setup - if (PigMapReduce.sJobContext == null) { - PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID()); + @Override + public RecordReader<Text, Tuple> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, + InterruptedException { + resetUDFContext(); + //PigSplit#conf is the default hadoop configuration, we need get the configuration + //from context.getConfigration() to retrieve pig properties + PigSplit pigSplit = (PigSplit) split; + Configuration conf = context.getConfiguration(); + pigSplit.setConf(conf); + //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX) + //which will be used in POMergeCogroup#setup + if (PigMapReduce.sJobContext == null) { + PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID()); + } + PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex()); + // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and + // SchemaTupleBackend by reading properties from JobConf + initialize(conf); + return super.createRecordReader(split, context); } - PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex()); - // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and - // SchemaTupleBackend by reading properties from JobConf - initialize(conf); - return super.createRecordReader(split, context); -} private void initialize(Configuration jobConf) throws IOException { MapRedUtil.setupUDFContext(jobConf); PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext")); SchemaTupleBackend.initialize(jobConf, pc); PigMapReduce.sJobConfInternal.set(jobConf); - } - - private void resetUDFContext() { - UDFContext.getUDFContext().reset(); - } - - private void initLogger() { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); - pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); + pigHadoopLogger.setAggregate("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))); + pigHadoopLogger.setReporter((SparkCounters)ObjectSerializer.deserialize(jobConf.get("pig.spark.counters"))); PhysicalOperator.setPigLogger(pigHadoopLogger); } -} + + private void resetUDFContext() { + UDFContext.getUDFContext().reset(); + } +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Wed May 17 20:48:21 2017 @@ -27,7 +27,7 @@ import org.apache.pig.classification.Int @InterfaceAudience.Public @InterfaceStability.Evolving -public class PigStatusReporter extends StatusReporter implements Progressable { +public class PigStatusReporter extends StatusReporter implements Progressable, PigWarnCounter { private static PigStatusReporter reporter = null; @@ -86,6 +86,16 @@ public class PigStatusReporter extends S } @Override + public boolean incrWarnCounter(Enum<?> name, Object incr) { + return incrCounter(name, (Long)incr); + } + + @Override + public boolean incrWarnCounter(String group, String name, Object incr) { + return incrCounter(group, name, (Long)incr); + } + + @Override public void progress() { if (context != null) { context.progress(); Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java?rev=1795451&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java (added) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java Wed May 17 20:48:21 2017 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats; + +/* + Interface for incrementing warning counters + */ +public interface PigWarnCounter { + + boolean incrWarnCounter(Enum<?> name, Object incr); + + boolean incrWarnCounter(String group, String name, Object incr); +} Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Wed May 17 20:48:21 2017 @@ -20,16 +20,18 @@ package org.apache.pig.tools.pigstats.sp import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import org.apache.spark.Accumulator; import org.apache.spark.AccumulatorParam; import org.apache.spark.api.java.JavaSparkContext; -public class SparkCounter implements Serializable { +public abstract class SparkCounter<T> implements Serializable { private String name; private String displayName; - private Accumulator<Long> accumulator; + private Accumulator<T> accumulator; public SparkCounter() { // For serialization. @@ -39,25 +41,34 @@ public class SparkCounter implements Ser String name, String displayName, String groupName, - long initValue, + T initValue, JavaSparkContext sparkContext) { this.name = name; this.displayName = displayName; - LongAccumulatorParam longAccumulatorParam = new LongAccumulatorParam(); + String accumulatorName = groupName + "_" + name; - this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longAccumulatorParam); + + if (sparkContext == null){ + //Spark executors can register new Accumulators but they won't make it back to the driver hence the limitation + throw new RuntimeException("Not allowed to create SparkCounter on backend executor."); + + } + this.accumulator = sparkContext.accumulator(initValue, accumulatorName, createAccumulatorParam()); + } - public long getValue() { + protected abstract AccumulatorParam<T> createAccumulatorParam(); + + public T getValue() { if (accumulator != null) { return accumulator.value(); } else { - return 0L; + return null; } } - public void increment(long incr) { + public void increment(T incr) { accumulator.add(incr); } @@ -73,21 +84,82 @@ public class SparkCounter implements Ser this.displayName = displayName; } - class LongAccumulatorParam implements AccumulatorParam<Long> { + public static class LongSparkCounter extends SparkCounter<Long> { - @Override - public Long addAccumulator(Long t1, Long t2) { - return t1 + t2; + public LongSparkCounter(){} + + public LongSparkCounter( + String name, + String displayName, + String groupName, + Long initValue, + JavaSparkContext sparkContext){ + super(name, displayName, groupName, initValue, sparkContext); } @Override - public Long addInPlace(Long r1, Long r2) { - return r1 + r2; + protected AccumulatorParam<Long> createAccumulatorParam() { + return new LongAccumulatorParam(); + } + + private class LongAccumulatorParam implements AccumulatorParam<Long> { + + @Override + public Long addAccumulator(Long t1, Long t2) { + return t1 + t2; + } + + @Override + public Long addInPlace(Long r1, Long r2) { + return r1 + r2; + } + + @Override + public Long zero(Long initialValue) { + return 0L; + } + } + } + + public static class MapSparkCounter extends SparkCounter<Map<String,Long>> { + + public MapSparkCounter(){} + + public MapSparkCounter( + String name, + String displayName, + String groupName, + Map<String,Long> initValue, + JavaSparkContext sparkContext){ + super(name, displayName, groupName, initValue, sparkContext); } @Override - public Long zero(Long initialValue) { - return 0L; + protected AccumulatorParam<Map<String, Long>> createAccumulatorParam() { + return new MapAccumulatorParam(); + } + + private class MapAccumulatorParam implements AccumulatorParam<Map<String,Long>> { + + @Override + public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) { + return addInPlace(t1, t2); + } + + @Override + public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) { + for (String key : r2.keySet()){ + Long r1val = r1.get(key); + Long r2val = r2.get(key); + r1.put(key,r1val == null ? r2val : r1val+r2val); + } + return r1; + } + + @Override + public Map<String, Long> zero(Map<String, Long> initialValue) { + return new HashMap<>(); + } } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java Wed May 17 20:48:21 2017 @@ -25,12 +25,12 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; -public class SparkCounterGroup implements Serializable { - private String groupName; - private String groupDisplayName; - private Map<String, SparkCounter> sparkCounters; +public abstract class SparkCounterGroup<T> implements Serializable { + protected String groupName; + protected String groupDisplayName; + protected Map<String, SparkCounter<T>> sparkCounters; - private transient JavaSparkContext javaSparkContext; + protected transient JavaSparkContext javaSparkContext; private SparkCounterGroup() { // For serialization. @@ -43,13 +43,10 @@ public class SparkCounterGroup implement this.groupName = groupName; this.groupDisplayName = groupDisplayName; this.javaSparkContext = javaSparkContext; - this.sparkCounters = new HashMap<String, SparkCounter>(); + this.sparkCounters = new HashMap<String, SparkCounter<T>>(); } - public void createCounter(String name, long initValue) { - SparkCounter counter = new SparkCounter(name, name, groupName, initValue, javaSparkContext); - sparkCounters.put(name, counter); - } + public abstract void createCounter(String name, T initValue); public SparkCounter getCounter(String name) { return sparkCounters.get(name); @@ -67,7 +64,35 @@ public class SparkCounterGroup implement this.groupDisplayName = groupDisplayName; } - public Map<String, SparkCounter> getSparkCounters() { + public Map<String, SparkCounter<T>> getSparkCounters() { return sparkCounters; } + + public static class LongSparkCounterGroup extends SparkCounterGroup<Long> { + + public LongSparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + super(groupName,groupDisplayName,javaSparkContext); + } + public void createCounter(String name, Long initValue){ + SparkCounter counter = new SparkCounter.LongSparkCounter(name, name, groupName, initValue, javaSparkContext); + sparkCounters.put(name,counter); + } + } + + public static class MapSparkCounterGroup extends SparkCounterGroup<Map<String,Long>> { + + public MapSparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + super(groupName,groupDisplayName,javaSparkContext); + } + public void createCounter(String name, Map<String,Long> initValue){ + SparkCounter counter = new SparkCounter.MapSparkCounter(name, name, groupName, initValue, javaSparkContext); + sparkCounters.put(name,counter); + } + } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java Wed May 17 20:48:21 2017 @@ -21,13 +21,15 @@ package org.apache.pig.tools.pigstats.sp import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigWarning; +import org.apache.pig.tools.pigstats.PigWarnCounter; import org.apache.spark.api.java.JavaSparkContext; import java.io.Serializable; import java.util.HashMap; import java.util.Map; -public class SparkCounters implements Serializable { +public class SparkCounters implements Serializable, PigWarnCounter { private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(SparkCounters.class); @@ -57,7 +59,7 @@ public class SparkCounters implements Se createCounter(groupName, counterName, 0L); } - public void createCounter(String groupName, String counterName, long initValue) { + public void createCounter(String groupName, String counterName, Object initValue) { getGroup(groupName).createCounter(counterName, initValue); } @@ -74,11 +76,11 @@ public class SparkCounters implements Se } } - public long getValue(String groupName, String counterName) { + public Object getValue(String groupName, String counterName) { SparkCounter counter = getGroup(groupName).getCounter(counterName); if (counter == null) { LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); - return 0; + return null; } else { return counter.getValue(); } @@ -95,7 +97,7 @@ public class SparkCounters implements Se private SparkCounterGroup getGroup(String groupName) { SparkCounterGroup group = sparkCounterGroups.get(groupName); if (group == null) { - group = new SparkCounterGroup(groupName, groupName, javaSparkContext); + group = new SparkCounterGroup.LongSparkCounterGroup(groupName, groupName, javaSparkContext); sparkCounterGroups.put(groupName, group); } return group; @@ -104,4 +106,27 @@ public class SparkCounters implements Se public Map<String, SparkCounterGroup> getSparkCounterGroups() { return sparkCounterGroups; } + + + @Override + public boolean incrWarnCounter(Enum<?> name, Object incr) { + SparkCounter counter = getCounter(PigWarning.SPARK_WARN); + return _incrWarnCounter(counter, name.name(), (Long) incr); + } + + @Override + public boolean incrWarnCounter(String group, String name, Object incr) { + SparkCounter counter = getCounter(PigWarning.SPARK_CUSTOM_WARN); + return _incrWarnCounter(counter, group+"::"+name, (Long) incr); + } + + private static boolean _incrWarnCounter(SparkCounter counter, String name, Long incr) { + if (counter == null){ + return false; + } + Map<String,Long> map = new HashMap<String,Long>(); + map.put(name, incr); + counter.increment(map); + return true; + } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed May 17 20:48:21 2017 @@ -26,6 +26,7 @@ import scala.Option; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.Counters; +import org.apache.pig.PigWarning; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; @@ -45,6 +46,7 @@ public class SparkJobStats extends JobSt private boolean disableCounter; private Counters counters = null; public static String FS_COUNTER_GROUP = "FS_GROUP"; + private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null; protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { this(String.valueOf(jobId), plan, conf); @@ -330,4 +332,18 @@ public class SparkJobStats extends JobSt fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0); fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0); } + + + public Map<String, SparkCounter<Map<String, Long>>> getWarningCounters() { + return warningCounters; + } + + public void initWarningCounters() { + SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters(); + SparkCounterGroup<Map<String, Long>> sparkCounterGroup = counters.getSparkCounterGroups().get( + PigWarning.class.getCanonicalName()); + if (sparkCounterGroup != null) { + this.warningCounters = sparkCounterGroup.getSparkCounters(); + } + } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed May 17 20:48:21 2017 @@ -23,12 +23,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobClient; +import org.apache.pig.PigWarning; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; @@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.CompilationMessageCollector; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; @@ -76,7 +77,9 @@ public class SparkPigStats extends PigSt jobStats.collectStats(jobMetricsListener); jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener); addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf); + jobStats.initWarningCounters(); jobSparkOperatorMap.put(jobStats, sparkOperator); + jobPlan.add(jobStats); } @@ -110,7 +113,35 @@ public class SparkPigStats extends PigSt } private void display() { - LOG.info(getDisplayString()); + LOG.info(getDisplayString()); + handleAggregateWarnings(); + } + + private void handleAggregateWarnings() { + Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>(); + + Iterator<JobStats> iter = jobPlan.iterator(); + while (iter.hasNext()) { + SparkJobStats js = (SparkJobStats) iter.next(); + Map<String, SparkCounter<Map<String,Long>>> counterMap = js.getWarningCounters(); + if (counterMap == null) { + continue; + } + Map<String, Long> warningCounters = counterMap.get(PigWarning.SPARK_WARN.name()).getValue(); + if (warningCounters == null) { + continue; + } + for (String warnKey : warningCounters.keySet()) { + Long val = warningAggMap.get(warnKey); + if (val != null) { + val += (Long)warningCounters.get(warnKey); + } else { + val = (Long)warningCounters.get(warnKey); + } + warningAggMap.put(PigWarning.valueOf(warnKey), val); + } + } + CompilationMessageCollector.logAggregate(warningAggMap, CompilationMessageCollector.MessageType.Warning, LOG); } @Override @@ -218,5 +249,4 @@ public class SparkPigStats extends PigSt sparkOperatorsSet.add(sparkOperator); } - } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed May 17 20:48:21 2017 @@ -97,13 +97,23 @@ public class SparkStatsUtil { public static long getRecordCount(POStore store) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); - return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store)); + Object value = reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store)); + if (value == null) { + return 0L; + } else { + return (Long)value; + } } public static long getRecordCount(POLoad load) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan()); - return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load))/loadersCount; + Object value = reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load)); + if (value == null) { + return 0L; + } else { + return (Long)value/loadersCount; + } } private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){ Modified: pig/branches/spark/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/nightly.conf?rev=1795451&r1=1795450&r2=1795451&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/nightly.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/nightly.conf Wed May 17 20:48:21 2017 @@ -6016,8 +6016,9 @@ store a into ':OUTPATH:';\, fs -rm :INPATH:/singlefile/names.txt# }, { - # Custom Hive UDF and MapredContext + # Custom Hive UDF and MapredContext - disabled for Spark: see PIG-5234 'num' => 7, + 'execonly' => 'mapred,tez', 'pig' => q\set mapred.max.split.size '100000000' register :FUNCPATH:/testudf.jar; define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF');