tillrohrmann commented on a change in pull request #12446: URL: https://github.com/apache/flink/pull/12446#discussion_r435420178
########## File path: flink-core/src/main/java/org/apache/flink/util/ClassLoaderWithErrorHandler.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.function.Consumer; + +/** + * This class loader accepts a custom handler if an exception occurs in {@link #loadClass(String, boolean)}. + */ +public abstract class ClassLoaderWithErrorHandler extends URLClassLoader { + public static final Consumer<Throwable> EMPTY_EXCEPTION_HANDLER = classLoadingException -> {}; + + private final Consumer<Throwable> classLoadingExceptionHandler; + + protected ClassLoaderWithErrorHandler(URL[] urls, ClassLoader parent) { + this(urls, parent, EMPTY_EXCEPTION_HANDLER); + } + + protected ClassLoaderWithErrorHandler( + URL[] urls, + ClassLoader parent, + Consumer<Throwable> classLoadingExceptionHandler) { + super(urls, parent); + this.classLoadingExceptionHandler = classLoadingExceptionHandler; + } + + @SuppressWarnings("FinalMethod") Review comment: Why do we need this suppression here? Is it because we make the protected method `final`? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java ########## @@ -140,7 +141,8 @@ public static JobManagerSharedServices fromConfiguration( blobServer, BlobLibraryCacheManager.defaultClassLoaderFactory( FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), - alwaysParentFirstLoaderPatterns)); + alwaysParentFirstLoaderPatterns, + ClassLoaderWithErrorHandler.EMPTY_EXCEPTION_HANDLER)); Review comment: Shouldn't we also fail if we encounter a metaspace OOM on the `JobManager` side? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java ########## @@ -140,7 +141,8 @@ public static JobManagerSharedServices fromConfiguration( blobServer, BlobLibraryCacheManager.defaultClassLoaderFactory( FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), - alwaysParentFirstLoaderPatterns)); + alwaysParentFirstLoaderPatterns, + ClassLoaderWithErrorHandler.EMPTY_EXCEPTION_HANDLER)); Review comment: Why are we using an `EMPTY_EXCEPTION_HANDLER` here? ########## File path: flink-core/src/test/java/org/apache/flink/util/ClassLoaderWithErrorHandlerTest.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link ClassLoaderWithErrorHandler}. + */ +public class ClassLoaderWithErrorHandlerTest extends TestLogger { + @Test + public void testExceptionHandling() { + RuntimeException expectedException = new RuntimeException("Expected exception"); + AtomicReference<Throwable> handledException = new AtomicReference<>(); + try (ClassLoaderWithErrorHandler classLoaderWithErrorHandler = + new ClassLoaderWithErrorHandler(new ThrowingURLClassLoader(expectedException), handledException::set)) { + classLoaderWithErrorHandler.loadClass("dummy.class"); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException("Unexpected", e); Review comment: I think we don't need this catch clause here. ########## File path: flink-core/src/main/java/org/apache/flink/util/ClassLoaderWithErrorHandler.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.function.Consumer; + +/** + * This class loader accepts a custom handler if an exception occurs in {@link #loadClass(String, boolean)}. + */ +public abstract class ClassLoaderWithErrorHandler extends URLClassLoader { + public static final Consumer<Throwable> EMPTY_EXCEPTION_HANDLER = classLoadingException -> {}; + + private final Consumer<Throwable> classLoadingExceptionHandler; + + protected ClassLoaderWithErrorHandler(URL[] urls, ClassLoader parent) { + this(urls, parent, EMPTY_EXCEPTION_HANDLER); + } + + protected ClassLoaderWithErrorHandler( + URL[] urls, + ClassLoader parent, + Consumer<Throwable> classLoadingExceptionHandler) { + super(urls, parent); + this.classLoadingExceptionHandler = classLoadingExceptionHandler; + } + + @SuppressWarnings("FinalMethod") Review comment: So far we have never used this annotation in the Flink code base. ########## File path: flink-core/src/main/java/org/apache/flink/util/ClassLoaderWithErrorHandler.java ########## @@ -18,47 +18,76 @@ package org.apache.flink.util; +import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.net.URLClassLoader; +import java.util.Enumeration; import java.util.function.Consumer; /** - * This class loader accepts a custom handler if an exception occurs in {@link #loadClass(String, boolean)}. + * This {@link URLClassLoader} decorator accepts a custom exception handler. 
+ * + * <p>The handler is called if an exception occurs in the {@link #loadClass(String, boolean)} of inner class loader. */ -public abstract class ClassLoaderWithErrorHandler extends URLClassLoader { +public class ClassLoaderWithErrorHandler extends URLClassLoader { public static final Consumer<Throwable> EMPTY_EXCEPTION_HANDLER = classLoadingException -> {}; + private static final URL[] EMPTY_URL_ARRAY = {}; + private final URLClassLoader inner; private final Consumer<Throwable> classLoadingExceptionHandler; - protected ClassLoaderWithErrorHandler(URL[] urls, ClassLoader parent) { - this(urls, parent, EMPTY_EXCEPTION_HANDLER); - } - - protected ClassLoaderWithErrorHandler( - URL[] urls, - ClassLoader parent, + public ClassLoaderWithErrorHandler( + URLClassLoader inner, Consumer<Throwable> classLoadingExceptionHandler) { - super(urls, parent); + super(EMPTY_URL_ARRAY, inner.getParent()); + this.inner = inner; this.classLoadingExceptionHandler = classLoadingExceptionHandler; } - @SuppressWarnings("FinalMethod") @Override - protected final Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - try { - return loadClassWithoutExceptionHandling(name, resolve); - } catch (Throwable classLoadingException) { - classLoadingExceptionHandler.accept(classLoadingException); - throw classLoadingException; + protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException { + synchronized (getClassLoadingLock(name)) { + final Class<?> loadedClass = findLoadedClass(name); + if (loadedClass != null) { + return resolveIfNeeded(resolve, loadedClass); + } + + try { + return resolveIfNeeded(resolve, inner.loadClass(name)); + } catch (Throwable classLoadingException) { + classLoadingExceptionHandler.accept(classLoadingException); + throw classLoadingException; + } } } - /** - * Same as {@link #loadClass(String, boolean)} but without exception handling. 
- * - * <p>Extending concrete class loaders should implement this instead of {@link #loadClass(String, boolean)}. - */ - protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve) throws ClassNotFoundException { - return super.loadClass(name, resolve); + private Class<?> resolveIfNeeded(final boolean resolve, final Class<?> loadedClass) { + if (resolve) { + resolveClass(loadedClass); + } + + return loadedClass; + } + + @Override + public URL getResource(final String name) { + return inner.getResource(name); + } + + @Override + public Enumeration<URL> getResources(final String name) throws IOException { + return inner.getResources(name); + } + + @Override + public InputStream getResourceAsStream(String name) { + return inner.getResourceAsStream(name); + } + + @Override + public void close() throws IOException { + super.close(); + inner.close(); Review comment: I think the order of these two `close()` calls should be inverted, since teardown usually happens in the reverse order of construction. ########## File path: flink-core/src/main/java/org/apache/flink/util/ClassLoaderWithErrorHandler.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.util; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Enumeration; +import java.util.function.Consumer; + +/** + * This {@link URLClassLoader} decorator accepts a custom exception handler. + * + * <p>The handler is called if an exception occurs in the {@link #loadClass(String, boolean)} of inner class loader. + */ +public class ClassLoaderWithErrorHandler extends URLClassLoader { Review comment: We could name this class ```suggestion public class FlinkUserCodeClassLoader extends URLClassLoader { ``` if it is the base class for every Flink user code class loader. ########## File path: flink-core/src/test/java/org/apache/flink/util/ClassLoaderWithErrorHandlerTest.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.util; + +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link ClassLoaderWithErrorHandler}. + */ +public class ClassLoaderWithErrorHandlerTest extends TestLogger { + @Test + public void testExceptionHandling() { + RuntimeException expectedException = new RuntimeException("Expected exception"); + AtomicReference<Throwable> handledException = new AtomicReference<>(); + try (ClassLoaderWithErrorHandler classLoaderWithErrorHandler = + new ClassLoaderWithErrorHandler(new ThrowingURLClassLoader(expectedException), handledException::set)) { + classLoaderWithErrorHandler.loadClass("dummy.class"); Review comment: `fail()` is missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ########## @@ -311,7 +314,12 @@ public static TaskManagerServices fromConfiguration( permanentBlobService, BlobLibraryCacheManager.defaultClassLoaderFactory( taskManagerServicesConfiguration.getClassLoaderResolveOrder(), - taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns())); + taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(), + classLoadingException -> { + if (ExceptionUtils.isMetaspaceOutOfMemoryError(classLoadingException)) { + fatalErrorHandler.onFatalError(classLoadingException); + } Review comment: I think it would be fine to simply create the user code class loaders with something like the `FatalExitExceptionHandler`. It should also be enough to decide on the handler in `FlinkUserCodeClassLoaders`. That way we don't have to forward the handler all the way to this class. 
Maybe it is enough to extend `FlinkUserCodeClassLoaders.create` to take a parameter which says whether it should fail fatally on metaspace OOMs or not. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
