Author: xuefu
Date: Fri Aug 14 02:27:21 2015
New Revision: 1695812

URL: http://svn.apache.org/r1695812
Log:
PIG-4645: Support hadoop-like Counter using spark accumulator (Xianda via 
Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java

Added: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1695812&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java 
(added)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java 
Fri Aug 14 02:27:21 2015
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+
+import java.io.Serializable;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkCounter implements Serializable {
+
+    private String name;
+    private String displayName;
+    private Accumulator<Long> accumulator;
+
+    public SparkCounter() {
+        // For serialization.
+    }
+
+    public SparkCounter(
+            String name,
+            String displayName,
+            String groupName,
+            long initValue,
+            JavaSparkContext sparkContext) {
+
+        this.name = name;
+        this.displayName = displayName;
+        LongAccumulatorParam longAccumulatorParam = new LongAccumulatorParam();
+        String accumulatorName = groupName + "_" + name;
+        this.accumulator = sparkContext.accumulator(initValue, 
accumulatorName, longAccumulatorParam);
+    }
+
+    public long getValue() {
+        if (accumulator != null) {
+            return accumulator.value();
+        } else {
+            return 0L;
+        }
+    }
+
+    public void increment(long incr) {
+        accumulator.add(incr);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+    }
+
+    class LongAccumulatorParam implements AccumulatorParam<Long> {
+
+        @Override
+        public Long addAccumulator(Long t1, Long t2) {
+            return t1 + t2;
+        }
+
+        @Override
+        public Long addInPlace(Long r1, Long r2) {
+            return r1 + r2;
+        }
+
+        @Override
+        public Long zero(Long initialValue) {
+            return 0L;
+        }
+    }
+
+}

Added: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java?rev=1695812&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
 Fri Aug 14 02:27:21 2015
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkCounterGroup implements Serializable {
+    private String groupName;
+    private String groupDisplayName;
+    private Map<String, SparkCounter> sparkCounters;
+
+    private transient JavaSparkContext javaSparkContext;
+
+    private SparkCounterGroup() {
+        // For serialization.
+    }
+
+    public SparkCounterGroup(
+            String groupName,
+            String groupDisplayName,
+            JavaSparkContext javaSparkContext) {
+        this.groupName = groupName;
+        this.groupDisplayName = groupDisplayName;
+        this.javaSparkContext = javaSparkContext;
+        this.sparkCounters = new HashMap<String, SparkCounter>();
+    }
+
+    public void createCounter(String name, long initValue) {
+        SparkCounter counter = new SparkCounter(name, name, groupName, 
initValue, javaSparkContext);
+        sparkCounters.put(name, counter);
+    }
+
+    public SparkCounter getCounter(String name) {
+        return sparkCounters.get(name);
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public String getGroupDisplayName() {
+        return groupDisplayName;
+    }
+
+    public void setGroupDisplayName(String groupDisplayName) {
+        this.groupDisplayName = groupDisplayName;
+    }
+
+    public Map<String, SparkCounter> getSparkCounters() {
+        return sparkCounters;
+    }
+}

Added: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java?rev=1695812&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java 
(added)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java 
Fri Aug 14 02:27:21 2015
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkCounters implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(SparkCounters.class);
+
+    private Map<String, SparkCounterGroup> sparkCounterGroups;
+
+    private final transient JavaSparkContext javaSparkContext;
+
+    private SparkCounters() {
+        this(null);
+    }
+
+    public SparkCounters(JavaSparkContext javaSparkContext) {
+        this.javaSparkContext = javaSparkContext;
+        this.sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
+    }
+
+    public void createCounter(Enum<?> key) {
+        createCounter(key.getDeclaringClass().getName(), key.name());
+    }
+
+    public void createCounter(String groupName, Enum<?> key) {
+        createCounter(groupName, key.name(), 0L);
+    }
+
+    public void createCounter(String groupName, String counterName) {
+        createCounter(groupName, counterName, 0L);
+    }
+
+    public void createCounter(String groupName, String counterName, long 
initValue) {
+        getGroup(groupName).createCounter(counterName, initValue);
+    }
+
+    public void increment(Enum<?> key, long incrValue) {
+        increment(key.getDeclaringClass().getName(), key.name(), incrValue);
+    }
+
+    public void increment(String groupName, String counterName, long value) {
+        SparkCounter counter = getGroup(groupName).getCounter(counterName);
+        if (counter == null) {
+            LOG.error(String.format("counter[%s, %s] has not initialized 
before.", groupName, counterName));
+        } else {
+            counter.increment(value);
+        }
+    }
+
+    public long getValue(String groupName, String counterName) {
+        SparkCounter counter = getGroup(groupName).getCounter(counterName);
+        if (counter == null) {
+            LOG.error(String.format("counter[%s, %s] has not initialized 
before.", groupName, counterName));
+            return 0;
+        } else {
+            return counter.getValue();
+        }
+    }
+
+    public SparkCounter getCounter(String groupName, String counterName) {
+        return getGroup(groupName).getCounter(counterName);
+    }
+
+    public SparkCounter getCounter(Enum<?> key) {
+        return getCounter(key.getDeclaringClass().getName(), key.name());
+    }
+
+    private SparkCounterGroup getGroup(String groupName) {
+        SparkCounterGroup group = sparkCounterGroups.get(groupName);
+        if (group == null) {
+            group = new SparkCounterGroup(groupName, groupName, 
javaSparkContext);
+            sparkCounterGroups.put(groupName, group);
+        }
+        return group;
+    }
+
+    public Map<String, SparkCounterGroup> getSparkCounterGroups() {
+        return sparkCounterGroups;
+    }
+}


Reply via email to