Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 1a5bb622e -> 00127b9a9
CRUNCH-400: Add PipelineResult option to MaterializableIterable Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/00127b9a Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/00127b9a Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/00127b9a Branch: refs/heads/apache-crunch-0.8 Commit: 00127b9a99e703267eb04bf43ea9b624213b3090 Parents: 1a5bb62 Author: Josh Wills <[email protected]> Authored: Mon Jul 14 12:11:36 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jul 17 11:53:08 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/MaterializeIT.java | 13 ++++--- .../materialize/MaterializableIterable.java | 39 ++++++++++++++++++-- 2 files changed, 42 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/00127b9a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java index d064993..cb0f306 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java @@ -23,9 +23,11 @@ import static junit.framework.Assert.assertTrue; import java.io.IOException; import java.util.List; +import com.google.common.collect.Iterables; import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; @@ -104,8 +106,7 @@ public class MaterializeIT { throws IOException { String inputPath = tmpDir.copyResourceFileName("set1.txt"); PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL()); - - assertTrue(Lists.newArrayList(empty.materialize()).isEmpty()); + assertTrue(Iterables.isEmpty(empty.materialize())); pipeline.done(); } @@ -126,14 +127,14 @@ public class MaterializeIT { public void testMaterializeAvroPersonAndReflectsPair_GroupedTable() throws IOException { Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS); Pipeline pipeline = new MRPipeline(MaterializeIT.class); - List<Pair<StringWrapper, Person>> pairList = Lists.newArrayList(pipeline + MaterializableIterable<Pair<StringWrapper, Person>> mi = (MaterializableIterable) pipeline .readTextFile(tmpDir.copyResourceFileName("set1.txt")) .parallelDo(new StringToStringWrapperPersonPairMapFn(), Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class))) - .materialize()); + .materialize(); // We just need to make sure this doesn't crash - assertEquals(4, pairList.size()); - + assertEquals(4, Iterables.size(mi)); + assertTrue(mi.getPipelineResult().succeeded()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/00127b9a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java index aeb1fba..d00a4c1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java +++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java @@ -20,16 +20,22 @@ package org.apache.crunch.materialize; import java.io.IOException; import java.util.Iterator; +import com.google.common.collect.Iterators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.PathTarget; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.hadoop.fs.Path; +/** + * A reference to the materialized output of a {@code PCollection} created + * by a subclass of {@code DistributedPipeline}. + */ public class MaterializableIterable<E> implements Iterable<E> { private static final Log LOG = LogFactory.getLog(MaterializableIterable.class); @@ -37,6 +43,7 @@ public class MaterializableIterable<E> implements Iterable<E> { private final Pipeline pipeline; private final ReadableSource<E> source; private Iterable<E> materialized; + private PipelineResult result; public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) { this.pipeline = pipeline; @@ -44,14 +51,23 @@ public class MaterializableIterable<E> implements Iterable<E> { this.materialized = null; } + /** + * Returns the backing {@code ReadableSource} for this instance. + */ public ReadableSource<E> getSource() { return source; } + /** + * Indicates whether this instance is backed by a {@code SourceTarget}. + */ public boolean isSourceTarget() { return (source instanceof SourceTarget); } - + + /** + * Returns the {@code Path} that contains this data, or null if no such path exists. + */ public Path getPath() { if (source instanceof FileSourceImpl) { return ((FileSourceImpl) source).getPath(); @@ -61,12 +77,27 @@ public class MaterializableIterable<E> implements Iterable<E> { } return null; } - + + /** + * Returns the {@code PipelineResult} that was generated by the Pipeline execution that + * created this data. This result will only be non-empty if an actual pipeline execution was + * performed in order to generate this data, and it will only be non-null if this method is + * called after the data from this Iterable is retrieved. + */ + public PipelineResult getPipelineResult() { + return result; + } + @Override public Iterator<E> iterator() { if (materialized == null) { - pipeline.run(); - materialize(); + this.result = pipeline.run(); + if (result.succeeded()) { + materialize(); + } else { + LOG.error("Pipeline run failed, returning empty iterator"); + return Iterators.emptyIterator(); + } } return materialized.iterator(); }
