scwhittle commented on code in PR #32922: URL: https://github.com/apache/beam/pull/32922#discussion_r1820519685
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler; +import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; + +/** + * Utility class for {@link java.util.concurrent.ExecutorService}s that will terminate the JVM on + * 
uncaught exceptions. + * + * @implNote Ensures that all threads produced by the {@link ExecutorService}s have a {@link + * WorkerUncaughtExceptionHandler} attached to prevent hidden/silent exceptions and errors. + */ +public final class TerminatingExecutors { + private TerminatingExecutors() {} + + public static TerminatingExecutorService newSingleThreadExecutor( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return new TerminatingExecutorService( + Executors.newSingleThreadExecutor(terminatingThreadFactory(threadFactoryBuilder, logger))); + } + + public static TerminatingScheduledExecutorService newSingleThreadScheduledExecutor( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return new TerminatingScheduledExecutorService( + Executors.newSingleThreadScheduledExecutor( + terminatingThreadFactory(threadFactoryBuilder, logger))); + } + + public static TerminatingExecutorService newCachedThreadPool( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return new TerminatingExecutorService( + Executors.newCachedThreadPool(terminatingThreadFactory(threadFactoryBuilder, logger))); + } + + public static TerminatingExecutorService newFixedThreadPool( + int numThreads, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return new TerminatingExecutorService( + Executors.newFixedThreadPool( + numThreads, terminatingThreadFactory(threadFactoryBuilder, logger))); + } + + public static TerminatingExecutorService newSingleThreadedExecutorForTesting( + JvmRuntime jvmRuntime, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return new TerminatingExecutorService( + Executors.newSingleThreadExecutor( Review Comment: I think it depends on the use case. There can be cases where the futures are examined and the exceptions are handled manually, e.g. by retrying. If we are using schedule and not ignoring the future result, then that seems like a case where we wouldn't want a failure. 
If we are using execute or otherwise ignoring the future, it seems like we would want a failure. I think Arun's advice to use ExecutorService where we can makes sense. We can look at the remaining ScheduledExecutorService cases to see if we want to fail or if the errors are already handled by examining the futures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
