This is an automated email from the ASF dual-hosted git repository. dchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 24e530d7e Unique System.exit in ContainerLaunchUtil (#1686) 24e530d7e is described below commit 24e530d7ec498f2cbb971e7b4a01a6768fd43198 Author: Jun Guan <guanjun0...@gmail.com> AuthorDate: Mon Sep 11 16:48:58 2023 -0700 Unique System.exit in ContainerLaunchUtil (#1686) --- .../apache/samza/runtime/ContainerLaunchUtil.java | 14 ++++--------- .../samza/runtime/TestContainerLaunchUtil.java | 24 +++++++++++++++++++++- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index cd153b55c..6a3c11956 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -98,14 +98,14 @@ public class ContainerLaunchUtil { LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig()); DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, executionEnvContainerId, config); - run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config, + int exitCode = run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config, buildExternalContext(config)); - exitProcess(0); + exitProcess(exitCode); } @VisibleForTesting - static void run( + static int run( ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String jobName, String jobId, @@ -208,13 +208,7 @@ public class ContainerLaunchUtil { exitCode = 1; } finally { coordinatorStreamStore.close(); - /* - * Only exit in the scenario of non-zero exit code in order to maintain parity with current implementation where - * the method completes when no errors are encountered. - */ - if (exitCode != 0) { - exitProcess(exitCode); - } + return exitCode; } } diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java index ec579918c..51e0175ab 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java @@ -61,8 +61,30 @@ public class TestContainerLaunchUtil { .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), eq(JOB_MODEL), eq(CONFIG), any()); - ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL, + int exitCode = ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL, CONFIG, Optional.empty()); + assertEquals(1, exitCode); + } + + @Test + public void testRunSuccessfully() throws Exception { + int exitCode = 0; + final CountDownLatch completionLatch = new CountDownLatch(1); + PowerMockito.mockStatic(ContainerLaunchUtil.class); + PowerMockito.doReturn(mock(CoordinatorStreamStore.class)) + .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", eq(CONFIG), any()); + PowerMockito.doAnswer(invocation -> { + completionLatch.countDown(); + return null; + }).when(ContainerLaunchUtil.class, "exitProcess", eq(exitCode)); + PowerMockito.doReturn(exitCode) + .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), + eq(JOB_MODEL), eq(CONFIG), any()); + PowerMockito.doCallRealMethod() + .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), + eq(JOB_MODEL)); + + ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL); assertTrue(completionLatch.await(1, TimeUnit.SECONDS)); } }