Sanil15 commented on a change in pull request #1510:
URL: https://github.com/apache/samza/pull/1510#discussion_r683775059



##########
File path: samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
##########
@@ -68,66 +68,97 @@
       new PageView("3", "profile-page", "0"),
       new PageView("4", LOGIN_PAGE, "0"));
 
-
   @Test
   public void testProcessingFutureCompletesSuccessfully() {
     List<PageView> expectedPageViews = PAGE_VIEWS.stream()
-        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.valueOf(pageView.getUserId()) > 0)
+        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.parseLong(pageView.getUserId()) > 0)
         .collect(Collectors.toList());
 
-    List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+    List<PageView> actualPageViews = runTest(new HashMap<>());
     assertEquals("Mismatch between expected vs actual page views", expectedPageViews, actualPageViews);
   }
 
-  @Test(expected = SamzaException.class)
+  @Test
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
     configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a task callback timeout");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The timeout message is
+       * nested within a bunch of other exceptions.
+       */
+      Throwable rootCause = findRootCause(e);
+      assertTrue(rootCause instanceof SamzaException);
+      // the "{}" is intentional, since the exception message actually includes it (probably a logging bug)
+      assertEquals("Callback for task {} Partition 0 timed out after 100 ms.", rootCause.getMessage());
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testProcessingExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_PROCESS, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a ProcessFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof ProcessFailureException);
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testDownstreamOperatorExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a FilterFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof FilterFailureException);
+    }
   }
 
-  private List<PageView> runTest(List<PageView> pageViews, Map<String, String> configs) {
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+  private List<PageView> runTest(Map<String, String> configs) {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
         .getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
     InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
         .getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
 
     TestRunner
         .of(new AsyncFlatMapExample())
-        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
         .addOutputStream(outputStreamDesc, 1)
         .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(50000));
+        .run(Duration.ofSeconds(10));
 
     Map<Integer, List<PageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-    List<PageView> results = result.values().stream()
+    return result.values().stream()
         .flatMap(List::stream)
         .collect(Collectors.toList());
+  }
 
-    return results;
+  private static Throwable findRootCause(Throwable e) {

Review comment:
       Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to