Updated Branches: refs/heads/master 9a689d1ed -> a7002c117
CRUNCH-292: Hack around job counter limits in Hadoop-2 for in-memory pipelines Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a7002c11 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a7002c11 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a7002c11 Branch: refs/heads/master Commit: a7002c1178ae9e8dd522d029c96d80ba7d616df8 Parents: 9a689d1 Author: Josh Wills <[email protected]> Authored: Fri Nov 1 10:04:21 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Nov 1 13:50:12 2013 -0700 ---------------------------------------------------------------------- .../apache/crunch/impl/mem/CountersWrapper.java | 127 +++++++++++++++++++ .../org/apache/crunch/impl/mem/MemPipeline.java | 4 +- .../apache/crunch/impl/mem/CountersTest.java | 54 ++++++++ 3 files changed, 183 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java new file mode 100644 index 0000000..7fe893c --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java @@ -0,0 +1,127 @@ +/** + * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.crunch.impl.mem; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; + +import javax.annotation.Nullable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +class CountersWrapper extends Counters { + + private Counters active; + private final Map<String, Map<String, Counter>> lookupCache = Maps.newHashMap(); + private Set<Counters> allCounters = Sets.newHashSet(); + + CountersWrapper() { + this.active = new Counters(); + allCounters.add(active); + } + + CountersWrapper(org.apache.hadoop.mapred.Counters counters) { + this.active = new Counters(counters); + allCounters.add(active); + } + + @Override + public Counter findCounter(String groupName, String counterName) { + Map<String, Counter> c = lookupCache.get(groupName); + if (c == null) { + c = Maps.newHashMap(); + lookupCache.put(groupName, c); + } + Counter counter = c.get(counterName); + if (counter == null) { + try { + counter = active.findCounter(groupName, counterName); + } catch (Exception e) { + // Recover from this by creating a new active instance + active = new Counters(); + allCounters.add(active); + counter = active.findCounter(groupName, counterName); + } + c.put(counterName, counter); + } + return counter; + } + + @Override + public synchronized Counter findCounter(Enum<?> key) { + return findCounter(key.getClass().getName(), key.name()); + } + + @Override + public synchronized Collection<String> getGroupNames() { + return lookupCache.keySet(); + } + + @Override + public Iterator<CounterGroup> iterator() { + return Iterators.concat(Iterables.transform(allCounters, new Function<Counters, Iterator<CounterGroup>>() { + @Override + public Iterator<CounterGroup> apply(@Nullable Counters input) { + return input.iterator(); + } + }).iterator()); + } + + @Override + public synchronized CounterGroup getGroup(String groupName) { + if (allCounters.size() == 1) { + return active.getGroup(groupName); + } else { + throw new UnsupportedOperationException( + "CounterWrapper cannot return CounterGroup when there are too many Counters"); + } + } + + public synchronized void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("CountersWrapper may not be written"); + } + + public synchronized void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("CountersWrapper may not be read"); + } + + @Override + public synchronized int countCounters() { + int cntrs = 0; + for (Counters c : allCounters) { + cntrs += c.countCounters(); + } + return cntrs; + } + + public synchronized void incrAllCounters(Counters other) { + for (CounterGroup cg : other) { + for (Counter c : cg) { + findCounter(cg.getName(), c.getName()).increment(c.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index cc9ad69..5c0f6b0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -67,7 +67,7 @@ import com.google.common.collect.Sets; public class MemPipeline implements Pipeline { private static final Log LOG = LogFactory.getLog(MemPipeline.class); - private static Counters COUNTERS = new Counters(); + private static Counters COUNTERS = new CountersWrapper(); private static final MemPipeline INSTANCE = new MemPipeline(); private int outputIndex = 0; @@ -77,7 +77,7 @@ public class MemPipeline implements Pipeline { } public static void clearCounters() { - COUNTERS = new Counters(); + COUNTERS = new CountersWrapper(); } public static Pipeline getInstance() { http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java new file mode 100644 index 0000000..458ecc7 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java @@ -0,0 +1,54 @@ +/** + * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.crunch.impl.mem; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mem.collect.MemCollection; +import org.apache.crunch.types.writable.Writables; +import org.junit.Test; + +import java.util.Arrays; + +public class CountersTest { + + @Test + public void counterTest() throws Exception { + Pipeline pipeline = MemPipeline.getInstance(); + + // Single row PCollection. + PCollection<String> objects = MemPipeline.collectionOf(Arrays.asList(new String[]{"hello world"})); + System.out.println("Objects: " + ((MemCollection) objects).getCollection()); + + // Counter creating Map. + PCollection<String> objects2 = objects.parallelDo("Create counters", + new MapFn<String, String>() { + @Override + public String map(String input) { + for(int i = 0; i < 200; ++i) { + this.increment("testCounter", String.valueOf(i)); + } + return input; + } + }, + Writables.strings() + ); + + // Run it! + pipeline.done(); + System.out.println("Objects2: " + ((MemCollection) objects2).getCollection()); + } +}
