PAssert improvements.

- Captures the stack trace of a failure by introducing a SerializableThrowable.
  Also fixes the previously incorrect encode/decode test for failures.
- PAssert.thatSingletonIterable, thatMap/Multimap/Singleton no longer
  require that the collection is produced by a trigger that promises
  a single firing. thatSingletonIterable checks that the iterable is a
  singleton by other means. thatMap/Multimap/Singleton don't need this
  requirement at all.
  PaneExtractors.onlyPane() is now used only when the user explicitly
  specifies inOnlyPane().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30e7d15b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30e7d15b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30e7d15b

Branch: refs/heads/master
Commit: 30e7d15b3ab68d0c129dbd2be76e77346f2e1f38
Parents: 7e3f591
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Mon Sep 25 11:59:04 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Wed Sep 27 15:08:38 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 63 +++++++-------------
 .../apache/beam/sdk/testing/PaneExtractors.java | 25 +++++---
 .../beam/sdk/testing/SuccessOrFailure.java      | 41 ++++++++-----
 .../apache/beam/sdk/testing/PAssertTest.java    | 41 +++++++------
 .../beam/sdk/testing/PaneExtractorsTest.java    |  7 +--
 5 files changed, 91 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/30e7d15b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index ed80f2f..d2ad67d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineRunner;
