This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch SAMZA-2721 in repository https://gitbox.apache.org/repos/asf/samza.git
commit ad596c87c1c8c07ca58b29e29974227b29444a29 Author: Bharath Kumarasubramanian <[email protected]> AuthorDate: Mon Feb 14 09:28:38 2022 -0800 SAMZA-2721: Container should exit with non-zero status code in case of errors during launch --- .../apache/samza/runtime/ContainerLaunchUtil.java | 23 ++++++-- .../samza/runtime/TestContainerLaunchUtil.java | 68 ++++++++++++++++++++++ 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index 7314a86..42d3c91 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -19,6 +19,7 @@ package org.apache.samza.runtime; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -99,10 +100,11 @@ public class ContainerLaunchUtil { run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config, buildExternalContext(config)); - System.exit(0); + exitProcess(0); } - private static void run( + @VisibleForTesting + static void run( ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String jobName, String jobId, @@ -112,7 +114,7 @@ public class ContainerLaunchUtil { JobModel jobModel, Config config, Optional<ExternalContext> externalContextOptional) { - CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap()); + CoordinatorStreamStore coordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap()); coordinatorStreamStore.init(); try { @@ -179,15 +181,26 @@ public class ContainerLaunchUtil { if (containerRunnerException != null) { log.error("Container stopped with Exception. Exiting process now.", containerRunnerException); - System.exit(1); + exitProcess(1); } } catch (Throwable e) { - log.error("Container stopped with Exception. ", containerRunnerException); + log.error("Exiting the process due to {}. \nContainer runner exception: {}", e, containerRunnerException); + exitProcess(1); } finally { coordinatorStreamStore.close(); } } + @VisibleForTesting + static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, MetricsRegistryMap metricsRegistryMap) { + return new CoordinatorStreamStore(config, metricsRegistryMap); + } + + @VisibleForTesting + static void exitProcess(int status) { + System.exit(status); + } + private static Optional<ExternalContext> buildExternalContext(Config config) { /* * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java new file mode 100644 index 0000000..ec57991 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.runtime; + +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; +import org.apache.samza.job.model.JobModel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ContainerLaunchUtil.class) +public class TestContainerLaunchUtil { + private static final String JOB_NAME = "test-job"; + private static final String JOB_ID = "test-job-i001"; + private static final String CONTAINER_ID = "test-job-container-0001"; + + private static final ApplicationDescriptorImpl APP_DESC = mock(ApplicationDescriptorImpl.class); + private static final JobModel JOB_MODEL = mock(JobModel.class); + private static final Config CONFIG = mock(Config.class); + + @Test + public void testRunWithException() throws Exception { + final CountDownLatch completionLatch = new CountDownLatch(1); + PowerMockito.mockStatic(ContainerLaunchUtil.class); + PowerMockito.doReturn(mock(CoordinatorStreamStore.class)) + .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", eq(CONFIG), any()); + PowerMockito.doAnswer(invocation -> { + completionLatch.countDown(); + return null; + }).when(ContainerLaunchUtil.class, "exitProcess", eq(1)); + PowerMockito.doCallRealMethod() + .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), + eq(JOB_MODEL), eq(CONFIG), any()); + + ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL, + CONFIG, Optional.empty()); + assertTrue(completionLatch.await(1, TimeUnit.SECONDS)); + } +}
