Author: xuefu Date: Fri Aug 14 02:27:21 2015 New Revision: 1695812 URL: http://svn.apache.org/r1695812 Log: PIG-4645: Support hadoop-like Counter using spark accumulator (Xianda via Xuefu)
Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1695812&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (added) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Fri Aug 14 02:27:21 2015 @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats.spark; + + +import java.io.Serializable; + +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.api.java.JavaSparkContext; + +public class SparkCounter implements Serializable { + + private String name; + private String displayName; + private Accumulator<Long> accumulator; + + public SparkCounter() { + // For serialization. 
+ } + + public SparkCounter( + String name, + String displayName, + String groupName, + long initValue, + JavaSparkContext sparkContext) { + + this.name = name; + this.displayName = displayName; + LongAccumulatorParam longAccumulatorParam = new LongAccumulatorParam(); + String accumulatorName = groupName + "_" + name; + this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longAccumulatorParam); + } + + public long getValue() { + if (accumulator != null) { + return accumulator.value(); + } else { + return 0L; + } + } + + public void increment(long incr) { + accumulator.add(incr); + } + + public String getName() { + return name; + } + + public String getDisplayName() { + return displayName; + } + + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + + class LongAccumulatorParam implements AccumulatorParam<Long> { + + @Override + public Long addAccumulator(Long t1, Long t2) { + return t1 + t2; + } + + @Override + public Long addInPlace(Long r1, Long r2) { + return r1 + r2; + } + + @Override + public Long zero(Long initialValue) { + return 0L; + } + } + +} Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java?rev=1695812&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java (added) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java Fri Aug 14 02:27:21 2015 @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats.spark; + + +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class SparkCounterGroup implements Serializable { + private String groupName; + private String groupDisplayName; + private Map<String, SparkCounter> sparkCounters; + + private transient JavaSparkContext javaSparkContext; + + private SparkCounterGroup() { + // For serialization. 
+ } + + public SparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + this.groupName = groupName; + this.groupDisplayName = groupDisplayName; + this.javaSparkContext = javaSparkContext; + this.sparkCounters = new HashMap<String, SparkCounter>(); + } + + public void createCounter(String name, long initValue) { + SparkCounter counter = new SparkCounter(name, name, groupName, initValue, javaSparkContext); + sparkCounters.put(name, counter); + } + + public SparkCounter getCounter(String name) { + return sparkCounters.get(name); + } + + public String getGroupName() { + return groupName; + } + + public String getGroupDisplayName() { + return groupDisplayName; + } + + public void setGroupDisplayName(String groupDisplayName) { + this.groupDisplayName = groupDisplayName; + } + + public Map<String, SparkCounter> getSparkCounters() { + return sparkCounters; + } +} Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java?rev=1695812&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java (added) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java Fri Aug 14 02:27:21 2015 @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats.spark; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class SparkCounters implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(SparkCounters.class); + + private Map<String, SparkCounterGroup> sparkCounterGroups; + + private final transient JavaSparkContext javaSparkContext; + + private SparkCounters() { + this(null); + } + + public SparkCounters(JavaSparkContext javaSparkContext) { + this.javaSparkContext = javaSparkContext; + this.sparkCounterGroups = new HashMap<String, SparkCounterGroup>(); + } + + public void createCounter(Enum<?> key) { + createCounter(key.getDeclaringClass().getName(), key.name()); + } + + public void createCounter(String groupName, Enum<?> key) { + createCounter(groupName, key.name(), 0L); + } + + public void createCounter(String groupName, String counterName) { + createCounter(groupName, counterName, 0L); + } + + public void createCounter(String groupName, String counterName, long initValue) { + getGroup(groupName).createCounter(counterName, initValue); + } + + public void increment(Enum<?> key, long incrValue) { + increment(key.getDeclaringClass().getName(), key.name(), incrValue); + } + + public void increment(String groupName, String counterName, long value) { + SparkCounter counter = 
getGroup(groupName).getCounter(counterName); + if (counter == null) { + LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); + } else { + counter.increment(value); + } + } + + public long getValue(String groupName, String counterName) { + SparkCounter counter = getGroup(groupName).getCounter(counterName); + if (counter == null) { + LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); + return 0; + } else { + return counter.getValue(); + } + } + + public SparkCounter getCounter(String groupName, String counterName) { + return getGroup(groupName).getCounter(counterName); + } + + public SparkCounter getCounter(Enum<?> key) { + return getCounter(key.getDeclaringClass().getName(), key.name()); + } + + private SparkCounterGroup getGroup(String groupName) { + SparkCounterGroup group = sparkCounterGroups.get(groupName); + if (group == null) { + group = new SparkCounterGroup(groupName, groupName, javaSparkContext); + sparkCounterGroups.put(groupName, group); + } + return group; + } + + public Map<String, SparkCounterGroup> getSparkCounterGroups() { + return sparkCounterGroups; + } +}