Repository: crunch
Updated Branches:
  refs/heads/master cbb1b7e75 -> 4f2b1f25f
CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4f2b1f25
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4f2b1f25
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4f2b1f25

Branch: refs/heads/master
Commit: 4f2b1f25fe21aca7c2df4747e754862d113173be
Parents: cbb1b7e
Author: Josh Wills <[email protected]>
Authored: Wed Jan 28 12:41:03 2015 -0800
Committer: Josh Wills <[email protected]>
Committed: Thu Jan 29 09:15:36 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/MaterializeIT.java | 54 ++++++++++++++------
 .../materialize/MaterializableIterable.java | 2 +-
 2 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
index cb0f306..7bc61df 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.common.collect.Iterables;
-import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.materialize.MaterializableIterable;
@@ -36,6 +35,7 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,27 +70,27 @@ public class MaterializeIT {
   }
 
   @Test
-  public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+  public void testMaterializeEmptyIntermediate() throws IOException {
     runMaterializeEmptyIntermediate(
-        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        WritableTypeFamily.getInstance());
+        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()));
   }
 
   @Test
-  public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-    runMaterializeEmptyIntermediate(
-        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        AvroTypeFamily.getInstance());
+  public void testMaterializeEmptyIntermediate_InMemory() throws IOException {
+    runMaterializeEmptyIntermediate(MemPipeline.getInstance());
   }
 
-  @Test
-  public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  @Test(expected = CrunchRuntimeException.class)
+  public void testMaterializeFailure() throws IOException {
+    runMaterializeWithFailure(
+        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()));
   }
 
   @Test
-  public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  public void testMaterializeNoFailure() throws IOException {
+    Configuration conf = tmpDir.getDefaultConfiguration();
+    conf.setBoolean("crunch.empty.materialize.on.failure", true);
+    runMaterializeWithFailure(new MRPipeline(MaterializeIT.class, conf));
   }
 
   public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
@@ -102,14 +102,38 @@ public class MaterializeIT {
     pipeline.done();
   }
 
-  public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
+  public void runMaterializeEmptyIntermediate(Pipeline pipeline)
       throws IOException {
     String inputPath = tmpDir.copyResourceFileName("set1.txt");
-    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
+    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FilterAll<String>(false));
     assertTrue(Iterables.isEmpty(empty.materialize()));
     pipeline.done();
   }
 
+  public void runMaterializeWithFailure(Pipeline pipeline) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("set1.txt");
+    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FilterAll<String>(true));
+    empty.materialize().iterator();
+    pipeline.done();
+  }
+
+  static class FilterAll<T> extends FilterFn<T> {
+
+    private final boolean throwException;
+
+    public FilterAll(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    @Override
+    public boolean accept(T input) {
+      if (throwException) {
+        throw new RuntimeException("This is an exception");
+      }
+      return false;
+    }
+  }
+
   static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index f83117f..232d0a1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -92,7 +92,7 @@ public class MaterializableIterable<E> implements Iterable<E> {
   public Iterator<E> iterator() {
     if (materialized == null) {
       this.result = pipeline.run();
-      if (result.succeeded()) {
+      if (result.succeeded() || !pipeline.getConfiguration().getBoolean("crunch.empty.materialize.on.failure", false)) {
         materialize();
       } else {
         LOG.error("Pipeline run failed, returning empty iterator");
