This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86e12eb3fcec54d234154e59f8cb0557fc616494
Author: Shengkai <[email protected]>
AuthorDate: Thu Mar 2 20:05:51 2023 +0800

    [FLINK-31092][table-common] Fix ServiceLoaderUtil keeps loading even though the classloader has been closed
    
    This closes #22072
    
    Co-authored-by: Matthias Pohl <[email protected]>
---
 .../apache/flink/table/factories/FactoryUtil.java  | 52 ++++++++------
 .../flink/table/factories/ServiceLoaderUtil.java   | 83 ----------------------
 .../flink/table/factories/FactoryUtilTest.java     | 20 ++++++
 3 files changed, 50 insertions(+), 105 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index d90dbc6c2f8..e22ba4051da 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -52,11 +52,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -801,27 +802,34 @@ public final class FactoryUtil {
     }
 
     static List<Factory> discoverFactories(ClassLoader classLoader) {
-        final List<Factory> result = new LinkedList<>();
-        ServiceLoaderUtil.load(Factory.class, classLoader)
-                .forEach(
-                        loadResult -> {
-                            if (loadResult.hasFailed()) {
-                                if (loadResult.getError() instanceof NoClassDefFoundError) {
-                                    LOG.debug(
-                                            "NoClassDefFoundError when loading a "
-                                                    + Factory.class
-                                                    + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.",
-                                            loadResult.getError());
-                                    // After logging, we just ignore this failure
-                                    return;
-                                }
-                                throw new TableException(
-                                        "Unexpected error when trying to load service provider for factories.",
-                                        loadResult.getError());
-                            }
-                            result.add(loadResult.getService());
-                        });
-        return result;
+        final Iterator<Factory> serviceLoaderIterator =
+                ServiceLoader.load(Factory.class, classLoader).iterator();
+
+        final List<Factory> loadResults = new ArrayList<>();
+        while (true) {
+            try {
+                // error handling should also be applied to the hasNext() call because service
+                // loading might cause problems here as well
+                if (!serviceLoaderIterator.hasNext()) {
+                    break;
+                }
+
+                loadResults.add(serviceLoaderIterator.next());
+            } catch (Throwable t) {
+                if (t instanceof NoClassDefFoundError) {
+                    LOG.debug(
+                            "NoClassDefFoundError when loading a "
+                                    + Factory.class.getCanonicalName()
+                                    + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.",
+                            t);
+                } else {
+                    throw new TableException(
+                            "Unexpected error when trying to load service provider.", t);
+                }
+            }
+        }
+
+        return loadResults;
     }
 
     private static String stringifyOption(String key, String value) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
deleted file mode 100644
index 620e9c3230a..00000000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.ServiceLoader;
-
-/** This class contains utilities to deal with {@link ServiceLoader}. */
-class ServiceLoaderUtil {
-
-    /**
-     * This method behaves similarly to {@link ServiceLoader#load(Class, ClassLoader)}, but it
-     * returns a list with the results of the iteration, wrapping the iteration failures such as
-     * {@link NoClassDefFoundError}.
-     */
-    static <T> List<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) {
-        List<LoadResult<T>> loadResults = new ArrayList<>();
-
-        Iterator<T> serviceLoaderIterator = ServiceLoader.load(clazz, classLoader).iterator();
-
-        while (true) {
-            try {
-                T next = serviceLoaderIterator.next();
-                loadResults.add(new LoadResult<>(next));
-            } catch (NoSuchElementException e) {
-                break;
-            } catch (Throwable t) {
-                loadResults.add(new LoadResult<>(t));
-            }
-        }
-
-        return loadResults;
-    }
-
-    static class LoadResult<T> {
-        private final T service;
-        private final Throwable error;
-
-        private LoadResult(T service, Throwable error) {
-            this.service = service;
-            this.error = error;
-        }
-
-        private LoadResult(T service) {
-            this(service, null);
-        }
-
-        private LoadResult(Throwable error) {
-            this(null, error);
-        }
-
-        public boolean hasFailed() {
-            return error != null;
-        }
-
-        public Throwable getError() {
-            return error;
-        }
-
-        public T getService() {
-            return service;
-        }
-    }
-}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index 45ef9bda2ec..6f6d2a8f40c 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.factories;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
@@ -33,12 +34,15 @@ import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock;
 import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock;
 import org.apache.flink.table.factories.utils.FactoryMocks;
 import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
 
 import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -668,6 +672,22 @@ class FactoryUtilTest {
                 .contains(serializationSchemaImplementationName);
     }
 
+    @Test
+    void testDiscoverFactoryFromClosedClassLoader() throws Exception {
+        MutableURLClassLoader classLoader =
+                FlinkUserCodeClassLoaders.create(
+                        new URL[0], FactoryUtilTest.class.getClassLoader(), new Configuration());
+        classLoader.close();
+        assertThatThrownBy(() -> FactoryUtil.discoverFactory(classLoader, Factory.class, "test"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                IllegalStateException.class,
+                                "Trying to access closed classloader. Please check if you store classloaders directly "
+                                        + "or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third "
+                                        + "party library and cannot be fixed immediately, you can disable this check with the "
+                                        + "configuration 'classloader.check-leaked-classloader'"));
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helper methods
     // --------------------------------------------------------------------------------------------

Reply via email to