tillrohrmann commented on a change in pull request #15938:
URL: https://github.com/apache/flink/pull/15938#discussion_r633689621
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ClusterUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ClusterOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for handling any uncaught exceptions
+ *
+ * <p>Handles any uncaught exceptions according to cluster configuration in {@link ClusterOptions}
+ * to either just log exception, or fail job.
+ */
+public class ClusterUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ClusterUncaughtExceptionHandler.class);
+    ClusterOptions.UncaughtExceptionHandleMode handleMode;
+
+    public ClusterUncaughtExceptionHandler(ClusterOptions.UncaughtExceptionHandleMode handleMode) {
+        this.handleMode = handleMode;
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        if (handleMode == ClusterOptions.UncaughtExceptionHandleMode.LOG) {
+            LOG.error("WARNING: Thread '{}' produced an uncaught exception.", t.getName(), e);
+        } else { // by default, fail the job
+            new FatalExitExceptionHandler().uncaughtException(t, e);

Review comment:
```suggestion
            FatalExitExceptionHandler.INSTANCE.uncaughtException(t, e);
```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ClusterUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ClusterOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for handling any uncaught exceptions
+ *
+ * <p>Handles any uncaught exceptions according to cluster configuration in {@link ClusterOptions}
+ * to either just log exception, or fail job.
+ */
+public class ClusterUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ClusterUncaughtExceptionHandler.class);
+    ClusterOptions.UncaughtExceptionHandleMode handleMode;
+
+    public ClusterUncaughtExceptionHandler(ClusterOptions.UncaughtExceptionHandleMode handleMode) {
+        this.handleMode = handleMode;
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        if (handleMode == ClusterOptions.UncaughtExceptionHandleMode.LOG) {
+            LOG.error("WARNING: Thread '{}' produced an uncaught exception.", t.getName(), e);

Review comment:
Add a pointer to the config option with which to change this behaviour.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -183,6 +184,9 @@ public void startCluster() throws ClusterEntrypointException {
             SecurityContext securityContext = installSecurityContext(configuration);
+            Thread.setDefaultUncaughtExceptionHandler(
+                    new ClusterUncaughtExceptionHandler(
+                            configuration.get(ClusterOptions.UNCAUGHT_EXCEPTION_HANDLING)));

Review comment:
Maybe move this logic into `ClusterEntrypointUtils.configureUncaughtExceptionHandler(Configuration)` and then reuse it for the `TaskManagerRunners`.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -131,6 +131,18 @@
                     .withDescription(
                             "Defines whether the cluster uses fine-grained resource management.");
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<UncaughtExceptionHandleMode> UNCAUGHT_EXCEPTION_HANDLING =
+            ConfigOptions.key("cluster.uncaught-exception-handling")
+                    .enumType(UncaughtExceptionHandleMode.class)
+                    .defaultValue(UncaughtExceptionHandleMode.FAIL)

Review comment:
This default value will change Flink's behavior a bit. I am wondering whether this behavior change is too much. Maybe we should set the default to `LOG` in order to not change Flink's behavior in case of unstable libraries.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ClusterUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ClusterOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for handling any uncaught exceptions
+ *
+ * <p>Handles any uncaught exceptions according to cluster configuration in {@link ClusterOptions}
+ * to either just log exception, or fail job.
+ */
+public class ClusterUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

Review comment:
I think it would be good to add a test for the exit behavior using the `SystemExitTrackingSecurityManager`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ClusterUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ClusterOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for handling any uncaught exceptions
+ *
+ * <p>Handles any uncaught exceptions according to cluster configuration in {@link ClusterOptions}
+ * to either just log exception, or fail job.
+ */
+public class ClusterUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ClusterUncaughtExceptionHandler.class);
+    ClusterOptions.UncaughtExceptionHandleMode handleMode;

Review comment:
```suggestion
    private final ClusterOptions.UncaughtExceptionHandleMode handleMode;
```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ClusterUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ClusterOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for handling any uncaught exceptions
+ *
+ * <p>Handles any uncaught exceptions according to cluster configuration in {@link ClusterOptions}
+ * to either just log exception, or fail job.
+ */
+public class ClusterUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ClusterUncaughtExceptionHandler.class);
+    ClusterOptions.UncaughtExceptionHandleMode handleMode;
+
+    public ClusterUncaughtExceptionHandler(ClusterOptions.UncaughtExceptionHandleMode handleMode) {
+        this.handleMode = handleMode;
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        if (handleMode == ClusterOptions.UncaughtExceptionHandleMode.LOG) {
+            LOG.error("WARNING: Thread '{}' produced an uncaught exception.", t.getName(), e);
+        } else { // by default, fail the job
+            new FatalExitExceptionHandler().uncaughtException(t, e);

Review comment:
I am also wondering whether a pointer to the configuration option that controls this behavior would be helpful here. If so, we probably need a way to pass in a custom logging message.
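
For illustration, the review comments above could be combined roughly as follows. This is a minimal sketch under stated assumptions, not the code from the pull request: `HandleMode`, `SketchUncaughtExceptionHandler`, and the injected `logger` and `fatalHandler` are hypothetical stand-ins for `ClusterOptions.UncaughtExceptionHandleMode`, the SLF4J logger, and `FatalExitExceptionHandler.INSTANCE`, chosen so that the example compiles without Flink on the classpath. Only the option key is taken from the diff.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for ClusterOptions.UncaughtExceptionHandleMode. */
enum HandleMode { LOG, FAIL }

/** Sketch of the handler with the review suggestions applied (not the PR's code). */
final class SketchUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    // Option key as it appears in the ClusterOptions diff.
    static final String OPTION_KEY = "cluster.uncaught-exception-handling";

    // Immutable fields, as requested in the review.
    private final HandleMode handleMode;
    // Assumption: a plain Consumer stands in for the SLF4J logger.
    private final Consumer<String> logger;
    // Assumption: stands in for a shared fatal handler such as FatalExitExceptionHandler.INSTANCE.
    private final Thread.UncaughtExceptionHandler fatalHandler;

    SketchUncaughtExceptionHandler(
            HandleMode handleMode,
            Consumer<String> logger,
            Thread.UncaughtExceptionHandler fatalHandler) {
        this.handleMode = handleMode;
        this.logger = logger;
        this.fatalHandler = fatalHandler;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        if (handleMode == HandleMode.LOG) {
            // The message points the reader to the option that controls this behavior.
            logger.accept(
                    String.format(
                            "Thread '%s' produced an uncaught exception: %s. "
                                    + "Set '%s' to FAIL to terminate the process instead.",
                            t.getName(), e, OPTION_KEY));
        } else {
            // Delegate to the shared fatal handler instead of creating a new one per call.
            fatalHandler.uncaughtException(t, e);
        }
    }

    /** Hypothetical helper mirroring the suggested configureUncaughtExceptionHandler(Configuration). */
    static void install(
            HandleMode mode, Consumer<String> logger, Thread.UncaughtExceptionHandler fatal) {
        Thread.setDefaultUncaughtExceptionHandler(
                new SketchUncaughtExceptionHandler(mode, logger, fatal));
    }
}

class HandlerSketchDemo {
    public static void main(String[] args) {
        Thread.UncaughtExceptionHandler handler =
                new SketchUncaughtExceptionHandler(
                        HandleMode.LOG,
                        System.err::println,
                        (thread, error) -> System.err.println("would exit the JVM here"));
        // LOG mode only reports the exception; the fatal handler is not invoked.
        handler.uncaughtException(Thread.currentThread(), new IllegalStateException("boom"));
    }
}
```

Injecting the fatal handler keeps both branches testable without terminating the JVM. In Flink itself, the review suggests covering the real exit path with `SystemExitTrackingSecurityManager` instead.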