@@ -74,8 +73,6 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an
@@ -105,8 +102,6 @@ import org.slf4j.LoggerFactory;
  * <p>JUnit and Hamcrest must be linked in by any code that uses PAssert.
  */
 public class PAssert {
-
-  private static final Logger LOG = LoggerFactory.getLogger(PAssert.class);
   public static final String SUCCESS_COUNTER = "PAssertSuccess";
   public static final String FAILURE_COUNTER = "PAssertFailure";
   private static final Counter successCounter = Metrics.counter(
@@ -170,10 +165,6 @@ public class PAssert {
       return new PAssertionSite(message, new Throwable().getStackTrace());
     }
 
-    PAssertionSite() {
-      this(null, new StackTraceElement[0]);
-    }
-
     PAssertionSite(String message, StackTraceElement[] creationStackTrace) {
       this.message = message;
       this.creationStackTrace = creationStackTrace;
@@ -381,15 +372,6 @@ public class PAssert {
    */
   public static <T> IterableAssert<T> thatSingletonIterable(
       String reason, PCollection<? extends Iterable<T>> actual) {
-
-    try {
-    } catch (NoSuchElementException | IllegalArgumentException exc) {
-      throw new IllegalArgumentException(
-          "PAssert.<T>thatSingletonIterable requires a 
PCollection<Iterable<T>>"
-              + " with a Coder<Iterable<T>> where getCoderArguments() yields a"
-              + " single Coder<T> to apply to the elements.");
-    }
-
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) 
actual;
 
@@ -581,7 +563,7 @@ public class PAssert {
     @SafeVarargs
     final PCollectionContentsAssert<T> containsInAnyOrder(
         SerializableMatcher<? super T>... elementMatchers) {
-      return 
satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
+      return 
satisfies(SerializableMatchers.containsInAnyOrder(elementMatchers));
     }
 
     /**
@@ -592,7 +574,7 @@ public class PAssert {
     private PCollectionContentsAssert<T> satisfies(
         AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> 
expectedElements) {
       return satisfies(
-          new CheckRelationAgainstExpected<Iterable<T>>(
+          new CheckRelationAgainstExpected<>(
               relation, expectedElements, 
IterableCoder.of(actual.getCoder())));
     }
 
@@ -668,7 +650,10 @@ public class PAssert {
     public PCollectionSingletonIterableAssert(
         PCollection<Iterable<T>> actual, PAssertionSite site) {
       this(
-          actual, IntoGlobalWindow.<Iterable<T>>of(), 
PaneExtractors.<Iterable<T>>onlyPane(), site);
+          actual,
+          IntoGlobalWindow.<Iterable<T>>of(),
+          PaneExtractors.<Iterable<T>>allPanes(),
+          site);
     }
 
     public PCollectionSingletonIterableAssert(
@@ -753,7 +738,7 @@ public class PAssert {
     private PCollectionSingletonIterableAssert<T> satisfies(
         AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> 
expectedElements) {
       return satisfies(
-          new CheckRelationAgainstExpected<Iterable<T>>(
+          new CheckRelationAgainstExpected<>(
               relation, expectedElements, IterableCoder.of(elementCoder)));
     }
   }
@@ -777,8 +762,12 @@ public class PAssert {
         Coder<ViewT> coder,
         PAssertionSite site) {
       this(
-          actual, view, IntoGlobalWindow.<ElemT>of(), 
PaneExtractors.<ElemT>onlyPane(), coder, site
-      );
+          actual,
+          view,
+          IntoGlobalWindow.<ElemT>of(),
+          PaneExtractors.<ElemT>allPanes(),
+          coder,
+          site);
     }
 
     private PCollectionViewAssert(
@@ -798,7 +787,7 @@ public class PAssert {
 
     @Override
     public PCollectionViewAssert<ElemT, ViewT> inOnlyPane(BoundedWindow 
window) {
-      return inPane(window, PaneExtractors.<ElemT>onlyPane());
+      return inPane(window, PaneExtractors.<ElemT>onlyPane(site));
     }
 
     @Override
@@ -841,7 +830,7 @@ public class PAssert {
           .getPipeline()
           .apply(
               "PAssert$" + (assertCount++),
-              new OneSideInputAssert<ViewT>(
+              new OneSideInputAssert<>(
                   CreateActual.from(actual, rewindowActuals, paneExtractor, 
view),
                   rewindowActuals.<Integer>windowDummy(),
                   checkerFn,
@@ -857,7 +846,7 @@ public class PAssert {
      */
     private PCollectionViewAssert<ElemT, ViewT> satisfies(
         AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) {
-      return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, 
expectedValue, coder));
+      return satisfies(new CheckRelationAgainstExpected<>(relation, 
expectedValue, coder));
     }
 
     /**
@@ -1224,11 +1213,7 @@ public class PAssert {
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      try {
-        c.output(doChecks(site, c.element(), checkerFn));
-      } catch (Throwable t) {
-        throw t;
-      }
+      c.output(doChecks(site, c.element(), checkerFn));
     }
   }
 
@@ -1262,13 +1247,11 @@ public class PAssert {
       PAssertionSite site,
       ActualT actualContents,
       SerializableFunction<ActualT, Void> checkerFn) {
-    SuccessOrFailure result = SuccessOrFailure.success();
     try {
       checkerFn.apply(actualContents);
+      return SuccessOrFailure.success();
     } catch (Throwable t) {
-      result = SuccessOrFailure.failure(site, t.getMessage());
-    } finally {
-      return result;
+      return SuccessOrFailure.failure(site, t);
     }
   }
 
@@ -1329,7 +1312,7 @@ public class PAssert {
     }
 
     public AssertContainsInAnyOrder(Iterable<T> expected) {
-      this(Lists.<T>newArrayList(expected));
+      this(Lists.newArrayList(expected));
     }
 
     @Override
@@ -1356,7 +1339,7 @@ public class PAssert {
   private static class AssertIsEqualToRelation<T> implements AssertRelation<T, 
T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
-      return new AssertIsEqualTo<T>(expected);
+      return new AssertIsEqualTo<>(expected);
     }
   }
 
@@ -1366,7 +1349,7 @@ public class PAssert {
   private static class AssertNotEqualToRelation<T> implements 
AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
-      return new AssertNotEqualTo<T>(expected);
+      return new AssertNotEqualTo<>(expected);
     }
   }
 
@@ -1378,7 +1361,7 @@ public class PAssert {
       implements AssertRelation<Iterable<T>, Iterable<T>> {
     @Override
     public SerializableFunction<Iterable<T>, Void> assertFor(Iterable<T> 
expectedElements) {
-      return new AssertContainsInAnyOrder<T>(expectedElements);
+      return new AssertContainsInAnyOrder<>(expectedElements);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/30e7d15b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index f88efcb..8ff35f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -42,8 +40,9 @@ final class PaneExtractors {
   private PaneExtractors() {
   }
 
-  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
onlyPane() {
-    return new ExtractOnlyPane<>();
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
onlyPane(
+      PAssert.PAssertionSite site) {
+    return new ExtractOnlyPane<>(site);
   }
 
   static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
onTimePane() {
@@ -68,15 +67,23 @@ final class PaneExtractors {
 
   private static class ExtractOnlyPane<T>
       extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
+    private final PAssert.PAssertionSite site;
+
+    private ExtractOnlyPane(PAssert.PAssertionSite site) {
+      this.site = site;
+    }
+
     @Override
     public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
       for (ValueInSingleWindow<T> value : input) {
-        checkState(value.getPane().isFirst() && value.getPane().isLast(),
-            "Expected elements to be produced by a trigger that fires at most 
once, but got"
-                + "a value in a pane that is %s. Actual Pane Info: %s",
-            value.getPane().isFirst() ? "not the last pane" : "not the first 
pane",
-            value.getPane());
+        if (!value.getPane().isFirst() || !value.getPane().isLast()) {
+          throw site.wrap(
+              String.format(
+                  "Expected elements to be produced by a trigger that fires at 
most once, but got "
+                      + "a value %s in a pane that is %s.",
+                  value, value.getPane().isFirst() ? "not the last pane" : 
"not the first pane"));
+        }
         outputs.add(value.getValue());
       }
       return outputs;

http://git-wip-us.apache.org/repos/asf/beam/blob/30e7d15b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SuccessOrFailure.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SuccessOrFailure.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SuccessOrFailure.java
index 04e3c35..79e83d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SuccessOrFailure.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SuccessOrFailure.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.testing;
 
 import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.DefaultCoder;
@@ -28,25 +30,36 @@ import org.apache.beam.sdk.coders.SerializableCoder;
  */
 @DefaultCoder(SerializableCoder.class)
 public final class SuccessOrFailure implements Serializable {
-  // TODO Add a SerializableThrowable. instead of relying on 
PAssertionSite.(BEAM-1898)
+  private static final class SerializableThrowable implements Serializable {
+    private final Throwable throwable;
+    private final StackTraceElement[] stackTrace;
+
+    private SerializableThrowable(Throwable t) {
+      this.throwable = t;
+      this.stackTrace = (t == null) ? null : t.getStackTrace();
+    }
+
+    private void readObject(ObjectInputStream is) throws IOException, 
ClassNotFoundException {
+      is.defaultReadObject();
+      if (throwable != null) {
+        throwable.setStackTrace(stackTrace);
+      }
+    }
+  }
 
   private final boolean isSuccess;
   @Nullable
   private final PAssert.PAssertionSite site;
   @Nullable
-  private final String message;
-
-  private SuccessOrFailure() {
-    this(true, null, null);
-  }
+  private final SerializableThrowable throwable;
 
   private SuccessOrFailure(
       boolean isSuccess,
       @Nullable PAssert.PAssertionSite site,
-      @Nullable String message) {
+      @Nullable Throwable throwable) {
     this.isSuccess = isSuccess;
     this.site = site;
-    this.message = message;
+    this.throwable = new SerializableThrowable(throwable);
   }
 
   public boolean isSuccess() {
@@ -55,7 +68,7 @@ public final class SuccessOrFailure implements Serializable {
 
   @Nullable
   public AssertionError assertionError() {
-    return  site == null ? null : site.wrap(message);
+    return site == null ? null : site.wrap(throwable.throwable);
   }
 
   public static SuccessOrFailure success() {
@@ -63,19 +76,15 @@ public final class SuccessOrFailure implements Serializable {
   }
 
   public static SuccessOrFailure failure(@Nullable PAssert.PAssertionSite site,
-      @Nullable String message) {
-    return new SuccessOrFailure(false, site, message);
-  }
-
-  public static SuccessOrFailure failure(@Nullable PAssert.PAssertionSite 
site) {
-    return new SuccessOrFailure(false, site, null);
+      @Nullable Throwable t) {
+    return new SuccessOrFailure(false, site, t);
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
         .add("isSuccess", isSuccess())
-        .addValue(message)
+        .addValue(throwable)
         .omitNullValues()
         .toString();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/30e7d15b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 491f001..2a79060 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -121,27 +121,34 @@ public class PAssertTest implements Serializable {
     }
   }
 
+  private void throwNestedError() {
+    throw new RuntimeException("Nested error");
+  }
+
+  private void throwWrappedError() {
+    try {
+      throwNestedError();
+    } catch (Exception e) {
+      throw new RuntimeException("Wrapped error", e);
+    }
+  }
+
   @Test
-  public void testFailureEncodedDecoded() throws IOException {
-    AssertionError error = null;
+  public void testFailureWithExceptionEncodedDecoded() throws IOException {
+    Throwable error;
     try {
-      assertEquals(0, 1);
-    } catch (AssertionError e) {
+      throwWrappedError();
+      throw new IllegalStateException("Should have failed");
+    } catch (Throwable e) {
       error = e;
     }
-    SuccessOrFailure failure = SuccessOrFailure.failure(
-        new PAssert.PAssertionSite(error.getMessage(), error.getStackTrace()));
-    SerializableCoder<SuccessOrFailure> coder = 
SerializableCoder.of(SuccessOrFailure.class);
-
-    byte[] encoded = CoderUtils.encodeToByteArray(coder, failure);
-    SuccessOrFailure res = CoderUtils.decodeFromByteArray(coder, encoded);
-
-    // Should compare strings, because throwables are not directly comparable.
-    assertEquals("Encode-decode failed SuccessOrFailure",
-        failure.assertionError().toString(), res.assertionError().toString());
-    String resultStacktrace = 
Throwables.getStackTraceAsString(res.assertionError());
-    String failureStacktrace = 
Throwables.getStackTraceAsString(failure.assertionError());
-    assertThat(resultStacktrace, is(failureStacktrace));
+    SuccessOrFailure failure =
+        SuccessOrFailure.failure(PAssert.PAssertionSite.capture("here"), 
error);
+    SuccessOrFailure res = 
CoderUtils.clone(SerializableCoder.of(SuccessOrFailure.class), failure);
+    assertEquals(
+        "Encode-decode failed SuccessOrFailure",
+        Throwables.getStackTraceAsString(failure.assertionError()),
+        Throwables.getStackTraceAsString(res.assertionError()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/30e7d15b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
index 8801bde..1d8390e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
@@ -43,7 +43,7 @@ public class PaneExtractorsTest {
   @Test
   public void onlyPaneNoFiring() {
     SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, 
Iterable<Integer>> extractor =
-        PaneExtractors.onlyPane();
+        PaneExtractors.onlyPane(PAssert.PAssertionSite.capture(""));
     Iterable<ValueInSingleWindow<Integer>> noFiring =
         ImmutableList.of(
             ValueInSingleWindow.of(
@@ -56,7 +56,7 @@ public class PaneExtractorsTest {
   @Test
   public void onlyPaneOnlyOneFiring() {
     SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, 
Iterable<Integer>> extractor =
-        PaneExtractors.onlyPane();
+        PaneExtractors.onlyPane(PAssert.PAssertionSite.capture(""));
     Iterable<ValueInSingleWindow<Integer>> onlyFiring =
         ImmutableList.of(
             ValueInSingleWindow.of(
@@ -70,7 +70,7 @@ public class PaneExtractorsTest {
   @Test
   public void onlyPaneMultiplePanesFails() {
     SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, 
Iterable<Integer>> extractor =
-        PaneExtractors.onlyPane();
+        PaneExtractors.onlyPane(PAssert.PAssertionSite.capture(""));
     Iterable<ValueInSingleWindow<Integer>> multipleFiring =
         ImmutableList.of(
             ValueInSingleWindow.of(
@@ -89,7 +89,6 @@ public class PaneExtractorsTest {
                 GlobalWindow.INSTANCE,
                 PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)));
 
-    thrown.expect(IllegalStateException.class);
     thrown.expectMessage("trigger that fires at most once");
     extractor.apply(multipleFiring);
   }

Reply via email to