Author: knoguchi
Date: Tue May 14 20:35:53 2024
New Revision: 1917722

URL: http://svn.apache.org/viewvc?rev=1917722&view=rev
Log:
PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)
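For readers unfamiliar with the API change: Accumulator/AccumulatorParam were deprecated in Spark 2.0 and removed in Spark 3, which is what motivates this patch. The sketch below (not part of the commit; app name, master, and data are illustrative) shows the driver-side registration pattern before and after the migration.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorV2MigrationSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[1]"));

        // Old pattern (pre-patch): the context built and registered the
        // accumulator in one call, driven by an AccumulatorParam:
        //   Accumulator<Long> old =
        //       jsc.accumulator(0L, "records", new LongAccumulatorParam());

        // New pattern (as in this patch): construct an AccumulatorV2
        // yourself, then register it with the underlying SparkContext.
        LongAccumulator acc = new LongAccumulator();
        jsc.sc().register(acc, "records");

        // Executors add to the accumulator; the driver reads the merged value.
        jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> acc.add(1L));
        System.out.println(acc.value());  // prints 3 on a successful run

        jsc.stop();
    }
}

This is why the patch replaces the abstract createAccumulatorParam() with getAccumulator(): each SparkCounter subclass now supplies a ready-made AccumulatorV2 instance for the counter to register.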
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917722&r1=1917721&r2=1917722&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 14 20:35:53 2024
@@ -23,6 +23,7 @@ Trunk (unreleased changes)
 INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)
 OPTIMIZATIONS

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1917722&r1=1917721&r2=1917722&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Tue May 14 20:35:53 2024
@@ -23,15 +23,15 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 
 public abstract class SparkCounter<T> implements Serializable {
 
     private String name;
     private String displayName;
-    private Accumulator<T> accumulator;
+    private AccumulatorV2<T,T> accumulator;
 
     public SparkCounter() {
         // For serialization.
@@ -52,14 +52,11 @@ public abstract class SparkCounter<T> im
         if (sparkContext == null){
             //Spark executors can register new Accumulators but they won't make it back to the driver hence the limitation
             throw new RuntimeException("Not allowed to create SparkCounter on backend executor.");
         }
-        this.accumulator = sparkContext.accumulator(initValue, accumulatorName, createAccumulatorParam());
-
+        accumulator = getAccumulator();
+        sparkContext.sc().register(accumulator, accumulatorName);
     }
 
-    protected abstract AccumulatorParam<T> createAccumulatorParam();
-
     public T getValue() {
         if (accumulator != null) {
             return accumulator.value();
@@ -84,6 +81,8 @@ public abstract class SparkCounter<T> im
         this.displayName = displayName;
     }
 
+    abstract public AccumulatorV2<T,T> getAccumulator();
+
     public static class LongSparkCounter extends SparkCounter<Long> {
 
         public LongSparkCounter(){}
@@ -98,26 +97,8 @@ public abstract class SparkCounter<T> im
         }
 
         @Override
-        protected AccumulatorParam<Long> createAccumulatorParam() {
-            return new LongAccumulatorParam();
-        }
-
-        private class LongAccumulatorParam implements AccumulatorParam<Long> {
-
-            @Override
-            public Long addAccumulator(Long t1, Long t2) {
-                return t1 + t2;
-            }
-
-            @Override
-            public Long addInPlace(Long r1, Long r2) {
-                return r1 + r2;
-            }
-
-            @Override
-            public Long zero(Long initialValue) {
-                return 0L;
-            }
+        public AccumulatorV2<Long,Long> getAccumulator() {
+            return new LongAccumulator();
         }
     }
 
@@ -134,31 +115,63 @@ public abstract class SparkCounter<T> im
             super(name, displayName, groupName, initValue, sparkContext);
         }
 
-        protected AccumulatorParam<Map<String, Long>> createAccumulatorParam() {
-            return new MapAccumulatorParam();
+        @Override
+        public AccumulatorV2<Map<String,Long>, Map<String,Long>> getAccumulator() {
+            return new MapAccumulator();
         }
 
-        private class MapAccumulatorParam implements AccumulatorParam<Map<String,Long>> {
+        private class MapAccumulator extends AccumulatorV2<Map<String,Long>, Map<String,Long>> {
+
+            private Map<String, Long> map;
+
+            public MapAccumulator() {
+                this(new HashMap<String, Long>());
+            }
+
+            public MapAccumulator(Map<String,Long> in) {
+                map = in;
+            }
 
             @Override
-            public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
-                return addInPlace(t1, t2);
+            public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
+                return new MapAccumulator(value());
             }
 
             @Override
-            public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
+            public void add(Map<String,Long> r2) {
                 for (String key : r2.keySet()){
-                    Long r1val = r1.get(key);
+                    Long r1val = map.get(key);
                     Long r2val = r2.get(key);
-                    r1.put(key,r1val == null ? r2val : r1val+r2val);
+                    map.put(key,r1val == null ? r2val : r1val+r2val);
+                }
+                return;
+            }
+
+            @Override
+            public boolean isZero() {
+                return map == null || map.isEmpty();
+            }
+
+            @Override
+            public void merge(AccumulatorV2<Map<String,Long>,Map<String,Long>> other) {
+                add(other.value());
+            }
+
+            @Override
+            public void reset() {
+                if( map != null ) {
+                    map.clear();
+                } else {
+                    map = new HashMap<String, Long>();
                 }
-                return r1;
             }
+
             @Override
-            public Map<String, Long> zero(Map<String, Long> initialValue) {
-                return new HashMap<>();
+            public Map<String, Long> value() {
+                return map;
             }
         }
     }
 }
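The new MapAccumulator implements the full AccumulatorV2 contract: Spark copy()s and reset()s the registered instance to seed each task with a zero accumulator, calls add() on executors, merge()s the partial results back on the driver, and exposes the combined map through value(). A rough standalone sketch of a map-valued accumulator in the same style follows (class names are hypothetical; the committed MapAccumulator is a private inner class and not directly usable outside its SparkCounter subclass):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.AccumulatorV2;

// Stand-in for the patch's MapAccumulator: sums Long values per key,
// letting executors report a whole group of named counters in one pass.
class KeyedSumAccumulator extends AccumulatorV2<Map<String, Long>, Map<String, Long>> {
    private Map<String, Long> map = new HashMap<>();

    @Override public boolean isZero() { return map.isEmpty(); }

    // copy() returns an independent snapshot; Spark copies then resets the
    // registered instance to produce the zero value shipped to each task.
    @Override public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
        KeyedSumAccumulator c = new KeyedSumAccumulator();
        c.map.putAll(map);
        return c;
    }

    @Override public void reset() { map.clear(); }

    // add() runs on executors, folding one update map into local state.
    @Override public void add(Map<String, Long> other) {
        for (Map.Entry<String, Long> e : other.entrySet()) {
            map.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    // merge() runs on the driver, combining each task's partial map.
    @Override public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> other) {
        add(other.value());
    }

    @Override public Map<String, Long> value() { return map; }
}

public class KeyedSumAccumulatorDemo {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("demo").setMaster("local[2]"));

        KeyedSumAccumulator acc = new KeyedSumAccumulator();
        jsc.sc().register(acc, "keyed_counters");

        jsc.parallelize(Arrays.asList("a", "b", "a"))
           .foreach(k -> acc.add(Collections.singletonMap(k, 1L)));

        System.out.println(acc.value());  // e.g. {a=2, b=1}
        jsc.stop();
    }
}

Note one design choice in the sketch: copy() clones the backing map into a fresh HashMap rather than aliasing it, since the AccumulatorV2 contract expects an independent snapshot whose later mutations do not leak back into the original instance.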