cameronlee314 commented on a change in pull request #1510:
URL: https://github.com/apache/samza/pull/1510#discussion_r683680001



##########
File path: 
samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
##########
@@ -68,66 +68,97 @@
       new PageView("3", "profile-page", "0"),
       new PageView("4", LOGIN_PAGE, "0"));
 
-
   @Test
   public void testProcessingFutureCompletesSuccessfully() {
     List<PageView> expectedPageViews = PAGE_VIEWS.stream()
-        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && 
Long.valueOf(pageView.getUserId()) > 0)
+        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && 
Long.parseLong(pageView.getUserId()) > 0)
         .collect(Collectors.toList());
 
-    List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+    List<PageView> actualPageViews = runTest(new HashMap<>());
     assertEquals("Mismatch between expected vs actual page views", 
expectedPageViews, actualPageViews);
   }
 
-  @Test(expected = SamzaException.class)
+  @Test
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
     configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a task callback timeout");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the 
actual cause. The timeout message is
+       * nested within a bunch of other exceptions.
+       */
+      Throwable rootCause = findRootCause(e);
+      assertTrue(rootCause instanceof SamzaException);
+      // the "{}" is intentional, since the exception message actually 
includes it (probably a logging bug)
+      assertEquals("Callback for task {} Partition 0 timed out after 100 ms.", 
rootCause.getMessage());
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testProcessingExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_PROCESS, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a 
ProcessFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the 
actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof ProcessFailureException);
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testDownstreamOperatorExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a FilterFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the 
actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof FilterFailureException);
+    }
   }
 
-  private List<PageView> runTest(List<PageView> pageViews, Map<String, String> 
configs) {
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, 
PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+  private List<PageView> runTest(Map<String, String> configs) {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
         .getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
     InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
         .getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
 
     TestRunner
         .of(new AsyncFlatMapExample())
-        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
         .addOutputStream(outputStreamDesc, 1)
         .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(50000));
+        .run(Duration.ofSeconds(10));
 
     Map<Integer, List<PageView>> result = 
TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-    List<PageView> results = result.values().stream()
+    return result.values().stream()
         .flatMap(List::stream)
         .collect(Collectors.toList());
+  }
 
-    return results;
+  private static Throwable findRootCause(Throwable e) {

Review comment:
       Generally, I would prefer using the junit annotation, but in this case, 
the junit annotation doesn't provide enough granularity, since it only checks 
the outer exception type. Both the test framework and Samza wrap the actual 
exception with `SamzaException`, so just checking `SamzaException` with a junit 
annotation doesn't really check that the expected failure case is being 
triggered.
   For example, the app execution for this test could fail due to some init 
step instead of during processing, but the outer exception type would still be 
`SamzaException` for both cases. The junit annotation would allow the result to 
succeed in both cases, but this current way allows us to make sure the error 
actually happened during processing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to