Repository: samza Updated Branches: refs/heads/master 906aa6b88 -> 51729ac68
SAMZA-1698: Update appStatus on failures in localApplication.run(streamApp). Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish <[email protected]> Closes #502 from shanthoosh/local_application_runner_set_exception_in_finish Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/51729ac6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/51729ac6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/51729ac6 Branch: refs/heads/master Commit: 51729ac6836f5baa522d7ba6a7308504ba027c5f Parents: 906aa6b Author: Shanthoosh Venkataraman <[email protected]> Authored: Thu May 3 17:37:49 2018 -0700 Committer: Jagadish <[email protected]> Committed: Thu May 3 17:37:49 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/runtime/LocalApplicationRunner.java | 6 ++++-- .../org/apache/samza/runtime/TestLocalApplicationRunner.java | 5 +---- 2 files changed, 5 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/51729ac6/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 9529581..8f481cd 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -175,8 +175,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { // 4. start the StreamProcessors processors.forEach(StreamProcessor::start); - } catch (Exception e) { - throw new SamzaException("Failed to start application", e); + } catch (Throwable throwable) { + appStatus = ApplicationStatus.unsuccessfulFinish(throwable); + shutdownLatch.countDown(); + throw new SamzaException(String.format("Failed to start application: %s.", app), throwable); } } http://git-wip-us.apache.org/repos/asf/samza/blob/51729ac6/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index a23e513..b4a2259 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -224,16 +224,13 @@ public class TestLocalApplicationRunner { when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))); doReturn(plan).when(runner).getExecutionPlan(any(), any()); - Throwable t = new Throwable("test failure"); StreamProcessor sp = mock(StreamProcessor.class); ArgumentCaptor<StreamProcessorLifecycleListener> captor = ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class); doAnswer(i -> { - StreamProcessorLifecycleListener listener = captor.getValue(); - listener.onFailure(t); - return null; + throw new Exception("test failure"); }).when(sp).start(); doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());
