This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch SAMZA-2721
in repository https://gitbox.apache.org/repos/asf/samza.git

commit ad596c87c1c8c07ca58b29e29974227b29444a29
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Mon Feb 14 09:28:38 2022 -0800

    SAMZA-2721: Container should exit with non-zero status code in case of errors during launch
---
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 23 ++++++--
 .../samza/runtime/TestContainerLaunchUtil.java     | 68 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 7314a86..42d3c91 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -99,10 +100,11 @@ public class ContainerLaunchUtil {
     run(appDesc, jobName, jobId, containerId, executionEnvContainerId, 
samzaEpochId, jobModel, config,
         buildExternalContext(config));
 
-    System.exit(0);
+    exitProcess(0);
   }
 
-  private static void run(
+  @VisibleForTesting
+  static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       String jobName,
       String jobId,
@@ -112,7 +114,7 @@ public class ContainerLaunchUtil {
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
-    CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap());
+    CoordinatorStreamStore coordinatorStreamStore = 
buildCoordinatorStreamStore(config, new MetricsRegistryMap());
     coordinatorStreamStore.init();
 
     try {
@@ -179,15 +181,26 @@ public class ContainerLaunchUtil {
 
       if (containerRunnerException != null) {
         log.error("Container stopped with Exception. Exiting process now.", 
containerRunnerException);
-        System.exit(1);
+        exitProcess(1);
       }
     } catch (Throwable e) {
-      log.error("Container stopped with Exception. ", 
containerRunnerException);
+      log.error("Exiting the process due to {}. \nContainer runner exception: 
{}", e, containerRunnerException);
+      exitProcess(1);
     } finally {
       coordinatorStreamStore.close();
     }
   }
 
+  @VisibleForTesting
+  static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, 
MetricsRegistryMap metricsRegistryMap) {
+    return new CoordinatorStreamStore(config, metricsRegistryMap);
+  }
+
+  @VisibleForTesting
+  static void exitProcess(int status) {
+    System.exit(status);
+  }
+
   private static Optional<ExternalContext> buildExternalContext(Config config) 
{
     /*
      * By default, use an empty ExternalContext here. In a custom fork of 
Samza, this can be implemented to pass
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
new file mode 100644
index 0000000..ec57991
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.job.model.JobModel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ContainerLaunchUtil.class)
+public class TestContainerLaunchUtil {
+  private static final String JOB_NAME = "test-job";
+  private static final String JOB_ID = "test-job-i001";
+  private static final String CONTAINER_ID = "test-job-container-0001";
+
+  private static final ApplicationDescriptorImpl APP_DESC = 
mock(ApplicationDescriptorImpl.class);
+  private static final JobModel JOB_MODEL = mock(JobModel.class);
+  private static final Config CONFIG = mock(Config.class);
+
+  @Test
+  public void testRunWithException() throws Exception {
+    final CountDownLatch completionLatch = new CountDownLatch(1);
+    PowerMockito.mockStatic(ContainerLaunchUtil.class);
+    PowerMockito.doReturn(mock(CoordinatorStreamStore.class))
+        .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", 
eq(CONFIG), any());
+    PowerMockito.doAnswer(invocation -> {
+      completionLatch.countDown();
+      return null;
+    }).when(ContainerLaunchUtil.class, "exitProcess", eq(1));
+    PowerMockito.doCallRealMethod()
+        .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), 
eq(JOB_ID), eq(CONTAINER_ID), any(), any(),
+            eq(JOB_MODEL), eq(CONFIG), any());
+
+    ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, 
Optional.empty(), Optional.empty(), JOB_MODEL,
+        CONFIG, Optional.empty());
+    assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
+  }
+}

Reply via email to