Author: knoguchi
Date: Tue May 14 20:35:53 2024
New Revision: 1917722
URL: http://svn.apache.org/viewvc?rev=1917722&view=rev
Log:
PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917722&r1=1917721&r2=1917722&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 14 20:35:53 2024
@@ -23,6 +23,7 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)
OPTIMIZATIONS
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1917722&r1=1917721&r2=1917722&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Tue May 14 20:35:53 2024
@@ -23,15 +23,15 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
import org.apache.spark.api.java.JavaSparkContext;
public abstract class SparkCounter<T> implements Serializable {
private String name;
private String displayName;
- private Accumulator<T> accumulator;
+ private AccumulatorV2<T,T> accumulator;
public SparkCounter() {
// For serialization.
@@ -52,14 +52,11 @@ public abstract class SparkCounter<T> im
if (sparkContext == null){
//Spark executors can register new Accumulators but they won't
make it back to the driver hence the limitation
throw new RuntimeException("Not allowed to create SparkCounter on
backend executor.");
-
}
- this.accumulator = sparkContext.accumulator(initValue,
accumulatorName, createAccumulatorParam());
-
+ accumulator = getAccumulator();
+ sparkContext.sc().register(accumulator, accumulatorName);
}
- protected abstract AccumulatorParam<T> createAccumulatorParam();
-
public T getValue() {
if (accumulator != null) {
return accumulator.value();
@@ -84,6 +81,8 @@ public abstract class SparkCounter<T> im
this.displayName = displayName;
}
+ abstract public AccumulatorV2<T,T> getAccumulator();
+
public static class LongSparkCounter extends SparkCounter<Long> {
public LongSparkCounter(){}
@@ -98,26 +97,8 @@ public abstract class SparkCounter<T> im
}
@Override
- protected AccumulatorParam<Long> createAccumulatorParam() {
- return new LongAccumulatorParam();
- }
-
- private class LongAccumulatorParam implements AccumulatorParam<Long> {
-
- @Override
- public Long addAccumulator(Long t1, Long t2) {
- return t1 + t2;
- }
-
- @Override
- public Long addInPlace(Long r1, Long r2) {
- return r1 + r2;
- }
-
- @Override
- public Long zero(Long initialValue) {
- return 0L;
- }
+ public AccumulatorV2<Long,Long> getAccumulator() {
+ return new LongAccumulator();
}
}
@@ -134,31 +115,63 @@ public abstract class SparkCounter<T> im
super(name, displayName, groupName, initValue, sparkContext);
}
+
@Override
- protected AccumulatorParam<Map<String, Long>> createAccumulatorParam()
{
- return new MapAccumulatorParam();
+ public AccumulatorV2<Map<String,Long>, Map<String,Long>>
getAccumulator() {
+ return new MapAccumulator();
}
- private class MapAccumulatorParam implements
AccumulatorParam<Map<String,Long>> {
+
+ private class MapAccumulator extends AccumulatorV2<Map<String,Long>,
Map<String,Long>> {
+
+ private Map<String, Long> map;
+
+ public MapAccumulator() {
+ this(new HashMap<String, Long>());
+ }
+
+ public MapAccumulator(Map <String,Long> in) {
+ map = in;
+ }
@Override
- public Map<String, Long> addAccumulator(Map<String, Long> t1,
Map<String, Long> t2) {
- return addInPlace(t1, t2);
+ public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
+ return new MapAccumulator(value());
}
@Override
- public Map<String, Long> addInPlace(Map<String, Long> r1,
Map<String, Long> r2) {
+ public void add(Map<String,Long> r2) {
for (String key : r2.keySet()){
- Long r1val = r1.get(key);
+ Long r1val = map.get(key);
Long r2val = r2.get(key);
- r1.put(key,r1val == null ? r2val : r1val+r2val);
+ map.put(key,r1val == null ? r2val : r1val+r2val);
+ }
+ return;
+ }
+
+ @Override
+ public boolean isZero() {
+ return map == null || map.isEmpty();
+ }
+
+ @Override
+ public void merge(AccumulatorV2<Map<String,Long>,Map<String,Long>>
other) {
+ add(other.value());
+ }
+
+ @Override
+ public void reset() {
+ if( map != null ) {
+ map.clear();
+ } else {
+ map = new HashMap<String, Long> ();
}
- return r1;
}
+
@Override
- public Map<String, Long> zero(Map<String, Long> initialValue) {
- return new HashMap<>();
+ public Map<String, Long> value() {
+ return map;
}
}
}