Repository: crunch Updated Branches: refs/heads/master 49a64ab16 -> 49e457559
CRUNCH-607 Allow collection reuse in MemPipeline Prevent SingleUseIterable from throwing an IllegalStateException when PGroupedTables are legally reused with the MemPipeline. This simply defers materializing the transformed contents of a MemCollection until it is iterated over. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/49e45755 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/49e45755 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/49e45755 Branch: refs/heads/master Commit: 49e4575595e4667a7d2aeef7d4e0aaeace0f59c3 Parents: 49a64ab Author: Gabriel Reid <[email protected]> Authored: Mon May 2 17:31:20 2016 +0200 Committer: Gabriel Reid <[email protected]> Committed: Mon May 2 17:31:20 2016 +0200 ---------------------------------------------------------------------- .../crunch/impl/mem/collect/MemCollection.java | 11 +-- .../java/org/apache/crunch/WriteModeTest.java | 6 +- .../mem/MemPipelinePCollectionReuseTest.java | 74 ++++++++++++++++++++ 3 files changed, 83 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/49e45755/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 087a31d..f032d18 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -25,6 +25,7 @@ import java.lang.reflect.Method; import java.util.Collection; import java.util.Set; +import com.google.common.collect.Iterables; import javassist.util.proxy.MethodFilter; import 
javassist.util.proxy.MethodHandler; import javassist.util.proxy.ProxyFactory; @@ -67,7 +68,7 @@ import com.google.common.collect.ImmutableSet; public class MemCollection<S> implements PCollection<S> { - private final Collection<S> collect; + private final Iterable<S> collect; private final PType<S> ptype; private String name; @@ -80,7 +81,7 @@ public class MemCollection<S> implements PCollection<S> { } public MemCollection(Iterable<S> collect, PType<S> ptype, String name) { - this.collect = ImmutableList.copyOf(collect); + this.collect = collect; this.ptype = ptype; this.name = name; } @@ -244,11 +245,11 @@ public class MemCollection<S> implements PCollection<S> { @Override public ReadableData<S> asReadable(boolean materialize) { - return new MemReadableData<S>(collect); + return new MemReadableData<S>(ImmutableList.copyOf(collect)); } public Collection<S> getCollection() { - return collect; + return ImmutableList.copyOf(collect); } @Override @@ -266,7 +267,7 @@ public class MemCollection<S> implements PCollection<S> { @Override public long getSize() { - return collect.isEmpty() ? 0 : 1; // getSize is only used for pipeline optimization in MR + return Iterables.isEmpty(collect) ? 
0 : 1; // getSize is only used for pipeline optimization in MR } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/49e45755/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java b/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java index e99ac7b..977b14d 100644 --- a/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java @@ -51,7 +51,7 @@ public class WriteModeTest { public void testOverwrite() throws Exception { Path p = run(WriteMode.OVERWRITE, true); PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString()); - assertEquals(ImmutableList.of("some", "string", "values"), lines.materialize()); + assertEquals(ImmutableList.of("some", "string", "values"), ImmutableList.copyOf(lines.materialize())); } @Test(expected=CrunchRuntimeException.class) @@ -64,7 +64,7 @@ public class WriteModeTest { Path p = run(WriteMode.APPEND, true); PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString()); assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"), - lines.materialize()); + ImmutableList.copyOf(lines.materialize())); } @Test @@ -72,7 +72,7 @@ public class WriteModeTest { Path p = run(WriteMode.APPEND, false); PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString()); assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"), - lines.materialize()); + ImmutableList.copyOf(lines.materialize())); } Path run(WriteMode writeMode, boolean doRun) throws Exception { http://git-wip-us.apache.org/repos/asf/crunch/blob/49e45755/crunch-core/src/test/java/org/apache/crunch/impl/mem/MemPipelinePCollectionReuseTest.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/test/java/org/apache/crunch/impl/mem/MemPipelinePCollectionReuseTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mem/MemPipelinePCollectionReuseTest.java new file mode 100644 index 0000000..fa63287 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mem/MemPipelinePCollectionReuseTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.impl.mem; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +public class MemPipelinePCollectionReuseTest { + + /** + * Specific test for the situation outlined in CRUNCH-607, which was that deriving two PCollections from the same + * PGroupedTable would throw an IllegalStateException from SingleUseIterable. This just ensures that this case + * doesn't recur. 
+ */ + @Test + public void testGroupedCollectionReuse() { + + PCollection<String> stringValues = MemPipeline.typedCollectionOf(Avros.strings(), "one", "two", "three"); + + PGroupedTable<String, String> groupedTable = + stringValues.by(IdentityFn.<String>getInstance(), Avros.strings()).groupByKey(); + + // Here we re-use the grouped table twice, meaning its internal iterators will need to be iterated multiple times + PTable<String, Integer> stringLengthTable = + groupedTable.mapValues(new MaxStringLengthFn(), Avros.ints()); + + // Prior to CRUNCH-607, this would fail with an IllegalStateException from SingleUseIterable + Set<String> keys = ImmutableSet.copyOf(groupedTable.ungroup().join(stringLengthTable).keys().materialize()); + + assertEquals( + ImmutableSet.of("one", "two", "three"), + keys); + } + + + public static class MaxStringLengthFn extends MapFn<Iterable<String>, Integer> { + @Override + public Integer map(Iterable<String> input) { + int maxLength = Integer.MIN_VALUE; + for (String inputString : input) { + maxLength = Math.max(maxLength, inputString.length()); + } + return maxLength; + } + } + + +}
