This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 86e12eb3fcec54d234154e59f8cb0557fc616494 Author: Shengkai <[email protected]> AuthorDate: Thu Mar 2 20:05:51 2023 +0800 [FLINK-31092][table-common] Fix ServiceLoaderUtil keeps loading even though the classloader has been closed This closes #22072 Co-authored-by: Matthias Pohl <[email protected]> --- .../apache/flink/table/factories/FactoryUtil.java | 52 ++++++++------ .../flink/table/factories/ServiceLoaderUtil.java | 83 ---------------------- .../flink/table/factories/FactoryUtilTest.java | 20 ++++++ 3 files changed, 50 insertions(+), 105 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index d90dbc6c2f8..e22ba4051da 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -52,11 +52,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.ServiceLoader; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -801,27 +802,34 @@ public final class FactoryUtil { } static List<Factory> discoverFactories(ClassLoader classLoader) { - final List<Factory> result = new LinkedList<>(); - ServiceLoaderUtil.load(Factory.class, classLoader) - .forEach( - loadResult -> { - if (loadResult.hasFailed()) { - if (loadResult.getError() instanceof NoClassDefFoundError) { - LOG.debug( - "NoClassDefFoundError when loading a " - + Factory.class - + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.", - loadResult.getError()); - // After logging, we just ignore this failure - return; - } - throw new TableException( - "Unexpected error when trying to load service provider for factories.", - loadResult.getError()); - } - result.add(loadResult.getService()); - }); - return result; + final Iterator<Factory> serviceLoaderIterator = + ServiceLoader.load(Factory.class, classLoader).iterator(); + + final List<Factory> loadResults = new ArrayList<>(); + while (true) { + try { + // error handling should also be applied to the hasNext() call because service + // loading might cause problems here as well + if (!serviceLoaderIterator.hasNext()) { + break; + } + + loadResults.add(serviceLoaderIterator.next()); + } catch (Throwable t) { + if (t instanceof NoClassDefFoundError) { + LOG.debug( + "NoClassDefFoundError when loading a " + + Factory.class.getCanonicalName() + + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.", + t); + } else { + throw new TableException( + "Unexpected error when trying to load service provider.", t); + } + } + } + + return loadResults; } private static String stringifyOption(String key, String value) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java deleted file mode 100644 index 620e9c3230a..00000000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.factories; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.ServiceLoader; - -/** This class contains utilities to deal with {@link ServiceLoader}. */ -class ServiceLoaderUtil { - - /** - * This method behaves similarly to {@link ServiceLoader#load(Class, ClassLoader)}, but it - * returns a list with the results of the iteration, wrapping the iteration failures such as - * {@link NoClassDefFoundError}. - */ - static <T> List<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) { - List<LoadResult<T>> loadResults = new ArrayList<>(); - - Iterator<T> serviceLoaderIterator = ServiceLoader.load(clazz, classLoader).iterator(); - - while (true) { - try { - T next = serviceLoaderIterator.next(); - loadResults.add(new LoadResult<>(next)); - } catch (NoSuchElementException e) { - break; - } catch (Throwable t) { - loadResults.add(new LoadResult<>(t)); - } - } - - return loadResults; - } - - static class LoadResult<T> { - private final T service; - private final Throwable error; - - private LoadResult(T service, Throwable error) { - this.service = service; - this.error = error; - } - - private LoadResult(T service) { - this(service, null); - } - - private LoadResult(Throwable error) { - this(null, error); - } - - public boolean hasFailed() { - return error != null; - } - - public Throwable getError() { - return error; - } - - public T getService() { - return service; - } - } -} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java index 45ef9bda2ec..6f6d2a8f40c 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.factories; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CommonCatalogOptions; @@ -33,12 +34,15 @@ import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock; import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock; import org.apache.flink.table.factories.utils.FactoryMocks; import org.apache.flink.testutils.ClassLoaderUtils; +import org.apache.flink.util.FlinkUserCodeClassLoaders; +import org.apache.flink.util.MutableURLClassLoader; import org.assertj.core.api.AbstractThrowableAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Files; import java.nio.file.Path; @@ -668,6 +672,22 @@ class FactoryUtilTest { .contains(serializationSchemaImplementationName); } + @Test + void testDiscoverFactoryFromClosedClassLoader() throws Exception { + MutableURLClassLoader classLoader = + FlinkUserCodeClassLoaders.create( + new URL[0], FactoryUtilTest.class.getClassLoader(), new Configuration()); + classLoader.close(); + assertThatThrownBy(() -> FactoryUtil.discoverFactory(classLoader, Factory.class, "test")) + .satisfies( + FlinkAssertions.anyCauseMatches( + IllegalStateException.class, + "Trying to access closed classloader. Please check if you store classloaders directly " + + "or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third " + + "party library and cannot be fixed immediately, you can disable this check with the " + + "configuration 'classloader.check-leaked-classloader'")); + } + // -------------------------------------------------------------------------------------------- // Helper methods // --------------------------------------------------------------------------------------------
