Updated Branches: refs/heads/master 37a73f103 -> 8d1886273
CRUNCH-235. Avoid exposing incompatible Hadoop classes in Crunch API. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8d188627 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8d188627 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8d188627 Branch: refs/heads/master Commit: 8d18862738defb48c4064dc72cc1a2128a4d35c4 Parents: 37a73f1 Author: Tom White <[email protected]> Authored: Tue Jul 9 17:10:54 2013 +0100 Committer: Tom White <[email protected]> Committed: Tue Jul 9 17:10:54 2013 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/contrib/text/Parse.java | 4 +- .../apache/crunch/StageResultsCountersIT.java | 11 ++--- .../src/main/java/org/apache/crunch/DoFn.java | 22 +++++++++- .../java/org/apache/crunch/PipelineResult.java | 46 +++++++++++++++++++- .../org/apache/crunch/test/CountersTest.java | 6 +-- .../crunch/examples/AverageBytesByIP.java | 4 +- .../crunch/examples/SecondarySortExample.java | 4 +- 7 files changed, 79 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java index a1c610b..9bbc231 100644 --- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java @@ -118,10 +118,10 @@ public final class Parse { public void cleanup(Emitter<T> emitter) { if (getContext() != null) { ExtractorStats stats = extractor.getStats(); - getCounter(groupName, "OVERALL_ERRORS").increment(stats.getErrorCount()); + increment(groupName, "OVERALL_ERRORS", stats.getErrorCount()); List<Integer> fieldErrors = stats.getFieldErrors(); for (int i = 0; i < fieldErrors.size(); i++) { - getCounter(groupName, "ERRORS_FOR_FIELD_" + i).increment(fieldErrors.get(i)); + increment(groupName, "ERRORS_FOR_FIELD_" + i, fieldErrors.get(i)); } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java index 19fc302..ffcc931 100644 --- a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java @@ -21,7 +21,6 @@ import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -35,7 +34,6 @@ import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.writable.WritableTypeFamily; -import org.apache.hadoop.mapreduce.Counter; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -107,7 +105,7 @@ public class StageResultsCountersIT { for (String token : tokens) { if (SPECIAL_KEYWORDS.contains(token)) { - getCounter(KEYWORDS_COUNTER_GROUP, token).increment(1); + increment(KEYWORDS_COUNTER_GROUP, token); } } } @@ -122,10 +120,9 @@ public class StageResultsCountersIT { Map<String, Long> countersMap = Maps.newHashMap(); for (StageResult sr : stages) { - Iterator<Counter> iterator = sr.getCounters().getGroup(counterGroupName).iterator(); - while (iterator.hasNext()) { - Counter counter = (Counter) iterator.next(); - countersMap.put(counter.getDisplayName(), counter.getValue()); + for (String counterName : sr.getCounterNames().get(counterGroupName)) { + countersMap.put(sr.getCounterDisplayName(counterGroupName, counterName), + sr.getCounterValue(counterGroupName, counterName)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-core/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java index 2c6389a..6da89ef 100644 --- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java @@ -127,20 +127,40 @@ public abstract class DoFn<S, T> implements Serializable { return context.getConfiguration(); } + /** + * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2 + * (from a class to an interface) so user programs should avoid this method and use + * one of the <code>increment</code> methods instead, such as {@link #increment(Enum)}. + */ + @Deprecated protected Counter getCounter(Enum<?> counterName) { return context.getCounter(counterName); } + /** + * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2 + * (from a class to an interface) so user programs should avoid this method and use + * one of the <code>increment</code> methods instead, such as {@link #increment(Enum)}. + */ + @Deprecated protected Counter getCounter(String groupName, String counterName) { return context.getCounter(groupName, counterName); } + protected void increment(String groupName, String counterName) { + increment(groupName, counterName, 1); + } + + protected void increment(String groupName, String counterName, long value) { + context.getCounter(groupName, counterName).increment(value); + } + protected void increment(Enum<?> counterName) { increment(counterName, 1); } protected void increment(Enum<?> counterName, long value) { - getCounter(counterName).increment(value); + context.getCounter(counterName).increment(value); } protected void progress() { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java index 71a05e2..74a073f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java @@ -18,7 +18,12 @@ package org.apache.crunch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; import java.util.List; @@ -44,16 +49,55 @@ public class PipelineResult { return stageName; } + /** + * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2 + * (from a class to an interface) so user programs should avoid this method and use + * {@link #getCounterNames()}. + */ + @Deprecated public Counters getCounters() { return counters; } + /** + * @return a map of group names to counter names. + */ + public Map<String, Set<String>> getCounterNames() { + Map<String, Set<String>> names = Maps.newHashMap(); + for (CounterGroup counterGroup : counters) { + Set<String> counterNames = Sets.newHashSet(); + for (Counter counter : counterGroup) { + counterNames.add(counter.getName()); + } + names.put(counterGroup.getName(), counterNames); + } + return names; + } + + /** + * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2 + * (from a class to an interface) so user programs should avoid this method and use + * {@link #getCounterValue(Enum)} and/or {@link #getCounterDisplayName(Enum)}. + */ + @Deprecated public Counter findCounter(Enum<?> key) { return counters.findCounter(key); } + public long getCounterValue(String groupName, String counterName) { + return counters.findCounter(groupName, counterName).getValue(); + } + + public String getCounterDisplayName(String groupName, String counterName) { + return counters.findCounter(groupName, counterName).getDisplayName(); + } + public long getCounterValue(Enum<?> key) { - return findCounter(key).getValue(); + return counters.findCounter(key).getValue(); + } + + public String getCounterDisplayName(Enum<?> key) { + return counters.findCounter(key).getDisplayName(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java index 66f854e..e144cb9 100644 --- a/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java @@ -42,9 +42,9 @@ public class CountersTest { @Override public void process(String input, Emitter<String> emitter) { - getCounter(CT.ONE).increment(1); - getCounter(CT.TWO).increment(4); - getCounter(CT.THREE).increment(7); + increment(CT.ONE, 1); + increment(CT.TWO, 4); + increment(CT.THREE, 7); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java index a9e8d1b..166a1e0 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java @@ -124,10 +124,10 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable { String remoteAddr = matcher.group(1); emitter.emit(Pair.of(remoteAddr, sumCount)); } catch (NumberFormatException e) { - this.getCounter(COUNTERS.CORRUPT_SIZE).increment(1); + this.increment(COUNTERS.CORRUPT_SIZE); } } else { - this.getCounter(COUNTERS.NO_MATCH).increment(1); + this.increment(COUNTERS.NO_MATCH); } } }; http://git-wip-us.apache.org/repos/asf/crunch/blob/8d188627/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java index 998bd7f..817b6f3 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java @@ -101,7 +101,7 @@ public class SecondarySortExample extends Configured implements Tool, Serializab timestamp = Long.parseLong(element); } catch (NumberFormatException e) { System.out.println("Timestamp not in long format '" + line + "'"); - this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1); + this.increment(COUNTERS.CORRUPT_TIMESTAMP); } break; case 3: @@ -116,7 +116,7 @@ public class SecondarySortExample extends Configured implements Tool, Serializab Long sortby = new Long(timestamp); emitter.emit(Pair.of(key, Pair.of(sortby, value))); } else { - this.getCounter(COUNTERS.CORRUPT_LINE).increment(1); + this.increment(COUNTERS.CORRUPT_LINE); } }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings())));
