This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 664ebb0 SAMZA-2721: Container should exit with non-zero status code
in case of errors during launch (#1583)
664ebb0 is described below
commit 664ebb0adddd6c4bb65c1c213e55888a1df67f49
Author: mynameborat <[email protected]>
AuthorDate: Mon Feb 14 15:29:29 2022 -0800
SAMZA-2721: Container should exit with non-zero status code in case of
errors during launch (#1583)
Problem:
During its launch sequence, ContainerLaunchUtil swallows exceptions and
proceeds to shut down with a 0 status code. This causes the AM to not restart
the container.
Description:
In the run method, as part of the launch sequence, we have various
initialization steps before kicking off the container. In case of exceptions
during these steps, the run method catches all errors but only logs them and
proceeds to shut down as usual.
Due to the normal exit, the AM treats the container as having completed
successfully and hence doesn't restart it, causing the failed container to
remain failed.
Changes:
Exit with status code 1 in case of exception during launch
Fix the error log to include both actual failure and container exception
Tests:
Added a unit test to validate that the process exits with the right status
code upon encountering an exception
API Changes: None
Usage Instructions: None
Upgrade Instructions: None
---
.../apache/samza/runtime/ContainerLaunchUtil.java | 42 +++++++++++--
.../samza/runtime/TestContainerLaunchUtil.java | 68 ++++++++++++++++++++++
2 files changed, 105 insertions(+), 5 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 7314a86..9cc8121 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,6 +19,7 @@
package org.apache.samza.runtime;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -99,10 +100,11 @@ public class ContainerLaunchUtil {
run(appDesc, jobName, jobId, containerId, executionEnvContainerId,
samzaEpochId, jobModel, config,
buildExternalContext(config));
- System.exit(0);
+ exitProcess(0);
}
- private static void run(
+ @VisibleForTesting
+ static void run(
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
String jobName,
String jobId,
@@ -112,9 +114,15 @@ public class ContainerLaunchUtil {
JobModel jobModel,
Config config,
Optional<ExternalContext> externalContextOptional) {
- CoordinatorStreamStore coordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap());
+ CoordinatorStreamStore coordinatorStreamStore =
buildCoordinatorStreamStore(config, new MetricsRegistryMap());
coordinatorStreamStore.init();
+ /*
+ * We track the exit code and only trigger exit in the finally block to
make sure we are able to execute all the
+ * clean up steps. Prior implementation had short circuited exit causing
some of the clean up steps to be missed.
+ */
+ int exitCode = 0;
+
try {
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
LocalityManager localityManager = new LocalityManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetContainerHostMapping.TYPE));
@@ -179,15 +187,39 @@ public class ContainerLaunchUtil {
if (containerRunnerException != null) {
log.error("Container stopped with Exception. Exiting process now.",
containerRunnerException);
- System.exit(1);
+ exitCode = 1;
}
} catch (Throwable e) {
- log.error("Container stopped with Exception. ",
containerRunnerException);
+ /*
+ * Two separate log statements are intended to print the entire stack
trace as part of the logs. Using
+ * single log statement with custom format requires explicitly fetching
stack trace and null checks which makes
+ * the code slightly hard to read in comparison with the current choice.
+ */
+ log.error("Exiting the process due to", e);
+ log.error("Container runner exception: ", containerRunnerException);
+ exitCode = 1;
} finally {
coordinatorStreamStore.close();
+ /*
+ * Only exit in the scenario of non-zero exit code in order to maintain
parity with current implementation where
+ * the method completes when no errors are encountered.
+ */
+ if (exitCode != 0) {
+ exitProcess(exitCode);
+ }
}
}
+ @VisibleForTesting
+ static CoordinatorStreamStore buildCoordinatorStreamStore(Config config,
MetricsRegistryMap metricsRegistryMap) {
+ return new CoordinatorStreamStore(config, metricsRegistryMap);
+ }
+
+ @VisibleForTesting
+ static void exitProcess(int status) {
+ System.exit(status);
+ }
+
private static Optional<ExternalContext> buildExternalContext(Config config)
{
/*
* By default, use an empty ExternalContext here. In a custom fork of
Samza, this can be implemented to pass
diff --git
a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
new file mode 100644
index 0000000..ec57991
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.job.model.JobModel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ContainerLaunchUtil.class)
+public class TestContainerLaunchUtil {
+ private static final String JOB_NAME = "test-job";
+ private static final String JOB_ID = "test-job-i001";
+ private static final String CONTAINER_ID = "test-job-container-0001";
+
+ private static final ApplicationDescriptorImpl APP_DESC =
mock(ApplicationDescriptorImpl.class);
+ private static final JobModel JOB_MODEL = mock(JobModel.class);
+ private static final Config CONFIG = mock(Config.class);
+
+ @Test
+ public void testRunWithException() throws Exception {
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+ PowerMockito.mockStatic(ContainerLaunchUtil.class);
+ PowerMockito.doReturn(mock(CoordinatorStreamStore.class))
+ .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore",
eq(CONFIG), any());
+ PowerMockito.doAnswer(invocation -> {
+ completionLatch.countDown();
+ return null;
+ }).when(ContainerLaunchUtil.class, "exitProcess", eq(1));
+ PowerMockito.doCallRealMethod()
+ .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME),
eq(JOB_ID), eq(CONTAINER_ID), any(), any(),
+ eq(JOB_MODEL), eq(CONFIG), any());
+
+ ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID,
Optional.empty(), Optional.empty(), JOB_MODEL,
+ CONFIG, Optional.empty());
+ assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
+ }
+}