cameronlee314 commented on a change in pull request #1510:
URL: https://github.com/apache/samza/pull/1510#discussion_r683680001
##########
File path:
samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
##########
@@ -68,66 +68,97 @@
new PageView("3", "profile-page", "0"),
new PageView("4", LOGIN_PAGE, "0"));
-
@Test
public void testProcessingFutureCompletesSuccessfully() {
List<PageView> expectedPageViews = PAGE_VIEWS.stream()
- .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) &&
Long.valueOf(pageView.getUserId()) > 0)
+ .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) &&
Long.parseLong(pageView.getUserId()) > 0)
.collect(Collectors.toList());
- List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+ List<PageView> actualPageViews = runTest(new HashMap<>());
assertEquals("Mismatch between expected vs actual page views",
expectedPageViews, actualPageViews);
}
- @Test(expected = SamzaException.class)
+ @Test
public void testProcessingFutureCompletesAfterTaskTimeout() {
Map<String, String> configs = new HashMap<>();
configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
configs.put(PROCESS_JITTER, "200");
- runTest(PAGE_VIEWS, configs);
+ try {
+ runTest(configs);
+ fail("App execution should have failed due to a task callback timeout");
+ } catch (SamzaException e) {
+ /*
+ * TestRunner throws SamzaException on failures in general, so check the
actual cause. The timeout message is
+ * nested within a bunch of other exceptions.
+ */
+ Throwable rootCause = findRootCause(e);
+ assertTrue(rootCause instanceof SamzaException);
+ // the "{}" is intentional, since the exception message actually
includes it (probably a logging bug)
+ assertEquals("Callback for task {} Partition 0 timed out after 100 ms.",
rootCause.getMessage());
+ }
}
- @Test(expected = RuntimeException.class)
+ @Test
public void testProcessingExceptionIsBubbledUp() {
Map<String, String> configs = new HashMap<>();
configs.put(FAIL_PROCESS, "true");
- runTest(PAGE_VIEWS, configs);
+ try {
+ runTest(configs);
+ fail("App execution should have failed due to a
ProcessFailureException");
+ } catch (SamzaException e) {
+ /*
+ * TestRunner throws SamzaException on failures in general, so check the
actual cause. The actual exception is
+ * nested within a bunch of other exceptions.
+ */
+ assertTrue(findRootCause(e) instanceof ProcessFailureException);
+ }
}
- @Test(expected = RuntimeException.class)
+ @Test
public void testDownstreamOperatorExceptionIsBubbledUp() {
Map<String, String> configs = new HashMap<>();
configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
- runTest(PAGE_VIEWS, configs);
+ try {
+ runTest(configs);
+ fail("App execution should have failed due to a FilterFailureException");
+ } catch (SamzaException e) {
+ /*
+ * TestRunner throws SamzaException on failures in general, so check the
actual cause. The actual exception is
+ * nested within a bunch of other exceptions.
+ */
+ assertTrue(findRootCause(e) instanceof FilterFailureException);
+ }
}
- private List<PageView> runTest(List<PageView> pageViews, Map<String, String>
configs) {
- configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID,
PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+ private List<PageView> runTest(Map<String, String> configs) {
InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
.getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
.getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
TestRunner
.of(new AsyncFlatMapExample())
- .addInputStream(pageViewStreamDesc, pageViews)
+ .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
.addOutputStream(outputStreamDesc, 1)
.addConfig(new MapConfig(configs))
- .run(Duration.ofMillis(50000));
+ .run(Duration.ofSeconds(10));
Map<Integer, List<PageView>> result =
TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
- List<PageView> results = result.values().stream()
+ return result.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
+ }
- return results;
+ private static Throwable findRootCause(Throwable e) {
Review comment:
Generally, I would prefer using the junit annotation, but in this case,
the junit annotation doesn't provide enough granularity, since it only checks
the outer exception type. Both the test framework and Samza wrap the actual
exception with `SamzaException`, so just checking `SamzaException` with a junit
annotation doesn't really check that the expected failure case is being
triggered.
For example, the app execution for this test could fail due to some init
step instead of during processing, but the outer exception type would still be
`SamzaException` for both cases. The junit annotation would allow the result to
succeed in both cases, but this current way allows us to make sure the error
actually happened during processing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]