Updated Branches: refs/heads/master 222dd76ac -> cbc7c2fb3
CRUNCH-192 Enforce single use of Reducer Iterables Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cbc7c2fb Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cbc7c2fb Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cbc7c2fb Branch: refs/heads/master Commit: cbc7c2fb30ad0486e7ec60656c079c81e41eda2c Parents: 222dd76 Author: Gabriel Reid <[email protected]> Authored: Fri Apr 5 22:25:19 2013 +0200 Committer: Gabriel Reid <[email protected]> Committed: Thu Apr 11 16:17:52 2013 +0200 ---------------------------------------------------------------------- .../apache/crunch/IterableReuseProtectionIT.java | 89 +++++++++++++++ .../src/main/java/org/apache/crunch/CombineFn.java | 5 +- .../org/apache/crunch/impl/SingleUseIterable.java | 49 ++++++++ .../apache/crunch/impl/mem/collect/Shuffler.java | 3 +- .../apache/crunch/impl/mr/run/CrunchReducer.java | 2 + .../apache/crunch/impl/SingleUseIterableTest.java | 54 +++++++++ 6 files changed, 200 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java new file mode 100644 index 0000000..da487eb --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * Verify that calling the iterator method on a Reducer-based Iterable + * is forcefully disallowed. 
+ */ +public class IterableReuseProtectionIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + + public void checkIteratorReuse(Pipeline pipeline) throws IOException { + Iterable<String> values = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) + .by(IdentityFn.<String>getInstance(), Writables.strings()) + .groupByKey() + .combineValues(new TestIterableReuseFn()) + .values().materialize(); + + List<String> valueList = Lists.newArrayList(values); + Collections.sort(valueList); + assertEquals(Lists.newArrayList("a", "b", "c", "e"), valueList); + } + + @Test + public void testIteratorReuse_MRPipeline() throws IOException { + checkIteratorReuse(new MRPipeline(IterableReuseProtectionIT.class, tmpDir.getDefaultConfiguration())); + } + + @Test + public void testIteratorReuse_InMemoryPipeline() throws IOException { + checkIteratorReuse(MemPipeline.getInstance()); + } + + static class TestIterableReuseFn extends CombineFn<String, String> { + + @Override + public void process(Pair<String, Iterable<String>> input, Emitter<Pair<String, String>> emitter) { + StringBuilder combinedBuilder = new StringBuilder(); + for (String v : input.second()) { + combinedBuilder.append(v); + } + + try { + input.second().iterator(); + throw new RuntimeException("Second call to iterator should throw an exception"); + } catch (IllegalStateException e) { + // Expected situation + } + emitter.emit(Pair.of(input.first(), combinedBuilder.toString())); + } + + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/CombineFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java index c42e48f..71e8057 100644 --- a/crunch/src/main/java/org/apache/crunch/CombineFn.java +++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java @@ -38,7 +38,10 @@ import 
com.google.common.collect.Sets; * {@link PGroupedTable}, the function will be applied to the output of the map * stage before the data is passed to the reducer, which can improve the runtime * of certain classes of jobs. - * + * <p> + * Note that the incoming {@code Iterable} can only be used to create an + * {@code Iterator} once. Calling the {@link Iterable#iterator()} method a second + * time will throw an {@link IllegalStateException}. */ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> { http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java b/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java new file mode 100644 index 0000000..98f982f --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl; + +import java.util.Iterator; + +/** + * Wrapper around a Reducer's input Iterable. 
Ensures that the + * {@link #iterator()} method is not called more than once. + */ +public class SingleUseIterable<T> implements Iterable<T> { + + private boolean used = false; + private Iterable<T> wrappedIterable; + + /** + * Instantiate around an Iterable that may only be used once. + * + * @param toWrap iterable to wrap + */ + public SingleUseIterable(Iterable<T> toWrap) { + this.wrappedIterable = toWrap; + } + + @Override + public Iterator<T> iterator() { + if (used) { + throw new IllegalStateException("iterator() can only be called once on this Iterable"); + } + used = true; + return wrappedIterable.iterator(); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java index afc04c3..2e8f9eb 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java @@ -29,6 +29,7 @@ import java.util.TreeMap; import org.apache.crunch.GroupingOptions; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.SingleUseIterable; import org.apache.crunch.types.PType; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.util.ReflectionUtils; @@ -75,7 +76,7 @@ abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> { private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> { @Override public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) { - return Pair.of(input.getKey(), (Iterable<V>) input.getValue()); + return Pair.<K, Iterable<V>>of(input.getKey(), new SingleUseIterable<V>(input.getValue())); } } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java index 12caa86..e5ddbd2 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.impl.SingleUseIterable; import org.apache.hadoop.mapreduce.Reducer; public class CrunchReducer extends Reducer<Object, Object, Object, Object> { @@ -52,6 +53,7 @@ public class CrunchReducer extends Reducer<Object, Object, Object, Object> { @Override protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) { + values = new SingleUseIterable<Object>(values); if (debug) { try { node.processIterable(key, values); http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java new file mode 100644 index 0000000..811a0a3 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SingleUseIterableTest { + + @Test + public void testIterator() { + List<Integer> values = Lists.newArrayList(1,2,3); + + SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values); + + List<Integer> retrievedValues = Lists.newArrayList(iterable); + + assertEquals(values, retrievedValues); + } + + @Test(expected=IllegalStateException.class) + public void testIterator_MultipleCalls() { + List<Integer> values = Lists.newArrayList(1,2,3); + + SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values); + + List<Integer> retrievedValues = Lists.newArrayList(iterable); + + for (Integer n : iterable) { + + } + } + +}
