Updated Branches: refs/heads/master 98458852a -> fd957a920
CRUNCH-248: Fix exception masking issue in CrunchReducer caused by SingleUseIterable Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd957a92 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd957a92 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd957a92 Branch: refs/heads/master Commit: fd957a9202ab2c72b835d64d9d1b08c3b5d71c85 Parents: 9845885 Author: Josh Wills <[email protected]> Authored: Wed Aug 7 06:47:03 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Wed Aug 7 06:47:03 2013 -0700 ---------------------------------------------------------------------- .../crunch/SingleUseIterableExceptionIT.java | 70 ++++++++++++++++++++ .../src/it/java/org/apache/crunch/TfIdfIT.java | 1 - .../apache/crunch/types/PGroupedTableType.java | 61 ++++++++++++----- 3 files changed, 115 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java new file mode 100644 index 0000000..ccc91c6 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import java.util.Iterator; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.junit.Rule; +import org.junit.Test; + +/** + * Regression test for CRUNCH-248 (exception masking caused by SingleUseIterable): the reduce-side function obtains an iterator over its grouped values and then throws. NOTE(review): the test makes no assertion about which exception is reported; it only runs the pipeline. + */ +public class SingleUseIterableExceptionIT { + + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + static class ReduceFn extends MapFn<Iterable<String>, String> { /* reduce-side function that always fails */ + @Override + public String map(Iterable<String> input) { + Iterator<String> iter = input.iterator(); /* presumably intentional: the iterator is requested (and never used) before throwing, to exercise the single-use-iterable path */ + throw new CrunchRuntimeException("Exception"); + } + } + + @Test + public void testException() throws Exception { + run(new MRPipeline(SingleUseIterableExceptionIT.class), + tmpDir.copyResourceFileName("shakes.txt"), + tmpDir.getFileName("out")); + } + + public static void run(MRPipeline p, String input, String output) { /* keys each line by its first 5 characters (lines of 5 characters or fewer share one fixed key), groups by key, and applies ReduceFn to every group */ + PCollection<String> shakes = p.readTextFile(input); + shakes.parallelDo(new MapFn<String, Pair<String, String>>() { + @Override + public Pair<String, String> map(String input) { + if (input.length() > 5) { + return Pair.of(input.substring(0, 5), input); + } else { + return Pair.of("__SHORT__", input); + } + } + }, Avros.tableOf(Avros.strings(), Avros.strings())) + .groupByKey() + .mapValues(new ReduceFn(), Avros.strings()) + .write(To.textFile(output)); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java index 23e45ca..640686a 100644 --- a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java @@ -26,7 +26,6 @@ import java.nio.charset.Charset; import java.util.Collection; import java.util.List; -import org.apache.crunch.fn.MapKeysFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.seq.SeqFileSourceTarget; import org.apache.crunch.lib.Aggregate; http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java index d276cd6..4e72054 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java @@ -42,37 +42,66 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable< protected static class PTypeIterable<V> implements Iterable<V> { private final Iterable<Object> iterable; - private final MapFn<Object, V> mapFn; + private final HoldLastIterator<V> holdLastIter; public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) { - this.mapFn = mapFn; this.iterable = iterable; + this.holdLastIter = new HoldLastIterator<V>(mapFn); } public Iterator<V> iterator() { - return new Iterator<V>() { - Iterator<Object> iter = iterable.iterator(); + return holdLastIter.reset(iterable.iterator()); + } + + @Override + public String toString() { + return holdLastIter.toString(); /* delegate without calling iterator(): the wrapped iterable may be single-use, and iterating it here could mask an in-flight exception */ + } + } + + protected static class HoldLastIterator<V> implements Iterator<V> { /* reusable iterator over mapped values that remembers the most recently returned value, so toString() never has to re-iterate the (possibly single-use) underlying iterable */ - public boolean
hasNext() { - return iter.hasNext(); - } + private Iterator<Object> iter; + private V lastReturned = null; + private final MapFn<Object, V> mapFn; + + public HoldLastIterator(MapFn<Object, V> mapFn) { + this.mapFn = mapFn; + } + + public HoldLastIterator<V> reset(Iterator<Object> iter) { + this.iter = iter; + return this; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } - public V next() { - return mapFn.map(iter.next()); - } + @Override + public V next() { + lastReturned = mapFn.map(iter.next()); + return lastReturned; + } - public void remove() { - iter.remove(); - } - }; + @Override + public void remove() { + throw new UnsupportedOperationException(); } @Override public String toString() { - return Iterables.toString(this); + StringBuilder sb = new StringBuilder().append('['); /* describes the group without iterating: "[last, ...]" once a value has been read, "[...]" while an iterator is attached, "[]" before reset() was ever called */ + if (lastReturned != null) { + sb.append(lastReturned).append(", ...]"); + } else { + sb.append(iter != null ? "...]" : "]"); + } + return sb.toString(); } } - + public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> { private final MapFn<Object, K> keys; private final MapFn<Object, V> values;
