Author: knoguchi
Date: Tue May 14 20:35:53 2024
New Revision: 1917722

URL: http://svn.apache.org/viewvc?rev=1917722&view=rev
Log:
PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917722&r1=1917721&r2=1917722&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 14 20:35:53 2024
@@ -23,6 +23,7 @@ Trunk (unreleased changes)
 INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
+PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)
  
 OPTIMIZATIONS
  

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1917722&r1=1917721&r2=1917722&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Tue May 14 20:35:53 2024
@@ -23,15 +23,15 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 
 public abstract class SparkCounter<T> implements Serializable {
 
     private String name;
     private String displayName;
-    private Accumulator<T> accumulator;
+    private AccumulatorV2<T,T> accumulator;
 
     public SparkCounter() {
         // For serialization.
@@ -52,14 +52,11 @@ public abstract class SparkCounter<T> im
         if (sparkContext == null){
             //Spark executors can register new Accumulators but they won't make it back to the driver hence the limitation
             throw new  RuntimeException("Not allowed to create SparkCounter on backend executor.");
-
         }
-        this.accumulator = sparkContext.accumulator(initValue, accumulatorName,  createAccumulatorParam());
-
+        accumulator = getAccumulator(); 
+        sparkContext.sc().register(accumulator, accumulatorName);
     }
 
-    protected abstract AccumulatorParam<T> createAccumulatorParam();
-
     public T getValue() {
         if (accumulator != null) {
             return accumulator.value();
@@ -84,6 +81,8 @@ public abstract class SparkCounter<T> im
         this.displayName = displayName;
     }
 
+    abstract public AccumulatorV2<T,T> getAccumulator();
+
     public static class LongSparkCounter extends SparkCounter<Long> {
 
         public LongSparkCounter(){}
@@ -98,26 +97,8 @@ public abstract class SparkCounter<T> im
         }
 
         @Override
-        protected AccumulatorParam<Long> createAccumulatorParam() {
-            return new LongAccumulatorParam();
-        }
-
-        private class LongAccumulatorParam implements AccumulatorParam<Long> {
-
-            @Override
-            public Long addAccumulator(Long t1, Long t2) {
-                return t1 + t2;
-            }
-
-            @Override
-            public Long addInPlace(Long r1, Long r2) {
-                return r1 + r2;
-            }
-
-            @Override
-            public Long zero(Long initialValue) {
-                return 0L;
-            }
+        public AccumulatorV2<Long,Long> getAccumulator() {
+          return new LongAccumulator();
         }
     }
 
@@ -134,31 +115,63 @@ public abstract class SparkCounter<T> im
             super(name, displayName, groupName, initValue, sparkContext);
         }
 
+
         @Override
-        protected AccumulatorParam<Map<String, Long>> createAccumulatorParam() {
-            return new MapAccumulatorParam();
+        public AccumulatorV2<Map<String,Long>, Map<String,Long>> getAccumulator() {
+            return new MapAccumulator();
         }
 
-        private class MapAccumulatorParam implements AccumulatorParam<Map<String,Long>> {
+
+        private class MapAccumulator extends AccumulatorV2<Map<String,Long>, Map<String,Long>> {
+
+            private Map<String, Long> map;
+
+            public MapAccumulator() {
+                this(new HashMap<String, Long>());
+            }
+
+            public MapAccumulator(Map <String,Long> in) {
+                map = in;
+            }
 
             @Override
-            public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
-                return addInPlace(t1, t2);
+            public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
+                return new MapAccumulator(value());
             }
 
             @Override
-            public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
+            public void add(Map<String,Long> r2) {
                 for (String key : r2.keySet()){
-                    Long r1val = r1.get(key);
+                    Long r1val = map.get(key);
                     Long r2val = r2.get(key);
-                    r1.put(key,r1val == null ? r2val : r1val+r2val);
+                    map.put(key,r1val == null ? r2val : r1val+r2val);
+                }
+                return;
+            }
+
+            @Override
+            public boolean isZero() {
+                return map == null || map.isEmpty();
+            }
+
+            @Override
+            public void merge(AccumulatorV2<Map<String,Long>,Map<String,Long>> other) {
+                add(other.value());
+            }
+
+            @Override
+            public void reset() {
+                if( map != null ) {
+                    map.clear();
+                } else {
+                    map = new HashMap<String, Long> ();
                 }
-                return r1;
             }
 
+
             @Override
-            public Map<String, Long> zero(Map<String, Long> initialValue) {
-                return new HashMap<>();
+            public Map<String, Long> value() {
+                return map;
             }
         }
     }


Reply via email to