Repository: incubator-beam Updated Branches: refs/heads/master f9a9214dd -> c8ad2e7dd
Revert GBK-based PAssert This change neglected the use of counters by the Dataflow runner, which is used to prevent tests from spuriously passing when a PCollection is empty. Obvious fixes for that revealed probable bugs in the in-process and Spark runners, as well as tests that happen to work with PAssert but are actually unsupported. A proper long-term fix is underway to address all of the above. In the meantime, this commit rolls back the changes to PAssert. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045b568f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045b568f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045b568f Branch: refs/heads/master Commit: 045b568f6be4b7b010d4fd4cfdd1536db943ce54 Parents: f9a9214 Author: Kenneth Knowles <[email protected]> Authored: Tue Jun 14 08:05:04 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Jun 14 08:05:41 2016 -0700 ---------------------------------------------------------------------- .../testing/TestDataflowPipelineRunner.java | 3 +- .../org/apache/beam/sdk/testing/PAssert.java | 779 +++++++++---------- .../apache/beam/sdk/testing/PAssertTest.java | 27 + 3 files changed, 396 insertions(+), 413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java index c940e9a..3e8d903 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java @@ -166,8 +166,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ public <OutputT extends POutput, InputT extends PInput> OutputT apply( PTransform<InputT, OutputT> transform, InputT input) { if (transform instanceof PAssert.OneSideInputAssert - || transform instanceof PAssert.GroupThenAssert - || transform instanceof PAssert.GroupThenAssertForSingleton) { + || transform instanceof PAssert.TwoSideInputAssert) { expectedNumberOfAssertions += 1; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 62d3599..c2cd598 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -34,14 +34,11 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import 
org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; @@ -51,27 +48,32 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; /** - * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an - * assertion can be checked no matter what kind of {@link PipelineRunner} is used. + * An assertion on the contents of a {@link PCollection} + * incorporated into the pipeline. Such an assertion + * can be checked no matter what kind of {@link PipelineRunner} is + * used. * - * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}. + * <p>Note that the {@code PAssert} call must precede the call + * to {@link Pipeline#run}. * - * <p>Examples of use: <pre>{@code + * <p>Examples of use: + * <pre>{@code * Pipeline p = TestPipeline.create(); * ... * PCollection<String> output = @@ -105,84 +107,30 @@ public class PAssert { private PAssert() {} /** - * Builder interface for assertions applicable to iterables and PCollection contents. - */ - public interface IterableAssert<T> { - - /** - * Asserts that the iterable in question contains the provided elements. - * - * @return the same {@link IterableAssert} builder for further assertions - */ - IterableAssert<T> containsInAnyOrder(T... expectedElements); - - /** - * Asserts that the iterable in question contains the provided elements. 
- * - * @return the same {@link IterableAssert} builder for further assertions - */ - IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements); - - /** - * Asserts that the iterable in question is empty. - * - * @return the same {@link IterableAssert} builder for further assertions - */ - IterableAssert<T> empty(); - - /** - * Applies the provided checking function (presumably containing assertions) to the - * iterable in question. - * - * @return the same {@link IterableAssert} builder for further assertions - */ - IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn); - } - - /** - * Builder interface for assertions applicable to a single value. - */ - public interface SingletonAssert<T> { - /** - * Asserts that the value in question is equal to the provided value, according to - * {@link Object#equals}. - * - * @return the same {@link SingletonAssert} builder for further assertions - */ - SingletonAssert<T> isEqualTo(T expected); - - /** - * Asserts that the value in question is not equal to the provided value, according - * to {@link Object#equals}. - * - * @return the same {@link SingletonAssert} builder for further assertions - */ - SingletonAssert<T> notEqualTo(T notExpected); - - /** - * Applies the provided checking function (presumably containing assertions) to the - * value in question. - * - * @return the same {@link SingletonAssert} builder for further assertions - */ - SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn); - } - - /** - * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}. + * Constructs an {@link IterableAssert} for the elements of the provided + * {@link PCollection}. 
*/ public static <T> IterableAssert<T> that(PCollection<T> actual) { - return new PCollectionContentsAssert<>(actual); + return new IterableAssert<>( + new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()), + actual.getPipeline()) + .setCoder(actual.getCoder()); } /** - * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which - * must contain a single {@code Iterable<T>} value. + * Constructs an {@link IterableAssert} for the value of the provided + * {@link PCollection} which must contain a single {@code Iterable<T>} + * value. */ - public static <T> IterableAssert<T> thatSingletonIterable( - PCollection<? extends Iterable<T>> actual) { + public static <T> IterableAssert<T> + thatSingletonIterable(PCollection<? extends Iterable<T>> actual) { + List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments(); + Coder<T> tCoder; try { + @SuppressWarnings("unchecked") + Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder); + tCoder = tCoderTmp; } catch (NoSuchElementException | IllegalArgumentException exc) { throw new IllegalArgumentException( "PAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>" @@ -193,7 +141,19 @@ public class PAssert { @SuppressWarnings("unchecked") // Safe covariant cast PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual; - return new PCollectionSingletonIterableAssert<>(actualIterables); + return new IterableAssert<>( + new CreateActual<Iterable<T>, Iterable<T>>( + actualIterables, View.<Iterable<T>>asSingleton()), + actual.getPipeline()) + .setCoder(tCoder); + } + + /** + * Constructs an {@link IterableAssert} for the value of the provided + * {@code PCollectionView PCollectionView<Iterable<T>>}. 
+ */ + public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) { + return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline()); } /** @@ -201,95 +161,93 @@ public class PAssert { * {@code PCollection PCollection<T>}, which must be a singleton. */ public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) { - return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder()); + return new SingletonAssert<>( + new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline()) + .setCoder(actual.getCoder()); } /** * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}. * - * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any - * {@code Coder<K, V>}. + * <p>Note that the actual value must be coded by a {@link KvCoder}, + * not just any {@code Coder<K, V>}. */ - public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap( - PCollection<KV<K, V>> actual) { + public static <K, V> SingletonAssert<Map<K, Iterable<V>>> + thatMultimap(PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); - return new PCollectionViewAssert<>( - actual, - View.<K, V>asMultimap(), - MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); + + return new SingletonAssert<>( + new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline()) + .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); } /** - * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which - * must have at most one value per key. + * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, + * which must have at most one value per key. * - * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any - * {@code Coder<K, V>}. 
+ * <p>Note that the actual value must be coded by a {@link KvCoder}, + * not just any {@code Coder<K, V>}. */ public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); - return new PCollectionViewAssert<>( - actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); + + return new SingletonAssert<>( + new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline()) + .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); } //////////////////////////////////////////////////////////// /** - * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require - * the runner to support side inputs. + * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}. */ - private static class PCollectionContentsAssert<T> implements IterableAssert<T> { - private final PCollection<T> actual; + public static class IterableAssert<T> implements Serializable { + private final Pipeline pipeline; + private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual; + private Optional<Coder<T>> coder; - public PCollectionContentsAssert(PCollection<T> actual) { - this.actual = actual; + protected IterableAssert( + PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) { + this.createActual = createActual; + this.pipeline = pipeline; + this.coder = Optional.absent(); } /** - * Checks that the {@code Iterable} contains the expected elements, in any order. + * Sets the coder to use for elements of type {@code T}, as needed for internal purposes. * * <p>Returns this {@code IterableAssert}. */ - @Override - @SafeVarargs - public final PCollectionContentsAssert<T> containsInAnyOrder(T... 
expectedElements) { - return containsInAnyOrder(Arrays.asList(expectedElements)); + public IterableAssert<T> setCoder(Coder<T> coderOrNull) { + this.coder = Optional.fromNullable(coderOrNull); + return this; } /** - * Checks that the {@code Iterable} contains the expected elements, in any order. - * - * <p>Returns this {@code IterableAssert}. + * Gets the coder, which may yet be absent. */ - @Override - public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) { - return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements); - } - - @Override - public PCollectionContentsAssert<T> empty() { - return containsInAnyOrder(Collections.<T>emptyList()); - } - - @Override - public PCollectionContentsAssert<T> satisfies( - SerializableFunction<Iterable<T>, Void> checkerFn) { - actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); - return this; + public Coder<T> getCoder() { + if (coder.isPresent()) { + return coder.get(); + } else { + throw new IllegalStateException( + "Attempting to access the coder of an IterableAssert" + + " that has not been set yet."); + } } /** - * Checks that the {@code Iterable} contains elements that match the provided matchers, in any - * order. + * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}. * * <p>Returns this {@code IterableAssert}. */ - @SafeVarargs - final PCollectionContentsAssert<T> containsInAnyOrder( - SerializableMatcher<? super T>... elementMatchers) { - return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers)); + public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) { + pipeline.apply( + "PAssert$" + (assertCount++), + new OneSideInputAssert<Iterable<T>>(createActual, checkerFn)); + return this; } /** @@ -297,11 +255,17 @@ public class PAssert { * * <p>Returns this {@code IterableAssert}. 
*/ - private PCollectionContentsAssert<T> satisfies( - AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) { - return satisfies( - new CheckRelationAgainstExpected<Iterable<T>>( - relation, expectedElements, IterableCoder.of(actual.getCoder()))); + public IterableAssert<T> satisfies( + AssertRelation<Iterable<T>, Iterable<T>> relation, + final Iterable<T> expectedElements) { + pipeline.apply( + "PAssert$" + (assertCount++), + new TwoSideInputAssert<Iterable<T>, Iterable<T>>( + createActual, + new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()), + relation)); + + return this; } /** @@ -309,14 +273,15 @@ public class PAssert { * * <p>Returns this {@code IterableAssert}. */ - PCollectionContentsAssert<T> satisfies( - final SerializableMatcher<Iterable<? extends T>> matcher) { + IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) { // Safe covariant cast. Could be elided by changing a lot of this file to use // more flexible bounds. @SuppressWarnings({"rawtypes", "unchecked"}) SerializableFunction<Iterable<T>, Void> checkerFn = - (SerializableFunction) new MatcherCheckerFn<>(matcher); - actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); + (SerializableFunction) new MatcherCheckerFn<>(matcher); + pipeline.apply( + "PAssert$" + (assertCount++), + new OneSideInputAssert<Iterable<T>>(createActual, checkerFn)); return this; } @@ -335,9 +300,19 @@ public class PAssert { } /** + * Checks that the {@code Iterable} is empty. + * + * <p>Returns this {@code IterableAssert}. + */ + public IterableAssert<T> empty() { + return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList()); + } + + /** * @throws UnsupportedOperationException always - * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant - * to test object equality, use a variant of {@link #containsInAnyOrder} instead. 
+ * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. + * If you meant to test object equality, use a variant of {@link #containsInAnyOrder} + * instead. */ @Deprecated @Override @@ -356,129 +331,169 @@ public class PAssert { throw new UnsupportedOperationException( String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName())); } - } - /** - * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}. - * This does not require the runner to support side inputs. - */ - private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> { - private final PCollection<Iterable<T>> actual; - private final Coder<T> elementCoder; - - public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) { - this.actual = actual; + /** + * Checks that the {@code Iterable} contains the expected elements, in any + * order. + * + * <p>Returns this {@code IterableAssert}. + */ + public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) { + return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements); + } - @SuppressWarnings("unchecked") - Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0); - this.elementCoder = typedCoder; + /** + * Checks that the {@code Iterable} contains the expected elements, in any + * order. + * + * <p>Returns this {@code IterableAssert}. + */ + @SafeVarargs + public final IterableAssert<T> containsInAnyOrder(T... expectedElements) { + return satisfies( + new AssertContainsInAnyOrderRelation<T>(), + Arrays.asList(expectedElements)); } - @Override + /** + * Checks that the {@code Iterable} contains elements that match the provided matchers, + * in any order. + * + * <p>Returns this {@code IterableAssert}. + */ @SafeVarargs - public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... 
expectedElements) { - return containsInAnyOrder(Arrays.asList(expectedElements)); + final IterableAssert<T> containsInAnyOrder( + SerializableMatcher<? super T>... elementMatchers) { + return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers)); } + } - @Override - public PCollectionSingletonIterableAssert<T> empty() { - return containsInAnyOrder(Collections.<T>emptyList()); + /** + * An assertion about the single value of type {@code T} + * associated with a {@link PCollectionView}. + */ + public static class SingletonAssert<T> implements Serializable { + private final Pipeline pipeline; + private final CreateActual<?, T> createActual; + private Optional<Coder<T>> coder; + + protected SingletonAssert( + CreateActual<?, T> createActual, Pipeline pipeline) { + this.pipeline = pipeline; + this.createActual = createActual; + this.coder = Optional.absent(); } + /** + * Always throws an {@link UnsupportedOperationException}: users are probably looking for + * {@link #isEqualTo}. + */ + @Deprecated @Override - public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) { - return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements); + public boolean equals(Object o) { + throw new UnsupportedOperationException( + String.format( + "tests for Java equality of the %s object, not the PCollection in question. " + + "Call a test method, such as isEqualTo.", + getClass().getSimpleName())); } + /** + * @throws UnsupportedOperationException always. + * @deprecated {@link Object#hashCode()} is not supported on PAssert objects. 
+ */ + @Deprecated @Override - public PCollectionSingletonIterableAssert<T> satisfies( - SerializableFunction<Iterable<T>, Void> checkerFn) { - actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn)); - return this; + public int hashCode() { + throw new UnsupportedOperationException( + String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName())); } - private PCollectionSingletonIterableAssert<T> satisfies( - AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) { - return satisfies( - new CheckRelationAgainstExpected<Iterable<T>>( - relation, expectedElements, IterableCoder.of(elementCoder))); + /** + * Sets the coder to use for elements of type {@code T}, as needed + * for internal purposes. + * + * <p>Returns this {@code IterableAssert}. + */ + public SingletonAssert<T> setCoder(Coder<T> coderOrNull) { + this.coder = Optional.fromNullable(coderOrNull); + return this; } - } - /** - * An assertion about the contents of a {@link PCollection} when it is viewed as a single value - * of type {@code ViewT}. This requires side input support from the runner. - */ - private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> { - private final PCollection<ElemT> actual; - private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view; - private final Coder<ViewT> coder; - - protected PCollectionViewAssert( - PCollection<ElemT> actual, - PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, - Coder<ViewT> coder) { - this.actual = actual; - this.view = view; - this.coder = coder; + /** + * Gets the coder, which may yet be absent. 
+ */ + public Coder<T> getCoder() { + if (coder.isPresent()) { + return coder.get(); + } else { + throw new IllegalStateException( + "Attempting to access the coder of an IterableAssert that has not been set yet."); + } } - @Override - public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) { - return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue); + /** + * Applies a {@link SerializableFunction} to check the value of this + * {@code SingletonAssert}'s view. + * + * <p>Returns this {@code SingletonAssert}. + */ + public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) { + pipeline.apply( + "PAssert$" + (assertCount++), new OneSideInputAssert<T>(createActual, checkerFn)); + return this; } - @Override - public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) { - return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue); - } + /** + * Applies an {@link AssertRelation} to check the provided relation against the + * value of this assert and the provided expected value. + * + * <p>Returns this {@code SingletonAssert}. + */ + public SingletonAssert<T> satisfies( + AssertRelation<T, T> relation, + final T expectedValue) { + pipeline.apply( + "PAssert$" + (assertCount++), + new TwoSideInputAssert<T, T>( + createActual, + new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()), + relation)); - @Override - public PCollectionViewAssert<ElemT, ViewT> satisfies( - SerializableFunction<ViewT, Void> checkerFn) { - actual - .getPipeline() - .apply( - "PAssert$" + (assertCount++), - new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn)); return this; } /** - * Applies an {@link AssertRelation} to check the provided relation against the value of this - * assert and the provided expected value. + * Checks that the value of this {@code SingletonAssert}'s view is equal + * to the expected value. * * <p>Returns this {@code SingletonAssert}. 
*/ - private PCollectionViewAssert<ElemT, ViewT> satisfies( - AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) { - return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, expectedValue, coder)); + public SingletonAssert<T> isEqualTo(T expectedValue) { + return satisfies(new AssertIsEqualToRelation<T>(), expectedValue); } /** - * Always throws an {@link UnsupportedOperationException}: users are probably looking for - * {@link #isEqualTo}. + * Checks that the value of this {@code SingletonAssert}'s view is not equal + * to the expected value. + * + * <p>Returns this {@code SingletonAssert}. */ - @Deprecated - @Override - public boolean equals(Object o) { - throw new UnsupportedOperationException( - String.format( - "tests for Java equality of the %s object, not the PCollection in question. " - + "Call a test method, such as isEqualTo.", - getClass().getSimpleName())); + public SingletonAssert<T> notEqualTo(T expectedValue) { + return satisfies(new AssertNotEqualToRelation<T>(), expectedValue); } /** - * @throws UnsupportedOperationException always. - * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects. + * Checks that the value of this {@code SingletonAssert}'s view is equal to + * the expected value. 
+ * + * @deprecated replaced by {@link #isEqualTo} */ @Deprecated - @Override - public int hashCode() { - throw new UnsupportedOperationException( - String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName())); + public SingletonAssert<T> is(T expectedValue) { + return isEqualTo(expectedValue); } + } //////////////////////////////////////////////////////////////////////// @@ -489,13 +504,8 @@ public class PAssert { private final transient PCollection<T> actual; private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView; - public static <T, ActualT> CreateActual<T, ActualT> from( - PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { - return new CreateActual<>(actual, actualView); - } - - private CreateActual( - PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { + private CreateActual(PCollection<T> actual, + PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { this.actual = actual; this.actualView = actualView; } @@ -505,129 +515,73 @@ public class PAssert { final Coder<T> coder = actual.getCoder(); return actual .apply(Window.<T>into(new GlobalWindows())) - .apply( - ParDo.of( - new DoFn<T, T>() { - @Override - public void processElement(ProcessContext context) throws CoderException { - context.output(CoderUtils.clone(coder, context.element())); - } - })) + .apply(ParDo.of(new DoFn<T, T>() { + @Override + public void processElement(ProcessContext context) throws CoderException { + context.output(CoderUtils.clone(coder, context.element())); + } + })) .apply(actualView); } } - /** - * A partially applied {@link AssertRelation}, where one value is provided along with a coder to - * serialize/deserialize them. 
- */ - private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> { - private final AssertRelation<T, T> relation; - private final byte[] encodedExpected; - private final Coder<T> coder; + private static class CreateExpected<T, ExpectedT> + extends PTransform<PBegin, PCollectionView<ExpectedT>> { - public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) { - this.relation = relation; - this.coder = coder; + private final Iterable<T> elements; + private final Optional<Coder<T>> coder; + private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view; - try { - this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected); - } catch (IOException coderException) { - throw new RuntimeException(coderException); - } + private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder, + PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) { + this.elements = elements; + this.coder = coder; + this.view = view; } @Override - public Void apply(T actual) { - try { - T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected); - return relation.assertFor(expected).apply(actual); - } catch (IOException coderException) { - throw new RuntimeException(coderException); + public PCollectionView<ExpectedT> apply(PBegin input) { + Create.Values<T> createTransform = Create.<T>of(elements); + if (coder.isPresent()) { + createTransform = createTransform.withCoder(coder.get()); } + return input.apply(createTransform).apply(view); } } - /** - * A transform that gathers the contents of a {@link PCollection} into a single main input - * iterable in the global window. This requires a runner to support {@link GroupByKey} in the - * global window, but not side inputs or other windowing or triggers. 
- */ - private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>> - implements Serializable { - - public GroupGlobally() {} - - @Override - public PCollection<Iterable<T>> apply(PCollection<T> input) { - return input - .apply("GloballyWindow", Window.<T>into(new GlobalWindows())) - .apply("DummyKey", WithKeys.<Integer, T>of(0)) - .apply("GroupByKey", GroupByKey.<Integer, T>create()) - .apply("GetOnlyValue", Values.<Iterable<T>>create()); - } - } - - /** - * A transform that applies an assertion-checking function over iterables of {@code ActualT} to - * the entirety of the contents of its input. - */ - public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone> - implements Serializable { - private final SerializableFunction<Iterable<T>, Void> checkerFn; - - private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) { - this.checkerFn = checkerFn; - } - - @Override - public PDone apply(PCollection<T> input) { - input - .apply("GroupGlobally", new GroupGlobally<T>()) - .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn))); + private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> { - return PDone.in(input.getPipeline()); - } - } + private final PCollectionView<T> view; - /** - * A transform that applies an assertion-checking function to a single iterable contained as the - * sole element of a {@link PCollection}. 
- */ - public static class GroupThenAssertForSingleton<T> - extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable { - private final SerializableFunction<Iterable<T>, Void> checkerFn; - - private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) { - this.checkerFn = checkerFn; + private PreExisting(PCollectionView<T> view) { + this.view = view; } @Override - public PDone apply(PCollection<Iterable<T>> input) { - input - .apply("GroupGlobally", new GroupGlobally<Iterable<T>>()) - .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn))); - - return PDone.in(input.getPipeline()); + public PCollectionView<T> apply(PBegin input) { + return view; } } /** - * An assertion checker that takes a single {@link PCollectionView - * PCollectionView<ActualT>} and an assertion over {@code ActualT}, and checks it within a - * Beam pipeline. + * An assertion checker that takes a single + * {@link PCollectionView PCollectionView<ActualT>} + * and an assertion over {@code ActualT}, and checks it within a dataflow + * pipeline. * - * <p>Note that the entire assertion must be serializable. + * <p>Note that the entire assertion must be serializable. If + * you need to make assertions involving multiple inputs + * that are each not serializable, use TwoSideInputAssert. * - * <p>This is generally useful for assertion functions that are serializable but whose underlying - * data may not have a coder. + * <p>This is generally useful for assertion functions that + * are serializable but whose underlying data may not have a coder. 
*/ - public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone> - implements Serializable { + public static class OneSideInputAssert<ActualT> + extends PTransform<PBegin, PDone> implements Serializable { private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual; private final SerializableFunction<ActualT, Void> checkerFn; - private OneSideInputAssert( + public OneSideInputAssert( PTransform<PBegin, PCollectionView<ActualT>> createActual, SerializableFunction<ActualT, Void> checkerFn) { this.createActual = createActual; @@ -640,23 +594,21 @@ public class PAssert { input .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply( - ParDo.named("RunChecks") - .withSideInputs(actual) - .of(new SideInputCheckerDoFn<>(checkerFn, actual))); + .apply(ParDo.named("RunChecks").withSideInputs(actual) + .of(new CheckerDoFn<>(checkerFn, actual))); return PDone.in(input.getPipeline()); } } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a - * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of + * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. * * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support * null values. 
*/ - private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> { + private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -664,8 +616,9 @@ public class PAssert { createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn()); private final PCollectionView<ActualT> actual; - private SideInputCheckerDoFn( - SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) { + private CheckerDoFn( + SerializableFunction<ActualT, Void> checkerFn, + PCollectionView<ActualT> actual) { this.checkerFn = checkerFn; this.actual = actual; } @@ -674,9 +627,12 @@ public class PAssert { public void processElement(ProcessContext c) { try { ActualT actualContents = c.sideInput(actual); - doChecks(actualContents, checkerFn, success, failure); + checkerFn.apply(actualContents); + success.addValue(1); } catch (Throwable t) { - // Suppress exception in streaming + LOG.error("PAssert failed expectations.", t); + failure.addValue(1); + // TODO: allow for metrics to propagate on failure when running a streaming pipeline if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { throw t; } @@ -685,89 +641,87 @@ public class PAssert { } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of - * the single iterable element of the input {@link PCollection} and adjusts counters and - * thrown exceptions for use in testing. + * An assertion checker that takes a {@link PCollectionView PCollectionView<ActualT>}, + * a {@link PCollectionView PCollectionView<ExpectedT>}, a relation + * over {@code A} and {@code B}, and checks that the relation holds + * within a dataflow pipeline. * - * <p>The singleton property is presumed, not enforced. 
+ * <p>This is useful when either/both of {@code A} and {@code B} + * are not serializable, but have coders (provided + * by the underlying {@link PCollection}s). */ - private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> { - private final SerializableFunction<ActualT, Void> checkerFn; - private final Aggregator<Integer, Integer> success = - createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); - private final Aggregator<Integer, Integer> failure = - createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn()); + public static class TwoSideInputAssert<ActualT, ExpectedT> + extends PTransform<PBegin, PDone> implements Serializable { - private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) { - this.checkerFn = checkerFn; + private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual; + private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected; + private final AssertRelation<ActualT, ExpectedT> relation; + + protected TwoSideInputAssert( + PTransform<PBegin, PCollectionView<ActualT>> createActual, + PTransform<PBegin, PCollectionView<ExpectedT>> createExpected, + AssertRelation<ActualT, ExpectedT> relation) { + this.createActual = createActual; + this.createExpected = createExpected; + this.relation = relation; } @Override - public void processElement(ProcessContext c) { - try { - doChecks(c.element(), checkerFn, success, failure); - } catch (Throwable t) { - // Suppress exception in streaming - if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { - throw t; - } - } - } - } + public PDone apply(PBegin input) { + final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual); + final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected); - /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of - * the single item contained within the single iterable on input and 
- * adjusts counters and thrown exceptions for use in testing. - * - * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However, - * each input element must be a singleton iterable, or this will fail. - */ - private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> { - private final SerializableFunction<ActualT, Void> checkerFn; - private final Aggregator<Integer, Integer> success = - createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); - private final Aggregator<Integer, Integer> failure = - createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn()); + input + .apply(Create.of(0).withCoder(VarIntCoder.of())) + .apply("RunChecks", ParDo.withSideInputs(actual, expected) + .of(new CheckerDoFn<>(relation, actual, expected))); - private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) { - this.checkerFn = checkerFn; + return PDone.in(input.getPipeline()); } - @Override - public void processElement(ProcessContext c) { - try { - ActualT actualContents = Iterables.getOnlyElement(c.element()); - doChecks(actualContents, checkerFn, success, failure); - } catch (Throwable t) { - // Suppress exception in streaming - if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { - throw t; - } + /** + * Input is ignored, but is {@link Integer} for runners that do not support null values. 
+ */ + private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> { + private final Aggregator<Integer, Integer> success = + createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); + private final Aggregator<Integer, Integer> failure = + createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn()); + private final AssertRelation<ActualT, ExpectedT> relation; + private final PCollectionView<ActualT> actual; + private final PCollectionView<ExpectedT> expected; + + private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation, + PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) { + this.relation = relation; + this.actual = actual; + this.expected = expected; } - } - } - private static <ActualT> void doChecks( - ActualT actualContents, - SerializableFunction<ActualT, Void> checkerFn, - Aggregator<Integer, Integer> successAggregator, - Aggregator<Integer, Integer> failureAggregator) { - try { - checkerFn.apply(actualContents); - successAggregator.addValue(1); - } catch (Throwable t) { - LOG.error("PAssert failed expectations.", t); - failureAggregator.addValue(1); - throw t; + @Override + public void processElement(ProcessContext c) { + try { + ActualT actualContents = c.sideInput(actual); + ExpectedT expectedContents = c.sideInput(expected); + relation.assertFor(expectedContents).apply(actualContents); + success.addValue(1); + } catch (Throwable t) { + LOG.error("PAssert failed expectations.", t); + failure.addValue(1); + // TODO: allow for metrics to propagate on failure when running a streaming pipeline + if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { + throw t; + } + } + } } } ///////////////////////////////////////////////////////////////////////////// /** - * A {@link SerializableFunction} that verifies that an actual value is equal to an expected - * value. + * A {@link SerializableFunction} that verifies that an actual value is equal to an + * expected value. 
*/ private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> { private T expected; @@ -784,8 +738,8 @@ public class PAssert { } /** - * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected - * value. + * A {@link SerializableFunction} that verifies that an actual value is not equal to an + * expected value. */ private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> { private T expected; @@ -802,8 +756,8 @@ public class PAssert { } /** - * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items - * in any order. + * A {@link SerializableFunction} that verifies that an {@code Iterable} contains + * expected items in any order. */ private static class AssertContainsInAnyOrder<T> implements SerializableFunction<Iterable<T>, Void> { @@ -833,9 +787,10 @@ public class PAssert { //////////////////////////////////////////////////////////// /** - * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method - * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that - * should verify the assertion.. + * A binary predicate between types {@code Actual} and {@code Expected}. + * Implemented as a method {@code assertFor(Expected)} which returns + * a {@code SerializableFunction<Actual, Void>} + * that should verify the assertion.. */ private static interface AssertRelation<ActualT, ExpectedT> extends Serializable { public SerializableFunction<ActualT, Void> assertFor(ExpectedT input); @@ -844,7 +799,8 @@ public class PAssert { /** * An {@link AssertRelation} implementing the binary predicate that two objects are equal. 
*/ - private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> { + private static class AssertIsEqualToRelation<T> + implements AssertRelation<T, T> { @Override public SerializableFunction<T, Void> assertFor(T expected) { return new AssertIsEqualTo<T>(expected); @@ -854,7 +810,8 @@ public class PAssert { /** * An {@link AssertRelation} implementing the binary predicate that two objects are not equal. */ - private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> { + private static class AssertNotEqualToRelation<T> + implements AssertRelation<T, T> { @Override public SerializableFunction<T, Void> assertFor(T expected) { return new AssertNotEqualTo<T>(expected); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index fdc8719..f540948 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.testing; +import static org.apache.beam.sdk.testing.SerializableMatchers.anything; +import static org.apache.beam.sdk.testing.SerializableMatchers.not; + import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -148,6 +151,30 @@ public class PAssertTest implements Serializable { } /** + * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}. 
+ */ + @Test + @Category(RunnableOnService.class) + public void testBasicMatcherSuccess() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); + PAssert.that(pcollection).containsInAnyOrder(anything()); + pipeline.run(); + } + + /** + * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}. + */ + @Test + @Category(RunnableOnService.class) + public void testBasicMatcherFailure() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); + PAssert.that(pcollection).containsInAnyOrder(not(anything())); + runExpectingAssertionFailure(pipeline); + } + + /** * Test that we throw an error at pipeline construction time when the user mistakenly uses * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}. */
