Base PAssert on GBK instead of side inputs. Previously, PAssert (and hence all RunnableOnService/NeedsRunner tests) required side input support, which created a very steep on-ramp for new runners.
GroupByKey is a bit more fundamental and most backends will be able to group by key in the global window very quickly. So switching the primitive used to gather all the contents of a PCollection for assertions should make it a bit easier to get early feedback during runner development. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/810ffeb2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/810ffeb2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/810ffeb2 Branch: refs/heads/master Commit: 810ffeb2785bf996001c8fadb992410d1f9409c6 Parents: d6adbbf Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 8 15:07:52 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 9 14:41:09 2016 -0700 ---------------------------------------------------------------------- .../testing/TestDataflowPipelineRunner.java | 3 +- .../org/apache/beam/sdk/testing/PAssert.java | 737 +++++++++---------- .../apache/beam/sdk/testing/PAssertTest.java | 27 - 3 files changed, 362 insertions(+), 405 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java index 3e8d903..c940e9a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java +++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java @@ -166,7 +166,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ public <OutputT extends POutput, InputT extends PInput> OutputT apply( PTransform<InputT, OutputT> transform, InputT input) { if (transform instanceof PAssert.OneSideInputAssert - || transform instanceof PAssert.TwoSideInputAssert) { + || transform instanceof PAssert.GroupThenAssert + || transform instanceof PAssert.GroupThenAssertForSingleton) { expectedNumberOfAssertions += 1; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index c2cd598..b10c1cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -34,11 +34,14 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; @@ -48,32 +51,27 @@ import org.apache.beam.sdk.values.PCollection; import 
org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; -import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; /** - * An assertion on the contents of a {@link PCollection} - * incorporated into the pipeline. Such an assertion - * can be checked no matter what kind of {@link PipelineRunner} is - * used. + * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an + * assertion can be checked no matter what kind of {@link PipelineRunner} is used. * - * <p>Note that the {@code PAssert} call must precede the call - * to {@link Pipeline#run}. + * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}. * - * <p>Examples of use: - * <pre>{@code + * <p>Examples of use: <pre>{@code * Pipeline p = TestPipeline.create(); * ... * PCollection<String> output = @@ -107,30 +105,84 @@ public class PAssert { private PAssert() {} /** - * Constructs an {@link IterableAssert} for the elements of the provided - * {@link PCollection}. + * Builder interface for assertions applicable to iterables and PCollection contents. + */ + public interface IterableAssert<T> { + + /** + * Asserts that the iterable in question contains the provided elements. + * + * @return the same {@link IterableAssert} builder for further assertions + */ + IterableAssert<T> containsInAnyOrder(T... expectedElements); + + /** + * Asserts that the iterable in question contains the provided elements. 
+ * + * @return the same {@link IterableAssert} builder for further assertions + */ + IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements); + + /** + * Asserts that the iterable in question is empty. + * + * @return the same {@link IterableAssert} builder for further assertions + */ + IterableAssert<T> empty(); + + /** + * Applies the provided checking function (presumably containing assertions) to the + * iterable in question. + * + * @return the same {@link IterableAssert} builder for further assertions + */ + IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn); + } + + /** + * Builder interface for assertions applicable to a single value. + */ + public interface SingletonAssert<T> { + /** + * Asserts that the value in question is equal to the provided value, according to + * {@link Object#equals}. + * + * @return the same {@link SingletonAssert} builder for further assertions + */ + SingletonAssert<T> isEqualTo(T expected); + + /** + * Asserts that the value in question is not equal to the provided value, according + * to {@link Object#equals}. + * + * @return the same {@link SingletonAssert} builder for further assertions + */ + SingletonAssert<T> notEqualTo(T notExpected); + + /** + * Applies the provided checking function (presumably containing assertions) to the + * value in question. + * + * @return the same {@link SingletonAssert} builder for further assertions + */ + SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn); + } + + /** + * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}. 
*/ public static <T> IterableAssert<T> that(PCollection<T> actual) { - return new IterableAssert<>( - new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()), - actual.getPipeline()) - .setCoder(actual.getCoder()); + return new PCollectionContentsAssert<>(actual); } /** - * Constructs an {@link IterableAssert} for the value of the provided - * {@link PCollection} which must contain a single {@code Iterable<T>} - * value. + * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which + * must contain a single {@code Iterable<T>} value. */ - public static <T> IterableAssert<T> - thatSingletonIterable(PCollection<? extends Iterable<T>> actual) { + public static <T> IterableAssert<T> thatSingletonIterable( + PCollection<? extends Iterable<T>> actual) { - List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments(); - Coder<T> tCoder; try { - @SuppressWarnings("unchecked") - Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder); - tCoder = tCoderTmp; } catch (NoSuchElementException | IllegalArgumentException exc) { throw new IllegalArgumentException( "PAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>" @@ -141,19 +193,7 @@ public class PAssert { @SuppressWarnings("unchecked") // Safe covariant cast PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual; - return new IterableAssert<>( - new CreateActual<Iterable<T>, Iterable<T>>( - actualIterables, View.<Iterable<T>>asSingleton()), - actual.getPipeline()) - .setCoder(tCoder); - } - - /** - * Constructs an {@link IterableAssert} for the value of the provided - * {@code PCollectionView PCollectionView<Iterable<T>>}. 
- */ - public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) { - return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline()); + return new PCollectionSingletonIterableAssert<>(actualIterables); } /** @@ -161,93 +201,95 @@ public class PAssert { * {@code PCollection PCollection<T>}, which must be a singleton. */ public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) { - return new SingletonAssert<>( - new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline()) - .setCoder(actual.getCoder()); + return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder()); } /** * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}. * - * <p>Note that the actual value must be coded by a {@link KvCoder}, - * not just any {@code Coder<K, V>}. + * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any + * {@code Coder<K, V>}. */ - public static <K, V> SingletonAssert<Map<K, Iterable<V>>> - thatMultimap(PCollection<KV<K, V>> actual) { + public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap( + PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); - - return new SingletonAssert<>( - new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline()) - .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); + return new PCollectionViewAssert<>( + actual, + View.<K, V>asMultimap(), + MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); } /** - * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, - * which must have at most one value per key. + * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which + * must have at most one value per key. 
* - * <p>Note that the actual value must be coded by a {@link KvCoder}, - * not just any {@code Coder<K, V>}. + * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any + * {@code Coder<K, V>}. */ public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); - - return new SingletonAssert<>( - new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline()) - .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); + return new PCollectionViewAssert<>( + actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); } //////////////////////////////////////////////////////////// /** - * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}. + * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require + * the runner to support side inputs. */ - public static class IterableAssert<T> implements Serializable { - private final Pipeline pipeline; - private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual; - private Optional<Coder<T>> coder; + private static class PCollectionContentsAssert<T> implements IterableAssert<T> { + private final PCollection<T> actual; - protected IterableAssert( - PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) { - this.createActual = createActual; - this.pipeline = pipeline; - this.coder = Optional.absent(); + public PCollectionContentsAssert(PCollection<T> actual) { + this.actual = actual; } /** - * Sets the coder to use for elements of type {@code T}, as needed for internal purposes. + * Checks that the {@code Iterable} contains the expected elements, in any order. * * <p>Returns this {@code IterableAssert}. 
*/ - public IterableAssert<T> setCoder(Coder<T> coderOrNull) { - this.coder = Optional.fromNullable(coderOrNull); - return this; + @Override + @SafeVarargs + public final PCollectionContentsAssert<T> containsInAnyOrder(T... expectedElements) { + return containsInAnyOrder(Arrays.asList(expectedElements)); } /** - * Gets the coder, which may yet be absent. + * Checks that the {@code Iterable} contains the expected elements, in any order. + * + * <p>Returns this {@code IterableAssert}. */ - public Coder<T> getCoder() { - if (coder.isPresent()) { - return coder.get(); - } else { - throw new IllegalStateException( - "Attempting to access the coder of an IterableAssert" - + " that has not been set yet."); - } + @Override + public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) { + return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements); + } + + @Override + public PCollectionContentsAssert<T> empty() { + return containsInAnyOrder(Collections.<T>emptyList()); + } + + @Override + public PCollectionContentsAssert<T> satisfies( + SerializableFunction<Iterable<T>, Void> checkerFn) { + actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); + return this; } /** - * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}. + * Checks that the {@code Iterable} contains elements that match the provided matchers, in any + * order. * * <p>Returns this {@code IterableAssert}. */ - public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) { - pipeline.apply( - "PAssert$" + (assertCount++), - new OneSideInputAssert<Iterable<T>>(createActual, checkerFn)); - return this; + @SafeVarargs + final PCollectionContentsAssert<T> containsInAnyOrder( + SerializableMatcher<? super T>... 
elementMatchers) { + return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers)); } /** @@ -255,17 +297,11 @@ public class PAssert { * * <p>Returns this {@code IterableAssert}. */ - public IterableAssert<T> satisfies( - AssertRelation<Iterable<T>, Iterable<T>> relation, - final Iterable<T> expectedElements) { - pipeline.apply( - "PAssert$" + (assertCount++), - new TwoSideInputAssert<Iterable<T>, Iterable<T>>( - createActual, - new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()), - relation)); - - return this; + private PCollectionContentsAssert<T> satisfies( + AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) { + return satisfies( + new CheckRelationAgainstExpected<Iterable<T>>( + relation, expectedElements, IterableCoder.of(actual.getCoder()))); } /** @@ -273,15 +309,14 @@ public class PAssert { * * <p>Returns this {@code IterableAssert}. */ - IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) { + PCollectionContentsAssert<T> satisfies( + final SerializableMatcher<Iterable<? extends T>> matcher) { // Safe covariant cast. Could be elided by changing a lot of this file to use // more flexible bounds. @SuppressWarnings({"rawtypes", "unchecked"}) SerializableFunction<Iterable<T>, Void> checkerFn = - (SerializableFunction) new MatcherCheckerFn<>(matcher); - pipeline.apply( - "PAssert$" + (assertCount++), - new OneSideInputAssert<Iterable<T>>(createActual, checkerFn)); + (SerializableFunction) new MatcherCheckerFn<>(matcher); + actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); return this; } @@ -300,19 +335,9 @@ public class PAssert { } /** - * Checks that the {@code Iterable} is empty. - * - * <p>Returns this {@code IterableAssert}. 
- */ - public IterableAssert<T> empty() { - return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList()); - } - - /** * @throws UnsupportedOperationException always - * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. - * If you meant to test object equality, use a variant of {@link #containsInAnyOrder} - * instead. + * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant + * to test object equality, use a variant of {@link #containsInAnyOrder} instead. */ @Deprecated @Override @@ -331,169 +356,129 @@ public class PAssert { throw new UnsupportedOperationException( String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName())); } + } - /** - * Checks that the {@code Iterable} contains the expected elements, in any - * order. - * - * <p>Returns this {@code IterableAssert}. - */ - public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) { - return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements); - } + /** + * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}. + * This does not require the runner to support side inputs. + */ + private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> { + private final PCollection<Iterable<T>> actual; + private final Coder<T> elementCoder; - /** - * Checks that the {@code Iterable} contains the expected elements, in any - * order. - * - * <p>Returns this {@code IterableAssert}. - */ - @SafeVarargs - public final IterableAssert<T> containsInAnyOrder(T... 
expectedElements) { - return satisfies( - new AssertContainsInAnyOrderRelation<T>(), - Arrays.asList(expectedElements)); + public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) { + this.actual = actual; + + @SuppressWarnings("unchecked") + Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0); + this.elementCoder = typedCoder; } - /** - * Checks that the {@code Iterable} contains elements that match the provided matchers, - * in any order. - * - * <p>Returns this {@code IterableAssert}. - */ + @Override @SafeVarargs - final IterableAssert<T> containsInAnyOrder( - SerializableMatcher<? super T>... elementMatchers) { - return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers)); + public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... expectedElements) { + return containsInAnyOrder(Arrays.asList(expectedElements)); } - } - /** - * An assertion about the single value of type {@code T} - * associated with a {@link PCollectionView}. - */ - public static class SingletonAssert<T> implements Serializable { - private final Pipeline pipeline; - private final CreateActual<?, T> createActual; - private Optional<Coder<T>> coder; - - protected SingletonAssert( - CreateActual<?, T> createActual, Pipeline pipeline) { - this.pipeline = pipeline; - this.createActual = createActual; - this.coder = Optional.absent(); + @Override + public PCollectionSingletonIterableAssert<T> empty() { + return containsInAnyOrder(Collections.<T>emptyList()); } - /** - * Always throws an {@link UnsupportedOperationException}: users are probably looking for - * {@link #isEqualTo}. - */ - @Deprecated @Override - public boolean equals(Object o) { - throw new UnsupportedOperationException( - String.format( - "tests for Java equality of the %s object, not the PCollection in question. 
" - + "Call a test method, such as isEqualTo.", - getClass().getSimpleName())); + public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) { + return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements); } - /** - * @throws UnsupportedOperationException always. - * @deprecated {@link Object#hashCode()} is not supported on PAssert objects. - */ - @Deprecated @Override - public int hashCode() { - throw new UnsupportedOperationException( - String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName())); + public PCollectionSingletonIterableAssert<T> satisfies( + SerializableFunction<Iterable<T>, Void> checkerFn) { + actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn)); + return this; } - /** - * Sets the coder to use for elements of type {@code T}, as needed - * for internal purposes. - * - * <p>Returns this {@code IterableAssert}. - */ - public SingletonAssert<T> setCoder(Coder<T> coderOrNull) { - this.coder = Optional.fromNullable(coderOrNull); - return this; + private PCollectionSingletonIterableAssert<T> satisfies( + AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) { + return satisfies( + new CheckRelationAgainstExpected<Iterable<T>>( + relation, expectedElements, IterableCoder.of(elementCoder))); } + } - /** - * Gets the coder, which may yet be absent. - */ - public Coder<T> getCoder() { - if (coder.isPresent()) { - return coder.get(); - } else { - throw new IllegalStateException( - "Attempting to access the coder of an IterableAssert that has not been set yet."); - } + /** + * An assertion about the contents of a {@link PCollection} when it is viewed as a single value + * of type {@code ViewT}. This requires side input support from the runner. 
+ */ + private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> { + private final PCollection<ElemT> actual; + private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view; + private final Coder<ViewT> coder; + + protected PCollectionViewAssert( + PCollection<ElemT> actual, + PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, + Coder<ViewT> coder) { + this.actual = actual; + this.view = view; + this.coder = coder; } - /** - * Applies a {@link SerializableFunction} to check the value of this - * {@code SingletonAssert}'s view. - * - * <p>Returns this {@code SingletonAssert}. - */ - public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) { - pipeline.apply( - "PAssert$" + (assertCount++), new OneSideInputAssert<T>(createActual, checkerFn)); - return this; + @Override + public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) { + return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue); } - /** - * Applies an {@link AssertRelation} to check the provided relation against the - * value of this assert and the provided expected value. - * - * <p>Returns this {@code SingletonAssert}. 
- */ - public SingletonAssert<T> satisfies( - AssertRelation<T, T> relation, - final T expectedValue) { - pipeline.apply( - "PAssert$" + (assertCount++), - new TwoSideInputAssert<T, T>( - createActual, - new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()), - relation)); + @Override + public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) { + return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue); + } + @Override + public PCollectionViewAssert<ElemT, ViewT> satisfies( + SerializableFunction<ViewT, Void> checkerFn) { + actual + .getPipeline() + .apply( + "PAssert$" + (assertCount++), + new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn)); return this; } /** - * Checks that the value of this {@code SingletonAssert}'s view is equal - * to the expected value. + * Applies an {@link AssertRelation} to check the provided relation against the value of this + * assert and the provided expected value. * * <p>Returns this {@code SingletonAssert}. */ - public SingletonAssert<T> isEqualTo(T expectedValue) { - return satisfies(new AssertIsEqualToRelation<T>(), expectedValue); + private PCollectionViewAssert<ElemT, ViewT> satisfies( + AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) { + return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, expectedValue, coder)); } /** - * Checks that the value of this {@code SingletonAssert}'s view is not equal - * to the expected value. - * - * <p>Returns this {@code SingletonAssert}. + * Always throws an {@link UnsupportedOperationException}: users are probably looking for + * {@link #isEqualTo}. */ - public SingletonAssert<T> notEqualTo(T expectedValue) { - return satisfies(new AssertNotEqualToRelation<T>(), expectedValue); + @Deprecated + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException( + String.format( + "tests for Java equality of the %s object, not the PCollection in question. 
" + + "Call a test method, such as isEqualTo.", + getClass().getSimpleName())); } /** - * Checks that the value of this {@code SingletonAssert}'s view is equal to - * the expected value. - * - * @deprecated replaced by {@link #isEqualTo} + * @throws UnsupportedOperationException always. + * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects. */ @Deprecated - public SingletonAssert<T> is(T expectedValue) { - return isEqualTo(expectedValue); + @Override + public int hashCode() { + throw new UnsupportedOperationException( + String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName())); } - } //////////////////////////////////////////////////////////////////////// @@ -504,8 +489,13 @@ public class PAssert { private final transient PCollection<T> actual; private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView; - private CreateActual(PCollection<T> actual, - PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { + public static <T, ActualT> CreateActual<T, ActualT> from( + PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { + return new CreateActual<>(actual, actualView); + } + + private CreateActual( + PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { this.actual = actual; this.actualView = actualView; } @@ -515,73 +505,145 @@ public class PAssert { final Coder<T> coder = actual.getCoder(); return actual .apply(Window.<T>into(new GlobalWindows())) - .apply(ParDo.of(new DoFn<T, T>() { - @Override - public void processElement(ProcessContext context) throws CoderException { - context.output(CoderUtils.clone(coder, context.element())); - } - })) + .apply( + ParDo.of( + new DoFn<T, T>() { + @Override + public void processElement(ProcessContext context) throws CoderException { + context.output(CoderUtils.clone(coder, context.element())); + } + })) .apply(actualView); } } - private static class 
CreateExpected<T, ExpectedT> - extends PTransform<PBegin, PCollectionView<ExpectedT>> { - - private final Iterable<T> elements; - private final Optional<Coder<T>> coder; - private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view; + /** + * A partially applied {@link AssertRelation}, where one value is provided along with a coder to + * serialize/deserialize them. + */ + private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> { + private final AssertRelation<T, T> relation; + private final byte[] encodedExpected; + private final Coder<T> coder; - private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder, - PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) { - this.elements = elements; + public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) { + this.relation = relation; this.coder = coder; - this.view = view; + + try { + this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected); + } catch (IOException coderException) { + throw new RuntimeException(coderException); + } } @Override - public PCollectionView<ExpectedT> apply(PBegin input) { - Create.Values<T> createTransform = Create.<T>of(elements); - if (coder.isPresent()) { - createTransform = createTransform.withCoder(coder.get()); + public Void apply(T actual) { + try { + T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected); + return relation.assertFor(expected).apply(actual); + } catch (IOException coderException) { + throw new RuntimeException(coderException); } - return input.apply(createTransform).apply(view); } } - private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> { + /** + * A transform that gathers the contents of a {@link PCollection} into a single main input + * iterable in the global window. This requires a runner to support {@link GroupByKey} in the + * global window, but not side inputs or other windowing or triggers. 
+ */ + private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>> + implements Serializable { - private final PCollectionView<T> view; + public GroupGlobally() {} - private PreExisting(PCollectionView<T> view) { - this.view = view; + @Override + public PCollection<Iterable<T>> apply(PCollection<T> input) { + return input + .apply("GloballyWindow", Window.<T>into(new GlobalWindows())) + .apply("DummyKey", WithKeys.<Integer, T>of(0)) + .apply("GroupByKey", GroupByKey.<Integer, T>create()) + .apply("GetOnlyValue", Values.<Iterable<T>>create()); + } + } + + /** + * A transform that applies an assertion-checking function over iterables of {@code ActualT} to + * the entirety of the contents of its input. + */ + public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone> + implements Serializable { + private final SerializableFunction<Iterable<T>, Void> checkerFn; + + private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) { + this.checkerFn = checkerFn; } @Override - public PCollectionView<T> apply(PBegin input) { - return view; + public PDone apply(PCollection<T> input) { + input + .apply("GroupGlobally", new GroupGlobally<T>()) + .apply( + "RunChecks", + ParDo.of( + new DoFn<Iterable<T>, Void>() { + @Override + public void processElement(ProcessContext context) { + checkerFn.apply(context.element()); + } + })); + + return PDone.in(input.getPipeline()); } } /** - * An assertion checker that takes a single - * {@link PCollectionView PCollectionView<ActualT>} - * and an assertion over {@code ActualT}, and checks it within a dataflow - * pipeline. + * A transform that applies an assertion-checking function to a single iterable contained as the + * sole element of a {@link PCollection}. 
+ */ + public static class GroupThenAssertForSingleton<T> + extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable { + private final SerializableFunction<Iterable<T>, Void> checkerFn; + + private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) { + this.checkerFn = checkerFn; + } + + @Override + public PDone apply(PCollection<Iterable<T>> input) { + input + .apply("GroupGlobally", new GroupGlobally<Iterable<T>>()) + .apply( + "RunChecks", + ParDo.of( + new DoFn<Iterable<Iterable<T>>, Void>() { + @Override + public void processElement(ProcessContext context) { + checkerFn.apply(Iterables.getOnlyElement(context.element())); + } + })); + + return PDone.in(input.getPipeline()); + } + } + + /** + * An assertion checker that takes a single {@link PCollectionView + * PCollectionView<ActualT>} and an assertion over {@code ActualT}, and checks it within a + * Beam pipeline. * - * <p>Note that the entire assertion must be serializable. If - * you need to make assertions involving multiple inputs - * that are each not serializable, use TwoSideInputAssert. + * <p>Note that the entire assertion must be serializable. * - * <p>This is generally useful for assertion functions that - * are serializable but whose underlying data may not have a coder. + * <p>This is generally useful for assertion functions that are serializable but whose underlying + * data may not have a coder. 
*/ - public static class OneSideInputAssert<ActualT> - extends PTransform<PBegin, PDone> implements Serializable { + public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone> + implements Serializable { private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual; private final SerializableFunction<ActualT, Void> checkerFn; - public OneSideInputAssert( + private OneSideInputAssert( PTransform<PBegin, PCollectionView<ActualT>> createActual, SerializableFunction<ActualT, Void> checkerFn) { this.createActual = createActual; @@ -594,16 +656,18 @@ public class PAssert { input .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply(ParDo.named("RunChecks").withSideInputs(actual) - .of(new CheckerDoFn<>(checkerFn, actual))); + .apply( + ParDo.named("RunChecks") + .withSideInputs(actual) + .of(new CheckerDoFn<>(checkerFn, actual))); return PDone.in(input.getPipeline()); } } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of - * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a + * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. * * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support * null values. 
@@ -617,8 +681,7 @@ public class PAssert { private final PCollectionView<ActualT> actual; private CheckerDoFn( - SerializableFunction<ActualT, Void> checkerFn, - PCollectionView<ActualT> actual) { + SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) { this.checkerFn = checkerFn; this.actual = actual; } @@ -640,88 +703,11 @@ public class PAssert { } } - /** - * An assertion checker that takes a {@link PCollectionView PCollectionView<ActualT>}, - * a {@link PCollectionView PCollectionView<ExpectedT>}, a relation - * over {@code A} and {@code B}, and checks that the relation holds - * within a dataflow pipeline. - * - * <p>This is useful when either/both of {@code A} and {@code B} - * are not serializable, but have coders (provided - * by the underlying {@link PCollection}s). - */ - public static class TwoSideInputAssert<ActualT, ExpectedT> - extends PTransform<PBegin, PDone> implements Serializable { - - private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual; - private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected; - private final AssertRelation<ActualT, ExpectedT> relation; - - protected TwoSideInputAssert( - PTransform<PBegin, PCollectionView<ActualT>> createActual, - PTransform<PBegin, PCollectionView<ExpectedT>> createExpected, - AssertRelation<ActualT, ExpectedT> relation) { - this.createActual = createActual; - this.createExpected = createExpected; - this.relation = relation; - } - - @Override - public PDone apply(PBegin input) { - final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual); - final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected); - - input - .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply("RunChecks", ParDo.withSideInputs(actual, expected) - .of(new CheckerDoFn<>(relation, actual, expected))); - - return PDone.in(input.getPipeline()); - } - - /** - * Input is ignored, but is {@link Integer} 
for runners that do not support null values. - */ - private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> { - private final Aggregator<Integer, Integer> success = - createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); - private final Aggregator<Integer, Integer> failure = - createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn()); - private final AssertRelation<ActualT, ExpectedT> relation; - private final PCollectionView<ActualT> actual; - private final PCollectionView<ExpectedT> expected; - - private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation, - PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) { - this.relation = relation; - this.actual = actual; - this.expected = expected; - } - - @Override - public void processElement(ProcessContext c) { - try { - ActualT actualContents = c.sideInput(actual); - ExpectedT expectedContents = c.sideInput(expected); - relation.assertFor(expectedContents).apply(actualContents); - success.addValue(1); - } catch (Throwable t) { - LOG.error("PAssert failed expectations.", t); - failure.addValue(1); - // TODO: allow for metrics to propagate on failure when running a streaming pipeline - if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { - throw t; - } - } - } - } - } - ///////////////////////////////////////////////////////////////////////////// /** - * A {@link SerializableFunction} that verifies that an actual value is equal to an - * expected value. + * A {@link SerializableFunction} that verifies that an actual value is equal to an expected + * value. */ private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> { private T expected; @@ -738,8 +724,8 @@ public class PAssert { } /** - * A {@link SerializableFunction} that verifies that an actual value is not equal to an - * expected value. + * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected + * value. 
*/ private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> { private T expected; @@ -756,8 +742,8 @@ public class PAssert { } /** - * A {@link SerializableFunction} that verifies that an {@code Iterable} contains - * expected items in any order. + * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items + * in any order. */ private static class AssertContainsInAnyOrder<T> implements SerializableFunction<Iterable<T>, Void> { @@ -787,10 +773,9 @@ public class PAssert { //////////////////////////////////////////////////////////// /** - * A binary predicate between types {@code Actual} and {@code Expected}. - * Implemented as a method {@code assertFor(Expected)} which returns - * a {@code SerializableFunction<Actual, Void>} - * that should verify the assertion.. + * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method + * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that + * should verify the assertion.. */ private static interface AssertRelation<ActualT, ExpectedT> extends Serializable { public SerializableFunction<ActualT, Void> assertFor(ExpectedT input); @@ -799,8 +784,7 @@ public class PAssert { /** * An {@link AssertRelation} implementing the binary predicate that two objects are equal. */ - private static class AssertIsEqualToRelation<T> - implements AssertRelation<T, T> { + private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> { @Override public SerializableFunction<T, Void> assertFor(T expected) { return new AssertIsEqualTo<T>(expected); @@ -810,8 +794,7 @@ public class PAssert { /** * An {@link AssertRelation} implementing the binary predicate that two objects are not equal. 
*/ - private static class AssertNotEqualToRelation<T> - implements AssertRelation<T, T> { + private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> { @Override public SerializableFunction<T, Void> assertFor(T expected) { return new AssertNotEqualTo<T>(expected); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index f540948..fdc8719 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -17,9 +17,6 @@ */ package org.apache.beam.sdk.testing; -import static org.apache.beam.sdk.testing.SerializableMatchers.anything; -import static org.apache.beam.sdk.testing.SerializableMatchers.not; - import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -151,30 +148,6 @@ public class PAssertTest implements Serializable { } /** - * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}. - */ - @Test - @Category(RunnableOnService.class) - public void testBasicMatcherSuccess() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); - PAssert.that(pcollection).containsInAnyOrder(anything()); - pipeline.run(); - } - - /** - * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}. 
- */ - @Test - @Category(RunnableOnService.class) - public void testBasicMatcherFailure() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); - PAssert.that(pcollection).containsInAnyOrder(not(anything())); - runExpectingAssertionFailure(pipeline); - } - - /** * Test that we throw an error at pipeline construction time when the user mistakenly uses * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}. */
