Repository: crunch Updated Branches: refs/heads/master 3fff74e2e -> f1d074c2a
CRUNCH-601: Handle empty PCollections correctly in Crunch-on-Spark. Created by Micah Whitacre, Mikael Goldmann, and Josh Wills. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f1d074c2 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f1d074c2 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f1d074c2 Branch: refs/heads/master Commit: f1d074c2a7dcaf44b03dab5b84e9d323f586fdac Parents: 3fff74e Author: Josh Wills <[email protected]> Authored: Wed Aug 24 10:59:14 2016 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Aug 25 22:47:02 2016 -0700 ---------------------------------------------------------------------- .../impl/dist/collect/BaseDoCollection.java | 6 +- .../crunch/impl/dist/collect/BaseDoTable.java | 6 +- .../java/org/apache/crunch/lib/Aggregate.java | 2 +- .../pobject/FirstElementPObject.java | 15 +++- .../crunch/SmallCollectionLengthTest.java | 80 ++++++++++++++++++++ 5 files changed, 105 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java index bb1d054..a43967e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java @@ -46,7 +46,11 @@ public class BaseDoCollection<S> extends PCollectionImpl<S> { @Override protected long getSizeInternal() { - return (long) (fn.scaleFactor() * parent.getSize()); + long parentSize = parent.getSize(); + if (parentSize == 0L) { + return parentSize; + } + return Math.max(1L, (long) (fn.scaleFactor() * parentSize)); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java index 4c5569e..4b64ae8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java @@ -74,7 +74,11 @@ public class BaseDoTable<K, V> extends PTableBase<K, V> implements PTable<K, V> @Override protected long getSizeInternal() { - return (long) (fn.scaleFactor() * parent.getSize()); + long parentSize = parent.getSize(); + if (parentSize == 0L) { + return parentSize; + } + return Math.max(1L, (long) (fn.scaleFactor() * parentSize)); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index dd4e1db..9f71458 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -92,7 +92,7 @@ public class Aggregate { .groupByKey(GroupingOptions.builder().numReducers(1).build()) .combineValues(Aggregators.SUM_LONGS()); PCollection<Long> count = countTable.values(); - return new FirstElementPObject<Long>(count); + return new FirstElementPObject<Long>(count, 0L); } public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> { http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java index aa5fd9e..7f25720 100644 --- a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java +++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java @@ -29,13 +29,26 @@ import org.apache.crunch.PCollection; */ public class FirstElementPObject<T> extends PObjectImpl<T, T> { + private T defaultValue; + /** * Constructs a new instance of this {@code PObject} implementation. * * @param collect The backing {@code PCollection} for this {@code PObject}. */ public FirstElementPObject(PCollection<T> collect) { + this(collect, null); + } + + /** + * Constructs a new instance of this {@code PObject} implementation. + * + * @param collect The backing {@code PCollection} for this {@code PObject}. + * @param defaultValue The value to return if the backing PCollection is empty. + */ + public FirstElementPObject(PCollection<T> collect, T defaultValue) { super(collect); + this.defaultValue = defaultValue; } /** {@inheritDoc} */ @@ -45,6 +58,6 @@ public class FirstElementPObject<T> extends PObjectImpl<T, T> { if (itr.hasNext()) { return itr.next(); } - return null; + return defaultValue; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java b/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java new file mode 100644 index 0000000..2cc800b --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.impl.spark; + + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PObject; +import org.apache.crunch.Pipeline; +import org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; + +import javax.annotation.Nullable; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class SmallCollectionLengthTest implements Serializable { + + + @Test + public void smallCollectionsShouldNotHaveNullLength() throws Exception { + Pipeline p = new SparkPipeline("local", "foobar"); + final ImmutableList<String> + allFruits = + ImmutableList.of("apelsin", "banan", "citron", "daddel"); + final ArrayList<ImmutableList<String>> fruitLists = new ArrayList<>(); + for (int i = 0; i <= allFruits.size(); ++i) { + fruitLists.add(ImmutableList.copyOf(allFruits.subList(0, i))); + } + + final ArrayList<PObject<Long>> results = new ArrayList<>(); + for (ImmutableList<String> fruit : fruitLists) { + final PCollection<String> collection = p.create(fruit, Avros.strings()); + results.add(collection.length()); + } + + p.run(); + + final Iterable<Long> + lengths = + Iterables.transform(results, new Function<PObject<Long>, Long>() { + @Nullable + @Override + public Long apply(@Nullable PObject<Long> input) { + return input.getValue(); + } + }); + + for (Long length : lengths) { + assertThat(length, not(nullValue())); + } + + p.done(); + } + +}
