[ 
https://issues.apache.org/jira/browse/BEAM-6005?focusedWorklogId=163596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163596
 ]

ASF GitHub Bot logged work on BEAM-6005:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Nov/18 19:00
            Start Date: 07/Nov/18 19:00
    Worklog Time Spent: 10m 
      Work Description: jasonkuster closed pull request #6923: [BEAM-6005] 
PCollectionCustomCoderTest updates to fix test to actually function.
URL: https://github.com/apache/beam/pull/6923
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/PCollectionCustomCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/PCollectionCustomCoderTest.java
index 9cc400b4639..28f6fe8dcdc 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/PCollectionCustomCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/PCollectionCustomCoderTest.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -25,26 +26,37 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Tests for coder exception handling in runners. */
 @RunWith(JUnit4.class)
 public class PCollectionCustomCoderTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PCollectionCustomCoderTest.class);
   /**
    * A custom test coder that can throw various exceptions during:
    *
@@ -58,7 +70,8 @@
   static final String NULL_POINTER_EXCEPTION = 
"java.lang.NullPointerException";
   static final String EXCEPTION_MESSAGE = "Super Unique Message!!!";
 
-  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public final transient ExpectedException thrown = 
ExpectedException.none();
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
 
   /** Wrapper of StringUtf8Coder with customizable exception-throwing. */
   public static class CustomTestCoder extends CustomCoder<String> {
@@ -148,104 +161,179 @@ private void throwIfPresent(String exceptionClassName) 
throws IOException {
   @Test
   @Category(NeedsRunner.class)
   public void testDecodingIOException() throws Exception {
-    thrown.expect(Exception.class);
-    thrown.expectCause(instanceOf(IOException.class));
     Pipeline p =
-        runPipelineWith(new CustomTestCoder(IO_EXCEPTION, null, null, null, 
EXCEPTION_MESSAGE));
+        pipelineWith(new CustomTestCoder(IO_EXCEPTION, null, null, null, 
EXCEPTION_MESSAGE));
 
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique 
Message!!!"));
     p.run().waitUntilFinish();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testDecodingNPException() throws Exception {
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("java.lang.NullPointerException: Super Unique 
Message!!!");
-
     Pipeline p =
-        runPipelineWith(
+        pipelineWith(
             new CustomTestCoder(NULL_POINTER_EXCEPTION, null, null, null, 
EXCEPTION_MESSAGE));
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super 
Unique Message!!!"));
+
+    p.run().waitUntilFinish();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testEncodingIOException() throws Exception {
+    Pipeline p =
+        pipelineWith(new CustomTestCoder(null, IO_EXCEPTION, null, null, 
EXCEPTION_MESSAGE));
     thrown.expect(Exception.class);
-    thrown.expectCause(instanceOf(IOException.class));
+    thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique 
Message!!!"));
 
-    Pipeline p =
-        runPipelineWith(new CustomTestCoder(null, IO_EXCEPTION, null, null, 
EXCEPTION_MESSAGE));
+    p.run().waitUntilFinish();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testEncodingNPException() throws Exception {
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("java.lang.NullPointerException: Super Unique 
Message!!!");
     Pipeline p =
-        runPipelineWith(
+        pipelineWith(
             new CustomTestCoder(null, NULL_POINTER_EXCEPTION, null, null, 
EXCEPTION_MESSAGE));
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super 
Unique Message!!!"));
+    p.run().waitUntilFinish();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testSerializationIOException() throws Exception {
-    thrown.expect(Exception.class);
-    thrown.expectCause(instanceOf(IOException.class));
     Pipeline p =
-        runPipelineWith(new CustomTestCoder(null, null, IO_EXCEPTION, null, 
EXCEPTION_MESSAGE));
+        pipelineWith(new CustomTestCoder(null, null, IO_EXCEPTION, null, 
EXCEPTION_MESSAGE));
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique 
Message!!!"));
+    p.run().waitUntilFinish();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testSerializationNPException() throws Exception {
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("java.lang.NullPointerException: Super Unique 
Message!!!");
-
     Pipeline p =
-        runPipelineWith(
+        pipelineWith(
             new CustomTestCoder(null, null, NULL_POINTER_EXCEPTION, null, 
EXCEPTION_MESSAGE));
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super 
Unique Message!!!"));
+
+    p.run().waitUntilFinish();
   }
 
+  // TODO(BEAM-6004) Have DirectRunner trigger deserialization.
+  @Ignore("DirectRunner doesn't decode coders so this test does not pass.")
   @Test
   @Category(NeedsRunner.class)
   public void testDeserializationIOException() throws Exception {
-    thrown.expect(Exception.class);
-    thrown.expectCause(instanceOf(IOException.class));
     Pipeline p =
-        runPipelineWith(new CustomTestCoder(null, null, null, IO_EXCEPTION, 
EXCEPTION_MESSAGE));
+        pipelineWith(new CustomTestCoder(null, null, null, IO_EXCEPTION, 
EXCEPTION_MESSAGE));
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique 
Message!!!"));
+    p.run().waitUntilFinish();
   }
 
+  // TODO(BEAM-6004) Have DirectRunner trigger deserialization.
+  @Ignore("DirectRunner doesn't decode coders so this test does not pass.")
   @Test
   @Category(NeedsRunner.class)
   public void testDeserializationNPException() throws Exception {
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("java.lang.NullPointerException: Super Unique 
Message!!!");
-
     Pipeline p =
-        runPipelineWith(
+        pipelineWith(
             new CustomTestCoder(null, null, null, NULL_POINTER_EXCEPTION, 
EXCEPTION_MESSAGE));
+    thrown.expect(Exception.class);
+    thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super 
Unique Message!!!"));
+    p.run().waitUntilFinish();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testNoException() throws Exception {
-    Pipeline p = runPipelineWith(new CustomTestCoder(null, null, null, null, 
null));
+    Pipeline p = pipelineWith(new CustomTestCoder(null, null, null, null, 
null));
+    p.run().waitUntilFinish();
   }
 
-  public static Pipeline runPipelineWith(CustomTestCoder coder) throws 
Exception {
-    PipelineOptions options = 
PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
-
+  public Pipeline pipelineWith(CustomTestCoder coder) throws Exception {
     List<String> pipelineContents =
         Arrays.asList("String", "Testing", "Custom", "Coder", "In", "Beam");
 
     // Create input.
-    Pipeline pipeline = TestPipeline.create(options);
     PCollection<String> customCoderPC =
-        pipeline.begin().apply("ReadStrings", 
Create.of(pipelineContents)).setCoder(coder);
-    PAssert.that(customCoderPC).containsInAnyOrder(pipelineContents);
-    pipeline.run().waitUntilFinish();
+        pipeline
+            .begin()
+            .apply("ReadStrings", Create.of(pipelineContents))
+            .setCoder(coder)
+            .apply(Reshuffle.viaRandomKey());
+    PCollection<String> fixedCoderPC =
+        customCoderPC.apply("Identity", ParDo.of(new IdentityDoFn()));
+    fixedCoderPC.setCoder(StringUtf8Coder.of());
+    ContentReader r = ContentReader.elementsEqual(pipelineContents);
+    // PAssert.that relies on the last coder added to the PCollection, so we
+    // need to create an identity ParDo with a valid coder.
+    PAssert.that(fixedCoderPC).satisfies(r);
 
     return pipeline;
   }
+
+  static class IdentityDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+
+  static class ContentReader implements SerializableFunction<Iterable<String>, 
Void> {
+    private final String[] expected;
+
+    public static ContentReader elementsEqual(Iterable<String> expected) {
+      return new ContentReader(expected);
+    }
+
+    private ContentReader(Iterable<String> expected) {
+      ArrayList<String> ret = new ArrayList<>();
+      for (String t : expected) {
+        ret.add(t);
+      }
+      this.expected = ret.toArray(new String[ret.size()]);
+    }
+
+    @Override
+    public Void apply(Iterable<String> contents) {
+      assertThat(contents, containsInAnyOrder(expected));
+      return null;
+    }
+  }
+
+  static class ExceptionMatcher extends TypeSafeMatcher<Throwable> {
+    private String expectedError;
+
+    public ExceptionMatcher(String expected) {
+      this.expectedError = expected;
+    }
+
+    @Override
+    public boolean matchesSafely(Throwable result) {
+      if (result.toString().contains(expectedError)) {
+        return true;
+      }
+      Throwable cause = result.getCause();
+      while (null != cause) {
+        String causeString = cause.toString();
+        if (causeString.contains(expectedError)) {
+          return true;
+        }
+        cause = cause.getCause();
+      }
+      return false;
+    }
+
+    @Override
+    public void describeTo(Description descr) {
+      descr.appendText("exception with text matching: " + expectedError);
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 163596)
    Time Spent: 1h 20m  (was: 1h 10m)

> PCollectionCustomCoderTest passes spuriously.
> ---------------------------------------------
>
>                 Key: BEAM-6005
>                 URL: https://issues.apache.org/jira/browse/BEAM-6005
>             Project: Beam
>          Issue Type: Bug
>          Components: testing
>            Reporter: Jason Kuster
>            Assignee: Jason Kuster
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The test assertions trigger before the coder is used on the actual runner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to