This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 664ebb0  SAMZA-2721: Container should exit with non-zero status code 
in case of errors during launch (#1583)
664ebb0 is described below

commit 664ebb0adddd6c4bb65c1c213e55888a1df67f49
Author: mynameborat <[email protected]>
AuthorDate: Mon Feb 14 15:29:29 2022 -0800

    SAMZA-2721: Container should exit with non-zero status code in case of 
errors during launch (#1583)
    
    Problem:
    During its launch sequence, ContainerLaunchUtil swallows exceptions and 
proceeds to shut down with a 0 status code. This causes the AM not to restart 
the container.
    
    Description:
    In the run method, the launch sequence performs various initialization 
steps before starting the container. If an exception occurs during these 
steps, the run method catches all errors but only logs them and proceeds to 
shut down as usual.
    Because the process exits normally, the AM treats the container as having 
completed successfully and does not restart it, so the failed container 
remains failed.
    
    Changes:
    
    Exit with status code 1 in case of exception during launch
    Fix the error log to include both the actual failure and the container runner exception
    Tests:
    
    Added a unit test to verify that the process exits with the right status 
code upon encountering an exception
    API Changes: None
    Usage Instructions: None
    Upgrade Instructions: None
---
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 42 +++++++++++--
 .../samza/runtime/TestContainerLaunchUtil.java     | 68 ++++++++++++++++++++++
 2 files changed, 105 insertions(+), 5 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 7314a86..9cc8121 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -99,10 +100,11 @@ public class ContainerLaunchUtil {
     run(appDesc, jobName, jobId, containerId, executionEnvContainerId, 
samzaEpochId, jobModel, config,
         buildExternalContext(config));
 
-    System.exit(0);
+    exitProcess(0);
   }
 
-  private static void run(
+  @VisibleForTesting
+  static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       String jobName,
       String jobId,
@@ -112,9 +114,15 @@ public class ContainerLaunchUtil {
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
-    CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap());
+    CoordinatorStreamStore coordinatorStreamStore = 
buildCoordinatorStreamStore(config, new MetricsRegistryMap());
     coordinatorStreamStore.init();
 
+    /*
+     * We track the exit code and only trigger exit in the finally block to 
make sure we are able to execute all the
+     * clean up steps. Prior implementation had short circuited exit causing 
some of the clean up steps to be missed.
+     */
+    int exitCode = 0;
+
     try {
       TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
       LocalityManager localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE));
@@ -179,15 +187,39 @@ public class ContainerLaunchUtil {
 
       if (containerRunnerException != null) {
         log.error("Container stopped with Exception. Exiting process now.", 
containerRunnerException);
-        System.exit(1);
+        exitCode = 1;
       }
     } catch (Throwable e) {
-      log.error("Container stopped with Exception. ", 
containerRunnerException);
+      /*
+       * Two separate log statements are intended to print the entire stack 
trace as part of the logs. Using
+       * single log statement with custom format requires explicitly fetching 
stack trace and null checks which makes
+       * the code slightly hard to read in comparison with the current choice.
+       */
+      log.error("Exiting the process due to", e);
+      log.error("Container runner exception: ", containerRunnerException);
+      exitCode = 1;
     } finally {
       coordinatorStreamStore.close();
+      /*
+       * Only exit in the scenario of non-zero exit code in order to maintain 
parity with current implementation where
+       * the method completes when no errors are encountered.
+       */
+      if (exitCode != 0) {
+        exitProcess(exitCode);
+      }
     }
   }
 
+  @VisibleForTesting
+  static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, 
MetricsRegistryMap metricsRegistryMap) {
+    return new CoordinatorStreamStore(config, metricsRegistryMap);
+  }
+
+  @VisibleForTesting
+  static void exitProcess(int status) {
+    System.exit(status);
+  }
+
   private static Optional<ExternalContext> buildExternalContext(Config config) 
{
     /*
      * By default, use an empty ExternalContext here. In a custom fork of 
Samza, this can be implemented to pass
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
new file mode 100644
index 0000000..ec57991
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.job.model.JobModel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ContainerLaunchUtil.class)
+public class TestContainerLaunchUtil {
+  private static final String JOB_NAME = "test-job";
+  private static final String JOB_ID = "test-job-i001";
+  private static final String CONTAINER_ID = "test-job-container-0001";
+
+  private static final ApplicationDescriptorImpl APP_DESC = 
mock(ApplicationDescriptorImpl.class);
+  private static final JobModel JOB_MODEL = mock(JobModel.class);
+  private static final Config CONFIG = mock(Config.class);
+
+  @Test
+  public void testRunWithException() throws Exception {
+    final CountDownLatch completionLatch = new CountDownLatch(1);
+    PowerMockito.mockStatic(ContainerLaunchUtil.class);
+    PowerMockito.doReturn(mock(CoordinatorStreamStore.class))
+        .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", 
eq(CONFIG), any());
+    PowerMockito.doAnswer(invocation -> {
+      completionLatch.countDown();
+      return null;
+    }).when(ContainerLaunchUtil.class, "exitProcess", eq(1));
+    PowerMockito.doCallRealMethod()
+        .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), 
eq(JOB_ID), eq(CONTAINER_ID), any(), any(),
+            eq(JOB_MODEL), eq(CONFIG), any());
+
+    ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, 
Optional.empty(), Optional.empty(), JOB_MODEL,
+        CONFIG, Optional.empty());
+    assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
+  }
+}

Reply via email to