Repository: beam Updated Branches: refs/heads/master 51820cbe0 -> 07020c961
Revert "Captures assertion site and message in PAssert" This reverts commit c62611c73ab0f9a5769f3ee9b28b11e917628f78. It breaks post-commit Dataflow and Flink runners. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47592f66 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47592f66 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47592f66 Branch: refs/heads/master Commit: 47592f66222e0b8a82d4c94d14cfba38044658f4 Parents: 51820cb Author: Eugene Kirpichov <[email protected]> Authored: Wed Jan 11 12:55:09 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Jan 11 12:55:09 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/testing/PAssert.java | 163 ++++--------------- .../apache/beam/sdk/testing/PAssertTest.java | 44 ----- 2 files changed, 33 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/47592f66/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 89d6fea..b57f4a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -275,13 +275,8 @@ public class PAssert { /** * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}. */ - public static <T> IterableAssert<T> that(String message, PCollection<T> actual) { - return new PCollectionContentsAssert<>(PAssertionSite.capture(message), actual); - } - - /** @see #that(String, PCollection) */ public static <T> IterableAssert<T> that(PCollection<T> actual) { - return that("", actual); + return new PCollectionContentsAssert<>(actual); } /** @@ -289,7 +284,7 @@ public class PAssert { * must contain a single {@code Iterable<T>} value. */ public static <T> IterableAssert<T> thatSingletonIterable( - String message, PCollection<? extends Iterable<T>> actual) { + PCollection<? extends Iterable<T>> actual) { try { } catch (NoSuchElementException | IllegalArgumentException exc) { @@ -302,29 +297,15 @@ public class PAssert { @SuppressWarnings("unchecked") // Safe covariant cast PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual; - return new PCollectionSingletonIterableAssert<>( - PAssertionSite.capture(message), actualIterables); - } - - /** @see #thatSingletonIterable(String, PCollection) */ - public static <T> IterableAssert<T> thatSingletonIterable( - PCollection<? extends Iterable<T>> actual) { - return thatSingletonIterable("", actual); + return new PCollectionSingletonIterableAssert<>(actualIterables); } - /** - * Constructs a {@link SingletonAssert} for the value of the provided - * {@code PCollection PCollection<T>}, which must be a singleton. - */ - public static <T> SingletonAssert<T> thatSingleton(String message, PCollection<T> actual) { - return new PCollectionViewAssert<>( - PAssertionSite.capture(message), - actual, View.<T>asSingleton(), actual.getCoder()); - } - - /** @see #thatSingleton(String, PCollection) */ + /** + * Constructs a {@link SingletonAssert} for the value of the provided + * {@code PCollection PCollection<T>}, which must be a singleton. + */ public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) { - return thatSingleton("", actual); + return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder()); } /** @@ -334,90 +315,48 @@ public class PAssert { * {@code Coder<K, V>}. */ public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap( - String message, PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); return new PCollectionViewAssert<>( - PAssertionSite.capture(message), actual, View.<K, V>asMultimap(), MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); } - /** @see #thatMultimap(String, PCollection) */ - public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap( - PCollection<KV<K, V>> actual) { - return thatMultimap("", actual); - } - /** * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which * must have at most one value per key. * - * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any {@code Coder<K, - * V>}. + * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any + * {@code Coder<K, V>}. */ - public static <K, V> SingletonAssert<Map<K, V>> thatMap( - String message, PCollection<KV<K, V>> actual) { + public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); return new PCollectionViewAssert<>( - PAssertionSite.capture(message), - actual, - View.<K, V>asMap(), - MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); - } - - /** @see #thatMap(String, PCollection) */ - public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) { - return thatMap("", actual); + actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); } //////////////////////////////////////////////////////////// - private static class PAssertionSite implements Serializable { - private final String message; - private final StackTraceElement[] creationStackTrace; - - static PAssertionSite capture(String message) { - return new PAssertionSite(message, new Throwable().getStackTrace()); - } - - PAssertionSite(String message, StackTraceElement[] creationStackTrace) { - this.message = message; - this.creationStackTrace = creationStackTrace; - } - - public AssertionError wrap(Throwable t) { - AssertionError res = - new AssertionError( - message.isEmpty() ? t.getMessage() : (message + ": " + t.getMessage()), t); - res.setStackTrace(creationStackTrace); - return res; - } - } - /** * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require * the runner to support side inputs. */ private static class PCollectionContentsAssert<T> implements IterableAssert<T> { - private final PAssertionSite site; private final PCollection<T> actual; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor; - public PCollectionContentsAssert(PAssertionSite site, PCollection<T> actual) { - this(site, actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes()); + public PCollectionContentsAssert(PCollection<T> actual) { + this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes()); } public PCollectionContentsAssert( - PAssertionSite site, PCollection<T> actual, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { - this.site = site; this.actual = actual; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -455,7 +394,7 @@ public class PAssert { Coder<BoundedWindow> windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); return new PCollectionContentsAssert<>( - site, actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor); + actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor); } /** @@ -490,7 +429,7 @@ public class PAssert { SerializableFunction<Iterable<T>, Void> checkerFn) { actual.apply( nextAssertionName(), - new GroupThenAssert<>(site, checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor)); return this; } @@ -532,7 +471,7 @@ public class PAssert { (SerializableFunction) new MatcherCheckerFn<>(matcher); actual.apply( "PAssert$" + (assertCount++), - new GroupThenAssert<>(site, checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor)); return this; } @@ -579,26 +518,21 @@ public class PAssert { * This does not require the runner to support side inputs. */ private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> { - private final PAssertionSite site; private final PCollection<Iterable<T>> actual; private final Coder<T> elementCoder; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor; - public PCollectionSingletonIterableAssert( - PAssertionSite site, PCollection<Iterable<T>> actual) { - this( - site, actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane()); + public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) { + this(actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane()); } public PCollectionSingletonIterableAssert( - PAssertionSite site, PCollection<Iterable<T>> actual, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { - this.site = site; this.actual = actual; @SuppressWarnings("unchecked") @@ -642,7 +576,7 @@ public class PAssert { Coder<BoundedWindow> windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); return new PCollectionSingletonIterableAssert<>( - site, actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor); + actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor); } @Override @@ -666,7 +600,7 @@ public class PAssert { SerializableFunction<Iterable<T>, Void> checkerFn) { actual.apply( "PAssert$" + (assertCount++), - new GroupThenAssertForSingleton<>(site, checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor)); return this; } @@ -683,7 +617,6 @@ public class PAssert { * of type {@code ViewT}. This requires side input support from the runner. */ private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> { - private final PAssertionSite site; private final PCollection<ElemT> actual; private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view; private final AssertionWindows rewindowActuals; @@ -692,27 +625,18 @@ public class PAssert { private final Coder<ViewT> coder; protected PCollectionViewAssert( - PAssertionSite site, PCollection<ElemT> actual, PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, Coder<ViewT> coder) { - this( - site, - actual, - view, - IntoGlobalWindow.<ElemT>of(), - PaneExtractors.<ElemT>onlyPane(), - coder); + this(actual, view, IntoGlobalWindow.<ElemT>of(), PaneExtractors.<ElemT>onlyPane(), coder); } private PCollectionViewAssert( - PAssertionSite site, PCollection<ElemT> actual, PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, AssertionWindows rewindowActuals, SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor, Coder<ViewT> coder) { - this.site = site; this.actual = actual; this.view = view; this.rewindowActuals = rewindowActuals; @@ -739,7 +663,6 @@ public class PAssert { BoundedWindow window, SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor) { return new PCollectionViewAssert<>( - site, actual, view, IntoStaticWindows.of( @@ -766,7 +689,6 @@ public class PAssert { .apply( "PAssert$" + (assertCount++), new OneSideInputAssert<ViewT>( - site, CreateActual.from(actual, rewindowActuals, paneExtractor, view), rewindowActuals.<Integer>windowDummy(), checkerFn)); @@ -989,17 +911,14 @@ public class PAssert { */ public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone> implements Serializable { - private final PAssertionSite site; private final SerializableFunction<Iterable<T>, Void> checkerFn; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor; private GroupThenAssert( - PAssertionSite site, SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { - this.site = site; this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -1011,7 +930,7 @@ public class PAssert { .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy)) .apply("GetPane", MapElements.via(paneExtractor)) .setCoder(IterableCoder.of(input.getCoder())) - .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(site, checkerFn))); + .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn))); return PDone.in(input.getPipeline()); } @@ -1023,19 +942,16 @@ public class PAssert { */ public static class GroupThenAssertForSingleton<T> extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable { - private final PAssertionSite site; private final SerializableFunction<Iterable<T>, Void> checkerFn; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor; private GroupThenAssertForSingleton( - PAssertionSite site, SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { - this.site = site; this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -1047,7 +963,7 @@ public class PAssert { .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy)) .apply("GetPane", MapElements.via(paneExtractor)) .setCoder(IterableCoder.of(input.getCoder())) - .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(site, checkerFn))); + .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn))); return PDone.in(input.getPipeline()); } @@ -1065,17 +981,14 @@ public class PAssert { */ public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone> implements Serializable { - private final PAssertionSite site; private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual; private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken; private final SerializableFunction<ActualT, Void> checkerFn; private OneSideInputAssert( - PAssertionSite site, PTransform<PBegin, PCollectionView<ActualT>> createActual, PTransform<PCollection<Integer>, PCollection<Integer>> windowToken, SerializableFunction<ActualT, Void> checkerFn) { - this.site = site; this.createActual = createActual; this.windowToken = windowToken; this.checkerFn = checkerFn; @@ -1090,7 +1003,7 @@ public class PAssert { .apply("WindowToken", windowToken) .apply( "RunChecks", - ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(site, checkerFn, actual))); + ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual))); return PDone.in(input.getPipeline()); } @@ -1104,7 +1017,6 @@ public class PAssert { * null values. */ private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> { - private final PAssertionSite site; private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, Sum.ofIntegers()); @@ -1113,10 +1025,7 @@ public class PAssert { private final PCollectionView<ActualT> actual; private SideInputCheckerDoFn( - PAssertionSite site, - SerializableFunction<ActualT, Void> checkerFn, - PCollectionView<ActualT> actual) { - this.site = site; + SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) { this.checkerFn = checkerFn; this.actual = actual; } @@ -1125,7 +1034,7 @@ public class PAssert { public void processElement(ProcessContext c) { try { ActualT actualContents = c.sideInput(actual); - doChecks(site, actualContents, checkerFn, success, failure); + doChecks(actualContents, checkerFn, success, failure); } catch (Throwable t) { // Suppress exception in streaming if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { @@ -1143,22 +1052,19 @@ public class PAssert { * <p>The singleton property is presumed, not enforced. */ private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> { - private final PAssertionSite site; private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, Sum.ofIntegers()); private final Aggregator<Integer, Integer> failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); - private GroupedValuesCheckerDoFn( - PAssertionSite site, SerializableFunction<ActualT, Void> checkerFn) { - this.site = site; + private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) { this.checkerFn = checkerFn; } @ProcessElement public void processElement(ProcessContext c) { - doChecks(site, c.element(), checkerFn, success, failure); + doChecks(c.element(), checkerFn, success, failure); } } @@ -1171,28 +1077,24 @@ public class PAssert { * each input element must be a singleton iterable, or this will fail. */ private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> { - private final PAssertionSite site; private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, Sum.ofIntegers()); private final Aggregator<Integer, Integer> failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); - private SingletonCheckerDoFn( - PAssertionSite site, SerializableFunction<ActualT, Void> checkerFn) { - this.site = site; + private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) { this.checkerFn = checkerFn; } @ProcessElement public void processElement(ProcessContext c) { ActualT actualContents = Iterables.getOnlyElement(c.element()); - doChecks(site, actualContents, checkerFn, success, failure); + doChecks(actualContents, checkerFn, success, failure); } } private static <ActualT> void doChecks( - PAssertionSite site, ActualT actualContents, SerializableFunction<ActualT, Void> checkerFn, Aggregator<Integer, Integer> successAggregator, @@ -1201,8 +1103,9 @@ public class PAssert { checkerFn.apply(actualContents); successAggregator.addValue(1); } catch (Throwable t) { + LOG.error("PAssert failed expectations.", t); failureAggregator.addValue(1); - throw site.wrap(t); + throw t; } } http://git-wip-us.apache.org/repos/asf/beam/blob/47592f66/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index e09f54b..1997bbe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.fasterxml.jackson.annotation.JsonCreator; -import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import java.io.IOException; import java.io.InputStream; @@ -393,49 +392,6 @@ public class PAssertTest implements Serializable { assertThat(thrown.getMessage(), containsString("Expected: iterable over [] in any order")); } - @Test - @Category(RunnableOnService.class) - public void testAssertionSiteIsCapturedWithMessage() throws Exception { - PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); - assertThatCollectionIsEmptyWithMessage(vals); - - Throwable thrown = runExpectingAssertionFailure(pipeline); - - assertThat( - thrown.getMessage(), - containsString("Should be empty")); - assertThat( - thrown.getMessage(), - containsString("Expected: iterable over [] in any order")); - String stacktrace = Throwables.getStackTraceAsString(thrown); - assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithMessage")); - assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithMessage")); - } - - @Test - @Category(RunnableOnService.class) - public void testAssertionSiteIsCapturedWithoutMessage() throws Exception { - PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); - assertThatCollectionIsEmptyWithoutMessage(vals); - - Throwable thrown = runExpectingAssertionFailure(pipeline); - - assertThat( - thrown.getMessage(), - containsString("Expected: iterable over [] in any order")); - String stacktrace = Throwables.getStackTraceAsString(thrown); - assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithoutMessage")); - assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithoutMessage")); - } - - private static void assertThatCollectionIsEmptyWithMessage(PCollection<Long> vals) { - PAssert.that("Should be empty", vals).empty(); - } - - private static void assertThatCollectionIsEmptyWithoutMessage(PCollection<Long> vals) { - PAssert.that(vals).empty(); - } - private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { // We cannot use thrown.expect(AssertionError.class) because the AssertionError // is first caught by JUnit and causes a test failure.
