XComp commented on a change in pull request #14499:
URL: https://github.com/apache/flink/pull/14499#discussion_r555570200
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -115,4 +127,35 @@ public static boolean
isDeclarativeResourceManagementEnabled(Configuration confi
return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)
&&
!System.getProperties().containsKey("flink.tests.disable-declarative");
}
+
+ /** The mode of how to handle user code attempting to exit JVM. */
+ public enum UserSystemExitMode {
+ DISABLED("No check is enabled, that is allowing exit without any
action"),
Review comment:
```suggestion
DISABLED("No check is enabled. Hence, exits are allowed without any
action being triggered."),
```
I feel like the current version of the description might be ambiguous:
At first, I read it as "there is no check which allows exiting without any
action". I suggest the version above.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more
for being dependable
+ * against user mistakes by gracefully handling them informing users rather
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+ static final Logger LOG =
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+ /**
+ * Security manager reference lastly set to system's security manager by
public API. As system
+ * security manager can be reset with another but still chain-called into
this manager properly,
+ * this reference may not be referenced by System.getSecurityManager, but
we still need to
+ * control runtime check behaviors such as monitoring exit from user code.
+ */
+ private static FlinkSecurityManager flinkSecurityManager;
+
+ private final SecurityManager originalSecurityManager;
+ private final ThreadLocal<Boolean> monitorUserSystemExit = new
InheritableThreadLocal<>();
+ private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+ /** The behavior to execute when the JVM exists. */
+ private final Consumer<Integer> onExitBehavior;
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior) {
+ this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+ }
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior,
+ SecurityManager originalSecurityManager) {
+ this.userSystemExitMode =
Preconditions.checkNotNull(userSystemExitMode);
+ this.onExitBehavior = onExitBehavior;
+ this.originalSecurityManager = originalSecurityManager;
+ }
+
+ /**
+ * Instantiate FlinkUserSecurityManager from configuration. Return null if
no security manager
+ * check is needed, so that a caller can skip setting security manager
avoiding runtime check
+ * cost, if there is no security check set up already. Use {@link
#setFromConfiguration} helper,
+ * which handles disabled case.
+ *
+ * @param configuration to instantiate the security manager from
+ * @return FlinkUserSecurityManager instantiated based on configuration.
Return null if
+ * disabled.
+ */
+ @VisibleForTesting
+ static FlinkSecurityManager fromConfiguration(Configuration configuration)
{
+ final ClusterOptions.UserSystemExitMode userSystemExitMode =
+ configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT);
+
+ boolean haltOnSystemExit =
configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT);
+
+ // If no check is needed, return null so that caller can avoid setting
security manager not
+ // to incur any runtime cost.
+ if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED
&& !haltOnSystemExit) {
+ return null;
+ }
+ Consumer<Integer> onExitBehavior = null;
+ // If halt on system exit is configured, registers a custom
SecurityManager which converts
+ // graceful exists calls using {@code System#exit} into forceful exit
calls using
+ // {@code Runtime#halt}. The latter does not perform a clean shutdown
using the registered
+ // shutdown hooks. This may be configured to prevent deadlocks with
Java 8 and the G1
+ // garbage collection, see
https://issues.apache.org/jira/browse/FLINK-16510.
+ if (haltOnSystemExit) {
+ onExitBehavior = status -> Runtime.getRuntime().halt(status);
+ }
+ LOG.info(
+ "FlinkSecurityManager is created with {} user system exit mode
and {} exit",
+ userSystemExitMode,
+ haltOnSystemExit ? "forceful" : "graceful");
+ // Add more configuration parameters that need user security manager
(currently only for
+ // system exit).
+ return new FlinkSecurityManager(userSystemExitMode, onExitBehavior);
+ }
+
+ public static void setFromConfiguration(Configuration configuration) {
+ final FlinkSecurityManager flinkSecurityManager =
+ FlinkSecurityManager.fromConfiguration(configuration);
+ if (flinkSecurityManager != null) {
+ try {
+ System.setSecurityManager(flinkSecurityManager);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Could not register security manager due to no
permission to "
+ + "set a SecurityManager. Either
update your existing "
+ + "SecurityManager to allow the
permission or not using "
+ + "security manager features (e.g.,
'%s: %s', '%s: %s')",
+
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.key(),
+
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.defaultValue(),
+ ClusterOptions.HALT_ON_SYSTEM_EXIT.key(),
+
ClusterOptions.HALT_ON_SYSTEM_EXIT.defaultValue(),
+ e));
+ }
+ }
+ FlinkSecurityManager.flinkSecurityManager = flinkSecurityManager;
+ }
+
+ public static void monitorUserSystemExitForCurrentThread() {
+ if (FlinkSecurityManager.flinkSecurityManager != null) {
+ FlinkSecurityManager.flinkSecurityManager.monitorUserSystemExit();
+ }
+ }
+
+ public static void unmonitorUserSystemExitForCurrentThread() {
+ if (FlinkSecurityManager.flinkSecurityManager != null) {
+
FlinkSecurityManager.flinkSecurityManager.unmonitorUserSystemExit();
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm) {
+ if (originalSecurityManager != null) {
+ originalSecurityManager.checkPermission(perm);
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm, Object context) {
+ if (originalSecurityManager != null) {
+ originalSecurityManager.checkPermission(perm, context);
+ }
+ }
+
+ @Override
+ public void checkExit(int status) {
+ if (userSystemExitMonitored()) {
+ switch (userSystemExitMode) {
+ case DISABLED:
+ break;
+ case LOG:
+ // Add exception trace log to help users to debug where
exit came from.
+ LOG.warn(
+ "Exiting JVM with status {} is monitored, logging
and exiting",
+ status,
+ new UserSystemExitException());
Review comment:
Do we need to add this exception here? It does not add any value,
does it?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testDisabledConfiguration() {
+ // Default case (no provided option) - allowing everything, so null
security manager is
+ // expected.
+ Configuration configuration = new Configuration();
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // Disabled case (same as default)
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+ ClusterOptions.UserSystemExitMode.DISABLED);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // No halt (same as default)
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+ }
+
+ @Test
+ public void testLogConfiguration() {
+ // Enabled - log case (logging as warning but allowing exit)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.LOG);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ }
+
+ @Test
+ public void testThrowConfiguration() {
+ // Enabled - throw case (disallowing by throwing exception)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+ // Test for disabled test to check if exit is still allowed
(fromConfiguration gives null
+ // since currently
+ // there is only one option to have a valid security manager, so test
with constructor).
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testHaltConfiguration() {
+ // Halt as forceful shutdown replacing graceful system exit
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testInvalidConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ }
+
+ @Test
+ public void testExistingSecurityManagerRespected() {
+ // Don't set the following security manager directly to system, which
makes test hang.
+ SecurityManager originalSecurityManager =
+ new SecurityManager() {
+ @Override
+ public void checkPermission(Permission perm) {
+ throw new SecurityException("not allowed");
+ }
+ };
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> Assert.fail(),
+ originalSecurityManager);
+
+ assertThrows(
+ "not allowed",
+ SecurityException.class,
+ () -> {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ return null;
+ });
+ }
+
+ @Test
+ public void testRegistrationNotAllowedByExistingSecurityManager() {
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+
+ System.setSecurityManager(
+ new SecurityManager() {
+
+ private boolean fired;
+
+ @Override
+ public void checkPermission(Permission perm) {
+ if (!fired &&
perm.getName().equals("setSecurityManager")) {
+ try {
+ throw new SecurityException("not allowed");
+ } finally {
+ // Allow removing this manager again
+ fired = true;
+ }
+ }
+ }
+ });
+
+ assertThrows(
+ "Could not register security manager",
+ IllegalConfigurationException.class,
+ () -> {
+ FlinkSecurityManager.setFromConfiguration(configuration);
+ return null;
+ });
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testMultiSecurityManagersWithSetFirstAndMonitored() {
+ Configuration configuration = new Configuration();
+
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+
+ FlinkSecurityManager.setFromConfiguration(configuration);
+
+ TestExitSecurityManager newSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(newSecurityManager);
+
+ FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+ newSecurityManager.checkExit(TEST_EXIT_CODE);
Review comment:
You would have to catch the `UserSystemExitException` here instead of
using JUnit's `expected` feature if you want to check afterwards whether
`TestExitSecurityManager` was triggered.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testDisabledConfiguration() {
+ // Default case (no provided option) - allowing everything, so null
security manager is
+ // expected.
+ Configuration configuration = new Configuration();
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // Disabled case (same as default)
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+ ClusterOptions.UserSystemExitMode.DISABLED);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // No halt (same as default)
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+ }
+
+ @Test
+ public void testLogConfiguration() {
+ // Enabled - log case (logging as warning but allowing exit)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.LOG);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ }
+
+ @Test
+ public void testThrowConfiguration() {
+ // Enabled - throw case (disallowing by throwing exception)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+ // Test for disabled test to check if exit is still allowed
(fromConfiguration gives null
+ // since currently
+ // there is only one option to have a valid security manager, so test
with constructor).
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testHaltConfiguration() {
+ // Halt as forceful shutdown replacing graceful system exit
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testInvalidConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ }
+
+ @Test
+ public void testExistingSecurityManagerRespected() {
+ // Don't set the following security manager directly to system, which
makes test hang.
+ SecurityManager originalSecurityManager =
+ new SecurityManager() {
+ @Override
+ public void checkPermission(Permission perm) {
+ throw new SecurityException("not allowed");
+ }
+ };
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> Assert.fail(),
+ originalSecurityManager);
+
+ assertThrows(
+ "not allowed",
+ SecurityException.class,
+ () -> {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ return null;
+ });
+ }
+
+ @Test
+ public void testRegistrationNotAllowedByExistingSecurityManager() {
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+
+ System.setSecurityManager(
+ new SecurityManager() {
Review comment:
You could add this feature to the testing implementation of
`SecurityManager` instead of using an anonymous class. See
[TestingDispatcherRunner](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherRunner.java)
as an example of a `Testing*` implementation.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more
for being dependable
+ * against user mistakes by gracefully handling them informing users rather
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+ static final Logger LOG =
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+ /**
+ * Security manager reference lastly set to system's security manager by
public API. As system
+ * security manager can be reset with another but still chain-called into
this manager properly,
+ * this reference may not be referenced by System.getSecurityManager, but
we still need to
+ * control runtime check behaviors such as monitoring exit from user code.
+ */
+ private static FlinkSecurityManager flinkSecurityManager;
+
+ private final SecurityManager originalSecurityManager;
+ private final ThreadLocal<Boolean> monitorUserSystemExit = new
InheritableThreadLocal<>();
+ private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+ /** The behavior to execute when the JVM exits. */
+ private final Consumer<Integer> onExitBehavior;
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior) {
+ this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+ }
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior,
+ SecurityManager originalSecurityManager) {
+ this.userSystemExitMode =
Preconditions.checkNotNull(userSystemExitMode);
+ this.onExitBehavior = onExitBehavior;
+ this.originalSecurityManager = originalSecurityManager;
+ }
+
+ /**
+ * Instantiate FlinkSecurityManager from configuration. Return null if
no security manager
+ * check is needed, so that a caller can skip setting security manager
avoiding runtime check
+ * cost, if there is no security check set up already. Use {@link
#setFromConfiguration} helper,
+ * which handles disabled case.
+ *
+ * @param configuration to instantiate the security manager from
+ * @return FlinkSecurityManager instantiated based on configuration.
Return null if
+ * disabled.
+ */
+ @VisibleForTesting
+ static FlinkSecurityManager fromConfiguration(Configuration configuration)
{
+ final ClusterOptions.UserSystemExitMode userSystemExitMode =
+ configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT);
+
+ boolean haltOnSystemExit =
configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT);
+
+ // If no check is needed, return null so that caller can avoid setting
security manager not
+ // to incur any runtime cost.
+ if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED
&& !haltOnSystemExit) {
+ return null;
+ }
+ Consumer<Integer> onExitBehavior = null;
+ // If halt on system exit is configured, registers a custom
SecurityManager which converts
+ // graceful exit calls using {@code System#exit} into forceful exit
calls using
+ // {@code Runtime#halt}. The latter does not perform a clean shutdown
using the registered
+ // shutdown hooks. This may be configured to prevent deadlocks with
Java 8 and the G1
+ // garbage collection, see
https://issues.apache.org/jira/browse/FLINK-16510.
+ if (haltOnSystemExit) {
+ onExitBehavior = status -> Runtime.getRuntime().halt(status);
+ }
+ LOG.info(
+ "FlinkSecurityManager is created with {} user system exit mode
and {} exit",
+ userSystemExitMode,
+ haltOnSystemExit ? "forceful" : "graceful");
+ // Add more configuration parameters that need user security manager
(currently only for
+ // system exit).
+ return new FlinkSecurityManager(userSystemExitMode, onExitBehavior);
+ }
+
+ public static void setFromConfiguration(Configuration configuration) {
+ final FlinkSecurityManager flinkSecurityManager =
+ FlinkSecurityManager.fromConfiguration(configuration);
+ if (flinkSecurityManager != null) {
+ try {
+ System.setSecurityManager(flinkSecurityManager);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Could not register security manager due to no
permission to "
+ + "set a SecurityManager. Either
update your existing "
+ + "SecurityManager to allow the
permission or not using "
Review comment:
```suggestion
+ "SecurityManager to allow the
permission or do not use "
```
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -88,21 +93,28 @@
.build());
@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
- public static final ConfigOption<Boolean> HALT_ON_FATAL_ERROR =
- key("cluster.processes.halt-on-fatal-error")
+ public static final ConfigOption<Boolean> HALT_ON_SYSTEM_EXIT =
+ key("cluster.processes.halt-on-system-exit")
Review comment:
Renaming an existing parameter has a bigger impact on users as they
possibly have to change their existing configuration files when upgrading.
Having said that, I would be ok with that change if we decide to merge both
parameters into one as they are more or less just defining some special exit
behavior. What do you think?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more
for being dependable
+ * against user mistakes by gracefully handling them informing users rather
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+ static final Logger LOG =
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+ /**
+ * Security manager reference lastly set to system's security manager by
public API. As system
+ * security manager can be reset with another but still chain-called into
this manager properly,
+ * this reference may not be referenced by System.getSecurityManager, but
we still need to
+ * control runtime check behaviors such as monitoring exit from user code.
+ */
+ private static FlinkSecurityManager flinkSecurityManager;
+
+ private final SecurityManager originalSecurityManager;
+ private final ThreadLocal<Boolean> monitorUserSystemExit = new
InheritableThreadLocal<>();
+ private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+ /** The behavior to execute when the JVM exits. */
+ private final Consumer<Integer> onExitBehavior;
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior) {
+ this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+ }
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior,
+ SecurityManager originalSecurityManager) {
+ this.userSystemExitMode =
Preconditions.checkNotNull(userSystemExitMode);
+ this.onExitBehavior = onExitBehavior;
+ this.originalSecurityManager = originalSecurityManager;
+ }
+
+ /**
+ * Instantiate FlinkSecurityManager from configuration. Return null if
no security manager
+ * check is needed, so that a caller can skip setting security manager
avoiding runtime check
+ * cost, if there is no security check set up already. Use {@link
#setFromConfiguration} helper,
+ * which handles disabled case.
+ *
+ * @param configuration to instantiate the security manager from
+ * @return FlinkSecurityManager instantiated based on configuration.
Return null if
+ * disabled.
+ */
+ @VisibleForTesting
+ static FlinkSecurityManager fromConfiguration(Configuration configuration)
{
+ final ClusterOptions.UserSystemExitMode userSystemExitMode =
+ configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT);
+
+ boolean haltOnSystemExit =
configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT);
+
+ // If no check is needed, return null so that caller can avoid setting
security manager not
+ // to incur any runtime cost.
+ if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED
&& !haltOnSystemExit) {
+ return null;
+ }
+ Consumer<Integer> onExitBehavior = null;
+ // If halt on system exit is configured, registers a custom
SecurityManager which converts
+ // graceful exit calls using {@code System#exit} into forceful exit
calls using
+ // {@code Runtime#halt}. The latter does not perform a clean shutdown
using the registered
+ // shutdown hooks. This may be configured to prevent deadlocks with
Java 8 and the G1
+ // garbage collection, see
https://issues.apache.org/jira/browse/FLINK-16510.
+ if (haltOnSystemExit) {
+ onExitBehavior = status -> Runtime.getRuntime().halt(status);
+ }
+ LOG.info(
+ "FlinkSecurityManager is created with {} user system exit mode
and {} exit",
+ userSystemExitMode,
+ haltOnSystemExit ? "forceful" : "graceful");
+ // Add more configuration parameters that need user security manager
(currently only for
+ // system exit).
+ return new FlinkSecurityManager(userSystemExitMode, onExitBehavior);
+ }
+
+ public static void setFromConfiguration(Configuration configuration) {
+ final FlinkSecurityManager flinkSecurityManager =
+ FlinkSecurityManager.fromConfiguration(configuration);
+ if (flinkSecurityManager != null) {
+ try {
+ System.setSecurityManager(flinkSecurityManager);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Could not register security manager due to no
permission to "
+ + "set a SecurityManager. Either
update your existing "
+ + "SecurityManager to allow the
permission or not using "
+ + "security manager features (e.g.,
'%s: %s', '%s: %s')",
+
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.key(),
+
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.defaultValue(),
+ ClusterOptions.HALT_ON_SYSTEM_EXIT.key(),
+
ClusterOptions.HALT_ON_SYSTEM_EXIT.defaultValue(),
+ e));
+ }
+ }
+ FlinkSecurityManager.flinkSecurityManager = flinkSecurityManager;
+ }
+
+ public static void monitorUserSystemExitForCurrentThread() {
+ if (FlinkSecurityManager.flinkSecurityManager != null) {
+ FlinkSecurityManager.flinkSecurityManager.monitorUserSystemExit();
+ }
+ }
+
+ public static void unmonitorUserSystemExitForCurrentThread() {
+ if (FlinkSecurityManager.flinkSecurityManager != null) {
+
FlinkSecurityManager.flinkSecurityManager.unmonitorUserSystemExit();
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm) {
+ if (originalSecurityManager != null) {
+ originalSecurityManager.checkPermission(perm);
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm, Object context) {
+ if (originalSecurityManager != null) {
+ originalSecurityManager.checkPermission(perm, context);
+ }
+ }
+
+ @Override
+ public void checkExit(int status) {
+ if (userSystemExitMonitored()) {
+ switch (userSystemExitMode) {
+ case DISABLED:
+ break;
+ case LOG:
+ // Add exception trace log to help users to debug where
exit came from.
+ LOG.warn(
+ "Exiting JVM with status {} is monitored, logging
and exiting",
Review comment:
```suggestion
"Exiting JVM with status {} is monitored: The
system will exit due to this call.",
```
I suggest a slight modification here...
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after
monitoring is enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testDisabledConfiguration() {
+ // Default case (no provided option) - allowing everything, so null
security manager is
+ // expected.
+ Configuration configuration = new Configuration();
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // Disabled case (same as default)
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+ ClusterOptions.UserSystemExitMode.DISABLED);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // No halt (same as default)
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+ }
+
+ @Test
+ public void testLogConfiguration() {
+ // Enabled - log case (logging as warning but allowing exit)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.LOG);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ }
+
+ @Test
+ public void testThrowConfiguration() {
+ // Enabled - throw case (disallowing by throwing exception)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+ // Test for disabled test to check if exit is still allowed
(fromConfiguration gives null
+ // since currently
+ // there is only one option to have a valid security manager, so test
with constructor).
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testHaltConfiguration() {
+ // Halt as forceful shutdown replacing graceful system exit
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testInvalidConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
Review comment:
```suggestion
flinkSecurityManager = FlinkSecurityManager.fromConfiguration(new
Configuration());
```
You're testing the `NullPointerException` being thrown by
`Configuration.set(..)`. Calling `fromConfiguration` on an empty configuration
would do it. But, I guess, this test is obsolete as there is no special
behavior in `fromConfiguration` that needs to get tested. It should use the
default value for `ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more
for being dependable
+ * against user mistakes by gracefully handling them informing users rather
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+ static final Logger LOG =
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+ /**
+ * Security manager reference lastly set to system's security manager by
public API. As system
+ * security manager can be reset with another but still chain-called into
this manager properly,
+ * this reference may not be referenced by System.getSecurityManager, but
we still need to
+ * control runtime check behaviors such as monitoring exit from user code.
+ */
+ private static FlinkSecurityManager flinkSecurityManager;
+
+ private final SecurityManager originalSecurityManager;
+ private final ThreadLocal<Boolean> monitorUserSystemExit = new
InheritableThreadLocal<>();
+ private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+ /** The behavior to execute when the JVM exits. */
+ private final Consumer<Integer> onExitBehavior;
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior) {
+ this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+ }
+
+ @VisibleForTesting
+ FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode userSystemExitMode,
+ @Nullable Consumer<Integer> onExitBehavior,
+ SecurityManager originalSecurityManager) {
+ this.userSystemExitMode =
Preconditions.checkNotNull(userSystemExitMode);
+ this.onExitBehavior = onExitBehavior;
+ this.originalSecurityManager = originalSecurityManager;
+ }
+
+ /**
+ * Instantiate FlinkSecurityManager from configuration. Return null if
no security manager
+ * check is needed, so that a caller can skip setting security manager
avoiding runtime check
+ * cost, if there is no security check set up already. Use {@link
#setFromConfiguration} helper,
+ * which handles disabled case.
+ *
+ * @param configuration to instantiate the security manager from
+ * @return FlinkSecurityManager instantiated based on configuration.
Return null if
+ * disabled.
+ */
+ @VisibleForTesting
+ static FlinkSecurityManager fromConfiguration(Configuration configuration)
{
Review comment:
We have a special parameter combination where we might have to add (at
least) a warning: `HALT_ON_SYSTEM_EXIT` is implicitly disabled right now if
`INTERCEPT_USER_SYSTEM_EXIT` is set to `THROW`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
Review comment:
If I'm not mistaken, this test is redundant because it tests the same thing
as `testThrowConfiguration`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
Review comment:
```suggestion
public void testExitHandlerTriggered() {
```
I suggest this name change because "behaviorChanged" feels too general to
describe what the test is doing.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testDisabledConfiguration() {
+ // Default case (no provided option) - allowing everything, so null
security manager is
+ // expected.
+ Configuration configuration = new Configuration();
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // Disabled case (same as default)
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+ ClusterOptions.UserSystemExitMode.DISABLED);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // No halt (same as default)
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+ }
+
+ @Test
+ public void testLogConfiguration() {
+ // Enabled - log case (logging as warning but allowing exit)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.LOG);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ }
+
+ @Test
+ public void testThrowConfiguration() {
+ // Enabled - throw case (disallowing by throwing exception)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+ // Test for disabled test to check if exit is still allowed
(fromConfiguration gives null
+ // since currently
+ // there is only one option to have a valid security manager, so test
with constructor).
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testHaltConfiguration() {
+ // Halt as forceful shutdown replacing graceful system exit
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testInvalidConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ }
+
+ @Test
+ public void testExistingSecurityManagerRespected() {
+ // Don't set the following security manager directly to system, which
makes test hang.
+ SecurityManager originalSecurityManager =
+ new SecurityManager() {
+ @Override
+ public void checkPermission(Permission perm) {
+ throw new SecurityException("not allowed");
+ }
+ };
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> Assert.fail(),
+ originalSecurityManager);
+
+ assertThrows(
+ "not allowed",
+ SecurityException.class,
+ () -> {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ return null;
+ });
+ }
+
+ @Test
+ public void testRegistrationNotAllowedByExistingSecurityManager() {
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+
+ System.setSecurityManager(
+ new SecurityManager() {
+
+ private boolean fired;
+
+ @Override
+ public void checkPermission(Permission perm) {
+ if (!fired &&
perm.getName().equals("setSecurityManager")) {
+ try {
+ throw new SecurityException("not allowed");
+ } finally {
+ // Allow removing this manager again
+ fired = true;
+ }
+ }
+ }
+ });
+
+ assertThrows(
+ "Could not register security manager",
+ IllegalConfigurationException.class,
+ () -> {
+ FlinkSecurityManager.setFromConfiguration(configuration);
+ return null;
+ });
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testMultiSecurityManagersWithSetFirstAndMonitored() {
+ Configuration configuration = new Configuration();
+
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+
+ FlinkSecurityManager.setFromConfiguration(configuration);
+
+ TestExitSecurityManager newSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(newSecurityManager);
+
+ FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+ newSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(newSecurityManager.getExitStatus(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testMultiSecurityManagersWithSetLastAndMonitored() {
+ Configuration configuration = new Configuration();
+
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+
+ TestExitSecurityManager oldSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(oldSecurityManager);
+
+ FlinkSecurityManager.setFromConfiguration(configuration);
+
+ FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+ System.getSecurityManager().checkExit(TEST_EXIT_CODE);
Review comment:
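Here as well: you would have to catch the `UserSystemExitException` explicitly
instead of using JUnit's `expected` feature if you want to check afterwards
whether `TestExitSecurityManager` was triggered. With `expected`, the test
method ends as soon as the exception is thrown, so nothing after this call is
executed.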
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testDisabledConfiguration() {
Review comment:
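We could split this test case up into three, one per scenario: the default
configuration, `INTERCEPT_USER_SYSTEM_EXIT` explicitly set to `DISABLED`, and
`HALT_ON_SYSTEM_EXIT` set to `false`.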
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
Review comment:
Is there a reason why we have `FlinkSecurityManager` as a member instead
of local variables within each test?
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -88,21 +93,28 @@
.build());
@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
- public static final ConfigOption<Boolean> HALT_ON_FATAL_ERROR =
- key("cluster.processes.halt-on-fatal-error")
+ public static final ConfigOption<Boolean> HALT_ON_SYSTEM_EXIT =
+ key("cluster.processes.halt-on-system-exit")
Review comment:
Thinking about it again, I realize that they behave in a slightly
different way as the `System.exit` logic is only called in specific code
regions (i.e. user code) whereas the halt logic applies anywhere. @mxm The halt
logic was intended to work on the whole code base, wasn't it?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
Review comment:
```suggestion
exitStatus::set);
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
+ TestExitSecurityManager existingSecurityManager = new
TestExitSecurityManager();
+ System.setSecurityManager(existingSecurityManager);
+ AtomicInteger customExitExecuted = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
customExitExecuted::set);
+
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(existingSecurityManager.getExitStatus(),
is(TEST_EXIT_CODE));
+ assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testExitBehaviorUnchangeOnThrowingUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testDisabledConfiguration() {
+ // Default case (no provided option) - allowing everything, so null
security manager is
+ // expected.
+ Configuration configuration = new Configuration();
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // Disabled case (same as default)
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+ ClusterOptions.UserSystemExitMode.DISABLED);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+
+ // No halt (same as default)
+ configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNull(flinkSecurityManager);
+ }
+
+ @Test
+ public void testLogConfiguration() {
+ // Enabled - log case (logging as warning but allowing exit)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.LOG);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ }
+
+ @Test
+ public void testThrowConfiguration() {
+ // Enabled - throw case (disallowing by throwing exception)
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
ClusterOptions.UserSystemExitMode.THROW);
+ flinkSecurityManager =
FlinkSecurityManager.fromConfiguration(configuration);
+ assertNotNull(flinkSecurityManager);
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+ flinkSecurityManager.monitorUserSystemExit();
+ assertTrue(flinkSecurityManager.userSystemExitMonitored());
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+ // Test for disabled test to check if exit is still allowed
(fromConfiguration gives null
+ // since currently
+ // there is only one option to have a valid security manager, so test
with constructor).
+ flinkSecurityManager =
Review comment:
This should be a separate test.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
Review comment:
`testLogConfiguration` covers the same scenario, so in my opinion we could
delete this test (`testLogUserExit`).
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+ private static final int TEST_EXIT_CODE = 123;
+ SecurityManager originalSecurityManager;
+ FlinkSecurityManager flinkSecurityManager;
+
+ @Before
+ public void setUp() {
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @After
+ public void tearDown() {
+ System.setSecurityManager(originalSecurityManager);
+ }
+
+ @Test(expected = UserSystemExitException.class)
+ public void testThrowUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testToggleUserExit() {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ flinkSecurityManager.unmonitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testPerThreadThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ // Async thread test before enabling monitoring ensures it does not
throw while prestarting
+ // worker thread, which is to be unmonitored and tested after enabling
monitoring enabled.
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ // This threaded exit should be allowed as thread is not spawned while
monitor is enabled.
+ future =
+ CompletableFuture.runAsync(
+ () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE),
executorService);
+ future.get();
+ }
+
+ @Test
+ public void testInheritedThrowUserExit() throws Exception {
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ }
+ CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ try {
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ fail();
+ } catch (UserSystemExitException ignored) {
+ } catch (Throwable t) {
+ fail();
+ }
+ }
+ };
+ thread.start();
+ thread.sync();
+ }
+
+ @Test
+ public void testLogUserExit() {
+ // Log mode enables monitor but only logging allowing exit, hence not
expecting exception.
+ // NOTE - Do not specifically test warning logging.
+ flinkSecurityManager =
+ new
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+ flinkSecurityManager.monitorUserSystemExit();
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ }
+
+ @Test
+ public void testExitBehaviorChanged() {
+ AtomicInteger exitStatus = new AtomicInteger(0);
+ flinkSecurityManager =
+ new FlinkSecurityManager(
+ ClusterOptions.UserSystemExitMode.DISABLED,
+ status -> exitStatus.set(status));
+ flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+ assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+ }
+
+ @Test
+ public void testExitBehaviorChangedWithExistingSecurityManager() {
Review comment:
```suggestion
public void testExitHandlerTriggeredWithExistingSecurityManager() {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]