Author: rohini
Date: Wed May 17 20:48:21 2017
New Revision: 1795451

URL: http://svn.apache.org/viewvc?rev=1795451&view=rev
Log:
PIG-5186: Support aggregate warnings with Spark engine (szita via rohini)

Added:
    pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java
Modified:
    pig/branches/spark/src/org/apache/pig/PigWarning.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/branches/spark/test/e2e/pig/tests/nightly.conf

Modified: pig/branches/spark/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigWarning.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigWarning.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigWarning.java Wed May 17 20:48:21 2017
@@ -68,6 +68,8 @@ public enum PigWarning {
     DELETE_FAILED,
     PROJECTION_INVALID_RANGE,
     NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY,
-    SKIP_UDF_CALL_FOR_NULL
+    SKIP_UDF_CALL_FOR_NULL,
+    SPARK_WARN, //bulk collection of warnings under Spark exec engine
+    SPARK_CUSTOM_WARN // same as above but for custom UDF warnings only, see PIG-2207
     ;
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed May 17 20:48:21 2017
@@ -23,7 +23,7 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.apache.pig.tools.pigstats.PigWarnCounter;
 
 /**
  *
@@ -36,7 +36,7 @@ public final class PigHadoopLogger imple
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
     private static PigHadoopLogger logger = null;
 
-    private PigStatusReporter reporter = null;
+    private PigWarnCounter reporter = null;
     private boolean aggregate = false;
 
     private PigHadoopLogger() {
@@ -52,7 +52,7 @@ public final class PigHadoopLogger imple
         return logger;
     }
 
-    public void setReporter(PigStatusReporter reporter) {
+    public void setReporter(PigWarnCounter reporter) {
         this.reporter = reporter;
     }
 
@@ -65,10 +65,10 @@ public final class PigHadoopLogger imple
         if (getAggregate()) {
             if (reporter != null) {
                if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
-                    reporter.incrCounter(className, warningEnum.name(), 1);
+                    reporter.incrWarnCounter(className, warningEnum.name(), 1L);
                 }
                 // For backwards compatibility, always report with warningEnum, see PIG-3739
-                reporter.incrCounter(warningEnum, 1);
+                reporter.incrWarnCounter(warningEnum, 1L);
             } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used initially,

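The change above decouples PigHadoopLogger from the concrete PigStatusReporter: warn() now feeds any PigWarnCounter, so the same aggregation path serves both the MR counters and the Spark accumulators. For context, this is the UDF-side call that the instanceof checks guard; a minimal sketch of a UDF whose warn() calls land in these counters (NullSafeLength is hypothetical, not part of this patch):

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.PigWarning;
    import org.apache.pig.data.Tuple;

    // Hypothetical UDF, for illustration only: warn() routes through
    // PigHadoopLogger, which feeds the warning counters when
    // aggregate.warning is enabled.
    public class NullSafeLength extends EvalFunc<Integer> {
        @Override
        public Integer exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                // Counted per occurrence, reported once per job.
                warn("null input, skipping record", PigWarning.UDF_WARNING_1);
                return null;
            }
            return ((String) input.get(0)).length();
        }
    }

With aggregate.warning enabled, each such call becomes one increment in a counter map rather than one log line per bad record.
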
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed May 17 20:48:21 2017
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -123,10 +124,12 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkCounterGroup;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
@@ -154,6 +157,7 @@ public class SparkLauncher extends Launc
     private JobConf jobConf = null;
     private String currentDirectoryPath = null;
     private SparkEngineConf sparkEngineConf = new SparkEngineConf();
+    private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();
 
     @Override
     public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
@@ -185,7 +189,7 @@ public class SparkLauncher extends Launc
 
         new ParallelismSetter(sparkplan, jobConf).visit();
 
-        SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext));
+        prepareSparkCounters(jobConf);
 
         // Create conversion map, mapping between pig operator and spark convertor
         Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
@@ -699,4 +703,25 @@ public class SparkLauncher extends Launc
             SparkPigContext.get().setPigDefaultParallelism(Integer.parseInt(parallelism));
         }
     }
+
+    /**
+     * Creates new SparkCounters instance for the job, initializes aggregate warning counters if required
+     * @param jobConf
+     * @throws IOException
+     */
+    private static void prepareSparkCounters(JobConf jobConf) throws IOException {
+        SparkPigStatusReporter statusReporter = SparkPigStatusReporter.getInstance();
+        SparkCounters counters = new SparkCounters(sparkContext);
+
+        if ("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))) {
+            SparkCounterGroup pigWarningGroup = new SparkCounterGroup.MapSparkCounterGroup(
+                    PIG_WARNING_FQCN, PIG_WARNING_FQCN, sparkContext
+            );
+            pigWarningGroup.createCounter(PigWarning.SPARK_WARN.name(), new HashMap<String,Long>());
+            pigWarningGroup.createCounter(PigWarning.SPARK_CUSTOM_WARN.name(), new HashMap<String,Long>());
+            counters.getSparkCounterGroups().put(PIG_WARNING_FQCN, pigWarningGroup);
+        }
+        statusReporter.setCounters(counters);
+        jobConf.set("pig.spark.counters", ObjectSerializer.serialize(counters));
+    }
 }

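prepareSparkCounters() above registers the two map-valued warning buckets only when aggregate.warning is set, then serializes the whole SparkCounters object into the job configuration so executors can pick it up (see PigInputFormatSpark below). A minimal sketch of that JobConf round trip, using Pig's real ObjectSerializer but a plain HashMap payload, since constructing SparkCounters needs a live JavaSparkContext:

    import java.util.HashMap;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.impl.util.ObjectSerializer;

    // Minimal sketch of the driver-to-executor handoff: serialize a
    // Serializable object into a JobConf property, read it back elsewhere.
    // The HashMap stands in for the SparkCounters object.
    public class ConfHandoffSketch {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();

            HashMap<String, Long> payload = new HashMap<String, Long>();
            payload.put("SPARK_WARN", 0L);

            // Driver side (cf. prepareSparkCounters above):
            conf.set("pig.spark.counters", ObjectSerializer.serialize(payload));

            // Executor side (cf. PigInputFormatSpark.initialize below):
            @SuppressWarnings("unchecked")
            HashMap<String, Long> copy = (HashMap<String, Long>)
                    ObjectSerializer.deserialize(conf.get("pig.spark.counters"));
            System.out.println(copy); // {SPARK_WARN=0}
        }
    }

Each executor gets its own deserialized copy; increments flow back to the driver through the Spark accumulators inside SparkCounter.
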
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed May 17 20:48:21 2017
@@ -38,47 +38,44 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
 
 public class PigInputFormatSpark extends PigInputFormat {
 
-@Override
-public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
-                                                    TaskAttemptContext context) throws IOException,
-        InterruptedException {
-    initLogger();
-    resetUDFContext();
-    //PigSplit#conf is the default hadoop configuration, we need get the configuration
-    //from context.getConfigration() to retrieve pig properties
-    PigSplit pigSplit = (PigSplit) split;
-    Configuration conf = context.getConfiguration();
-    pigSplit.setConf(conf);
-    //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
-    //which will be used in POMergeCogroup#setup
-    if (PigMapReduce.sJobContext == null) {
-        PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
+       @Override
+       public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+                       TaskAttemptContext context) throws IOException,
+                       InterruptedException {
+        resetUDFContext();
+        //PigSplit#conf is the default hadoop configuration, we need get the configuration
+        //from context.getConfigration() to retrieve pig properties
+        PigSplit pigSplit = (PigSplit) split;
+        Configuration conf = context.getConfiguration();
+        pigSplit.setConf(conf);
+        //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
+        //which will be used in POMergeCogroup#setup
+        if (PigMapReduce.sJobContext == null) {
+            PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
+        }
+        PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+        // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
+        // SchemaTupleBackend by reading properties from JobConf
+        initialize(conf);
+        return super.createRecordReader(split, context);
     }
-    PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
-    // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
-    // SchemaTupleBackend by reading properties from JobConf
-    initialize(conf);
-    return super.createRecordReader(split, context);
-}
 
     private void initialize(Configuration jobConf) throws IOException {
         MapRedUtil.setupUDFContext(jobConf);
         PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
         SchemaTupleBackend.initialize(jobConf, pc);
         PigMapReduce.sJobConfInternal.set(jobConf);
-    }
-
-    private void resetUDFContext() {
-        UDFContext.getUDFContext().reset();
-    }
-
-    private void initLogger() {
         PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-        pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+        pigHadoopLogger.setAggregate("true".equalsIgnoreCase(jobConf.get("aggregate.warning")));
+        pigHadoopLogger.setReporter((SparkCounters)ObjectSerializer.deserialize(jobConf.get("pig.spark.counters")));
         PhysicalOperator.setPigLogger(pigHadoopLogger);
     }
-}
+
+    private void resetUDFContext() {
+               UDFContext.getUDFContext().reset();
+       }
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Wed May 17 20:48:21 2017
@@ -27,7 +27,7 @@ import org.apache.pig.classification.Int
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class PigStatusReporter extends StatusReporter implements Progressable {
+public class PigStatusReporter extends StatusReporter implements Progressable, PigWarnCounter {
 
     private static PigStatusReporter reporter = null;
 
@@ -86,6 +86,16 @@ public class PigStatusReporter extends S
     }
 
     @Override
+    public boolean incrWarnCounter(Enum<?> name, Object incr) {
+        return incrCounter(name, (Long)incr);
+    }
+
+    @Override
+    public boolean incrWarnCounter(String group, String name, Object incr) {
+        return incrCounter(group, name, (Long)incr);
+    }
+
+    @Override
     public void progress() {
         if (context != null) {
             context.progress();

Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java?rev=1795451&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigWarnCounter.java Wed May 17 20:48:21 2017
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+/*
+    Interface for incrementing warning counters
+ */
+public interface PigWarnCounter {
+
+    boolean incrWarnCounter(Enum<?> name, Object incr);
+
+    boolean incrWarnCounter(String group, String name, Object incr);
+}

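PigWarnCounter keeps the warning-counting contract independent of any execution engine: PigStatusReporter implements it for MR/Tez, and SparkCounters (below) for Spark. A minimal sketch of a third implementation, e.g. for tests or local mode (InMemoryWarnCounter is hypothetical, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.tools.pigstats.PigWarnCounter;

    // Hypothetical in-memory implementation, for illustration only.
    public class InMemoryWarnCounter implements PigWarnCounter {
        private final Map<String, Long> counts = new HashMap<String, Long>();

        @Override
        public boolean incrWarnCounter(Enum<?> name, Object incr) {
            return incrWarnCounter("pig", name.name(), incr);
        }

        @Override
        public boolean incrWarnCounter(String group, String name, Object incr) {
            String key = group + "::" + name;
            Long old = counts.get(key);
            counts.put(key, (old == null ? 0L : old) + (Long) incr);
            return true;
        }

        public Map<String, Long> snapshot() {
            return counts;
        }
    }

Any such implementation can be handed to PigHadoopLogger.setReporter(...), exactly as PigInputFormatSpark does with the deserialized SparkCounters.
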
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Wed May 17 20:48:21 2017
@@ -20,16 +20,18 @@ package org.apache.pig.tools.pigstats.sp
 
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.spark.Accumulator;
 import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaSparkContext;
 
-public class SparkCounter implements Serializable {
+public abstract class SparkCounter<T> implements Serializable {
 
     private String name;
     private String displayName;
-    private Accumulator<Long> accumulator;
+    private Accumulator<T> accumulator;
 
     public SparkCounter() {
         // For serialization.
@@ -39,25 +41,34 @@ public class SparkCounter implements Ser
             String name,
             String displayName,
             String groupName,
-            long initValue,
+            T initValue,
             JavaSparkContext sparkContext) {
 
         this.name = name;
         this.displayName = displayName;
-        LongAccumulatorParam longAccumulatorParam = new LongAccumulatorParam();
+
         String accumulatorName = groupName + "_" + name;
-        this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longAccumulatorParam);
+
+        if (sparkContext == null){
+            //Spark executors can register new Accumulators but they won't make it back to the driver hence the limitation
+            throw new RuntimeException("Not allowed to create SparkCounter on backend executor.");
+
+        }
+        this.accumulator = sparkContext.accumulator(initValue, accumulatorName, createAccumulatorParam());
+
     }
 
-    public long getValue() {
+    protected abstract AccumulatorParam<T> createAccumulatorParam();
+
+    public T getValue() {
         if (accumulator != null) {
             return accumulator.value();
         } else {
-            return 0L;
+            return null;
         }
     }
 
-    public void increment(long incr) {
+    public void increment(T incr) {
         accumulator.add(incr);
     }
 
@@ -73,21 +84,82 @@ public class SparkCounter implements Ser
         this.displayName = displayName;
     }
 
-    class LongAccumulatorParam implements AccumulatorParam<Long> {
+    public static class LongSparkCounter extends SparkCounter<Long> {
 
-        @Override
-        public Long addAccumulator(Long t1, Long t2) {
-            return t1 + t2;
+        public LongSparkCounter(){}
+
+        public LongSparkCounter(
+                String name,
+                String displayName,
+                String groupName,
+                Long initValue,
+                JavaSparkContext sparkContext){
+            super(name, displayName, groupName, initValue, sparkContext);
         }
 
         @Override
-        public Long addInPlace(Long r1, Long r2) {
-            return r1 + r2;
+        protected AccumulatorParam<Long> createAccumulatorParam() {
+            return new LongAccumulatorParam();
+        }
+
+        private class LongAccumulatorParam implements AccumulatorParam<Long> {
+
+            @Override
+            public Long addAccumulator(Long t1, Long t2) {
+                return t1 + t2;
+            }
+
+            @Override
+            public Long addInPlace(Long r1, Long r2) {
+                return r1 + r2;
+            }
+
+            @Override
+            public Long zero(Long initialValue) {
+                return 0L;
+            }
+        }
+    }
+
+    public static class MapSparkCounter extends SparkCounter<Map<String,Long>> {
+
+        public MapSparkCounter(){}
+
+        public MapSparkCounter(
+                String name,
+                String displayName,
+                String groupName,
+                Map<String,Long> initValue,
+                JavaSparkContext sparkContext){
+            super(name, displayName, groupName, initValue, sparkContext);
         }
 
         @Override
-        public Long zero(Long initialValue) {
-            return 0L;
+        protected AccumulatorParam<Map<String, Long>> createAccumulatorParam() {
+            return new MapAccumulatorParam();
+        }
+
+        private class MapAccumulatorParam implements AccumulatorParam<Map<String,Long>> {
+
+            @Override
+            public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
+                return addInPlace(t1, t2);
+            }
+
+            @Override
+            public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
+                for (String key : r2.keySet()){
+                    Long r1val = r1.get(key);
+                    Long r2val = r2.get(key);
+                    r1.put(key,r1val == null ? r2val : r1val+r2val);
+                }
+                return r1;
+            }
+
+            @Override
+            public Map<String, Long> zero(Map<String, Long> initialValue) {
+                return new HashMap<>();
+            }
         }
     }
 

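MapSparkCounter backs each warning bucket with a Map<String,Long>-valued accumulator; addInPlace above defines how partial maps from tasks merge: per-key sums, with keys present on only one side carried over. A standalone demonstration of that merge rule in plain Java (no Spark required; the merge body mirrors the diff above):

    import java.util.HashMap;
    import java.util.Map;

    public class MapMergeDemo {
        // Same merge rule as MapAccumulatorParam.addInPlace above.
        static Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
            for (String key : r2.keySet()) {
                Long r1val = r1.get(key);
                Long r2val = r2.get(key);
                r1.put(key, r1val == null ? r2val : r1val + r2val);
            }
            return r1;
        }

        public static void main(String[] args) {
            Map<String, Long> taskA = new HashMap<String, Long>();
            taskA.put("DIVIDE_BY_ZERO", 3L);

            Map<String, Long> taskB = new HashMap<String, Long>();
            taskB.put("DIVIDE_BY_ZERO", 2L);
            taskB.put("FIELD_DISCARDED", 1L);

            // Spark applies this as partial accumulator values reach the driver.
            System.out.println(addInPlace(taskA, taskB));
            // {DIVIDE_BY_ZERO=5, FIELD_DISCARDED=1} (map order not guaranteed)
        }
    }
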
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java Wed May 17 20:48:21 2017
@@ -25,12 +25,12 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-public class SparkCounterGroup implements Serializable {
-    private String groupName;
-    private String groupDisplayName;
-    private Map<String, SparkCounter> sparkCounters;
+public abstract class SparkCounterGroup<T> implements Serializable {
+    protected String groupName;
+    protected String groupDisplayName;
+    protected Map<String, SparkCounter<T>> sparkCounters;
 
-    private transient JavaSparkContext javaSparkContext;
+    protected transient JavaSparkContext javaSparkContext;
 
     private SparkCounterGroup() {
         // For serialization.
@@ -43,13 +43,10 @@ public class SparkCounterGroup implement
         this.groupName = groupName;
         this.groupDisplayName = groupDisplayName;
         this.javaSparkContext = javaSparkContext;
-        this.sparkCounters = new HashMap<String, SparkCounter>();
+        this.sparkCounters = new HashMap<String, SparkCounter<T>>();
     }
 
-    public void createCounter(String name, long initValue) {
-        SparkCounter counter = new SparkCounter(name, name, groupName, initValue, javaSparkContext);
-        sparkCounters.put(name, counter);
-    }
+    public abstract void createCounter(String name, T initValue);
 
     public SparkCounter getCounter(String name) {
         return sparkCounters.get(name);
@@ -67,7 +64,35 @@ public class SparkCounterGroup implement
         this.groupDisplayName = groupDisplayName;
     }
 
-    public Map<String, SparkCounter> getSparkCounters() {
+    public Map<String, SparkCounter<T>> getSparkCounters() {
         return sparkCounters;
     }
+
+    public static class LongSparkCounterGroup extends SparkCounterGroup<Long> {
+
+        public LongSparkCounterGroup(
+                String groupName,
+                String groupDisplayName,
+                JavaSparkContext javaSparkContext) {
+            super(groupName,groupDisplayName,javaSparkContext);
+        }
+        public void createCounter(String name, Long initValue){
+            SparkCounter counter = new SparkCounter.LongSparkCounter(name, name, groupName, initValue, javaSparkContext);
+            sparkCounters.put(name,counter);
+        }
+    }
+
+    public static class MapSparkCounterGroup extends SparkCounterGroup<Map<String,Long>> {
+
+        public MapSparkCounterGroup(
+                String groupName,
+                String groupDisplayName,
+                JavaSparkContext javaSparkContext) {
+            super(groupName,groupDisplayName,javaSparkContext);
+        }
+        public void createCounter(String name, Map<String,Long> initValue){
+            SparkCounter counter = new SparkCounter.MapSparkCounter(name, name, groupName, initValue, javaSparkContext);
+            sparkCounters.put(name,counter);
+        }
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java Wed May 17 20:48:21 2017
@@ -21,13 +21,15 @@ package org.apache.pig.tools.pigstats.sp
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
+import org.apache.pig.tools.pigstats.PigWarnCounter;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-public class SparkCounters implements Serializable {
+public class SparkCounters implements Serializable, PigWarnCounter {
     private static final long serialVersionUID = 1L;
 
     private static final Log LOG = LogFactory.getLog(SparkCounters.class);
@@ -57,7 +59,7 @@ public class SparkCounters implements Se
         createCounter(groupName, counterName, 0L);
     }
 
-    public void createCounter(String groupName, String counterName, long initValue) {
+    public void createCounter(String groupName, String counterName, Object initValue) {
         getGroup(groupName).createCounter(counterName, initValue);
     }
 
@@ -74,11 +76,11 @@ public class SparkCounters implements Se
         }
     }
 
-    public long getValue(String groupName, String counterName) {
+    public Object getValue(String groupName, String counterName) {
         SparkCounter counter = getGroup(groupName).getCounter(counterName);
         if (counter == null) {
             LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
-            return 0;
+            return null;
         } else {
             return counter.getValue();
         }
@@ -95,7 +97,7 @@ public class SparkCounters implements Se
     private SparkCounterGroup getGroup(String groupName) {
         SparkCounterGroup group = sparkCounterGroups.get(groupName);
         if (group == null) {
-            group = new SparkCounterGroup(groupName, groupName, javaSparkContext);
+            group = new SparkCounterGroup.LongSparkCounterGroup(groupName, groupName, javaSparkContext);
             sparkCounterGroups.put(groupName, group);
         }
         return group;
@@ -104,4 +106,27 @@ public class SparkCounters implements Se
     public Map<String, SparkCounterGroup> getSparkCounterGroups() {
         return sparkCounterGroups;
     }
+
+
+    @Override
+    public boolean incrWarnCounter(Enum<?> name, Object incr) {
+        SparkCounter counter = getCounter(PigWarning.SPARK_WARN);
+        return _incrWarnCounter(counter, name.name(), (Long) incr);
+    }
+
+    @Override
+    public boolean incrWarnCounter(String group, String name, Object incr) {
+        SparkCounter counter = getCounter(PigWarning.SPARK_CUSTOM_WARN);
+        return _incrWarnCounter(counter, group+"::"+name, (Long) incr);
+    }
+
+    private static boolean _incrWarnCounter(SparkCounter counter, String name, Long incr) {
+        if (counter == null){
+            return false;
+        }
+        Map<String,Long> map = new HashMap<String,Long>();
+        map.put(name, incr);
+        counter.increment(map);
+        return true;
+    }
 }

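Note the convention in _incrWarnCounter above: each increment is wrapped in a single-entry map, so the map accumulator's merge rule does the summing, and custom UDF warnings are keyed as group::name under SPARK_CUSTOM_WARN. A plain-Java stand-in for MapSparkCounter.increment showing the effect (fabricated keys, for illustration only):

    import java.util.HashMap;
    import java.util.Map;

    public class WarnIncrementDemo {
        // Stand-in for MapSparkCounter.increment(Map<String,Long>): merge a
        // one-entry increment into the accumulated map, summing per key.
        static void increment(Map<String, Long> acc, String key, long by) {
            Long old = acc.get(key);
            acc.put(key, old == null ? by : old + by);
        }

        public static void main(String[] args) {
            Map<String, Long> acc = new HashMap<String, Long>();
            increment(acc, "DIVIDE_BY_ZERO", 1L);                   // built-in warning key
            increment(acc, "com.example.MyUdf::UDF_WARNING_1", 1L); // custom key: group::name
            increment(acc, "DIVIDE_BY_ZERO", 1L);
            System.out.println(acc);
            // e.g. {DIVIDE_BY_ZERO=2, com.example.MyUdf::UDF_WARNING_1=1}
        }
    }
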
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed May 17 20:48:21 2017
@@ -26,6 +26,7 @@ import scala.Option;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.PigWarning;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
@@ -45,6 +46,7 @@ public class SparkJobStats extends JobSt
     private boolean disableCounter;
     private Counters counters = null;
     public static String FS_COUNTER_GROUP = "FS_GROUP";
+    private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null;
 
     protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
         this(String.valueOf(jobId), plan, conf);
@@ -330,4 +332,18 @@ public class SparkJobStats extends JobSt
         fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0);
         fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0);
     }
+
+
+    public Map<String, SparkCounter<Map<String, Long>>> getWarningCounters() {
+        return warningCounters;
+    }
+
+    public void initWarningCounters() {
+        SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters();
+        SparkCounterGroup<Map<String, Long>> sparkCounterGroup = counters.getSparkCounterGroups().get(
+                PigWarning.class.getCanonicalName());
+        if (sparkCounterGroup != null) {
+            this.warningCounters = sparkCounterGroup.getSparkCounters();
+        }
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed May 17 20:48:21 2017
@@ -23,12 +23,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -76,7 +77,9 @@ public class SparkPigStats extends PigSt
         jobStats.collectStats(jobMetricsListener);
         jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
         addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobStats.initWarningCounters();
         jobSparkOperatorMap.put(jobStats, sparkOperator);
+
         jobPlan.add(jobStats);
     }
 
@@ -110,7 +113,35 @@ public class SparkPigStats extends PigSt
     }
 
     private void display() {
-       LOG.info(getDisplayString());
+        LOG.info(getDisplayString());
+        handleAggregateWarnings();
+    }
+
+    private void handleAggregateWarnings() {
+        Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
+
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            SparkJobStats js = (SparkJobStats) iter.next();
+            Map<String, SparkCounter<Map<String,Long>>> counterMap = js.getWarningCounters();
+            if (counterMap == null) {
+                continue;
+            }
+            Map<String, Long> warningCounters = counterMap.get(PigWarning.SPARK_WARN.name()).getValue();
+            if (warningCounters == null) {
+                continue;
+            }
+            for (String warnKey : warningCounters.keySet()) {
+                Long val = warningAggMap.get(PigWarning.valueOf(warnKey));
+                if (val != null) {
+                    val += (Long)warningCounters.get(warnKey);
+                } else {
+                    val = (Long)warningCounters.get(warnKey);
+                }
+                warningAggMap.put(PigWarning.valueOf(warnKey), val);
+            }
+        }
+        CompilationMessageCollector.logAggregate(warningAggMap, CompilationMessageCollector.MessageType.Warning, LOG);
     }
 
     @Override
@@ -218,5 +249,4 @@ public class SparkPigStats extends PigSt
 
         sparkOperatorsSet.add(sparkOperator);
     }
-
 }

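handleAggregateWarnings() above folds each job's SPARK_WARN map into one Enum-keyed total and logs it once through CompilationMessageCollector (note the lookup must key by the PigWarning enum, not the raw string, or cross-job sums degrade to last-write-wins). A plain-Java sketch of that fold with fabricated per-job maps (the PigWarning constants are real; the counts are made up):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.PigWarning;

    public class WarningAggregationDemo {
        public static void main(String[] args) {
            // Fabricated per-job SPARK_WARN maps, as collected by the accumulators.
            Map<String, Long> job1 = new HashMap<String, Long>();
            job1.put("DIVIDE_BY_ZERO", 4L);

            Map<String, Long> job2 = new HashMap<String, Long>();
            job2.put("DIVIDE_BY_ZERO", 1L);
            job2.put("FIELD_DISCARDED_TYPE_CONVERSION_FAILED", 2L);

            // Sum into one total, keying consistently by the PigWarning enum.
            Map<Enum<?>, Long> agg = new HashMap<Enum<?>, Long>();
            for (Map<String, Long> perJob : Arrays.asList(job1, job2)) {
                for (Map.Entry<String, Long> e : perJob.entrySet()) {
                    Enum<?> key = PigWarning.valueOf(e.getKey());
                    Long val = agg.get(key);
                    agg.put(key, val == null ? e.getValue() : val + e.getValue());
                }
            }
            System.out.println(agg);
            // e.g. {DIVIDE_BY_ZERO=5, FIELD_DISCARDED_TYPE_CONVERSION_FAILED=2}
        }
    }
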
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed May 17 20:48:21 2017
@@ -97,13 +97,23 @@ public class SparkStatsUtil {
 
     public static long getRecordCount(POStore store) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
-        return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store));
+        Object value = reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store));
+        if (value == null) {
+            return 0L;
+        } else {
+            return (Long)value;
+        }
     }
 
     public static long getRecordCount(POLoad load) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
         int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
-        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load))/loadersCount;
+        Object value = reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load));
+        if (value == null) {
+            return 0L;
+        } else {
+            return (Long)value/loadersCount;
+        }
     }
 
     private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){

Modified: pig/branches/spark/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/nightly.conf?rev=1795451&r1=1795450&r2=1795451&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/nightly.conf Wed May 17 20:48:21 2017
@@ -6016,8 +6016,9 @@ store a into ':OUTPATH:';\,
                         fs -rm :INPATH:/singlefile/names.txt#
                         },
                         {
-                # Custom Hive UDF and MapredContext
+                # Custom Hive UDF and MapredContext - disabled for Spark: see PIG-5234
                 'num' => 7,
+                'execonly' => 'mapred,tez',
                 'pig' => q\set mapred.max.split.size '100000000'
                         register :FUNCPATH:/testudf.jar;
                        define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF');

