This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 88ee6d84 [client] Fluss client shouldn't load plugin by thread context 
classloader (#1267)
88ee6d84 is described below

commit 88ee6d84ba987891343d83624b23b8a68b12b1ec
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Jul 7 18:34:40 2025 +0800

    [client] Fluss client shouldn't load plugin by thread context classloader 
(#1267)
---
 .../token/SecurityTokenReceiverRepository.java     |  5 +-
 .../main/java/com/alibaba/fluss/fs/FileSystem.java |  6 ++-
 .../lake/lakestorage/LakeStoragePluginSetUp.java   |  4 +-
 .../fluss/security/auth/AuthenticationFactory.java |  7 ++-
 .../security/auth/sasl/jaas/DefaultLogin.java      | 29 +++++++-----
 .../security/auth/AuthenticationFactoryTest.java   | 16 +++++++
 .../utils/ParentResourceBlockingClassLoader.java   | 54 ++++++++++++++++++++++
 .../security/acl/FlinkAuthorizationITCase.java     | 26 +++++++++++
 .../fluss/rpc/netty/server/NettyServer.java        |  5 +-
 .../fluss/server/authorizer/AuthorizerLoader.java  |  7 ++-
 10 files changed, 141 insertions(+), 18 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
index 898e3ff0..ba18f44b 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
@@ -61,7 +61,10 @@ class SecurityTokenReceiverRepository {
                             receiver.scheme());
                 };
 
-        
ServiceLoader.load(SecurityTokenReceiver.class).iterator().forEachRemaining(loadReceiver);
+        ServiceLoader.load(
+                        SecurityTokenReceiver.class, 
SecurityTokenReceiver.class.getClassLoader())
+                .iterator()
+                .forEachRemaining(loadReceiver);
 
         LOG.info("Security token receivers loaded successfully");
         return receivers;
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java 
b/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
index f6c7ceb6..f5b0c62b 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
@@ -266,7 +266,11 @@ public abstract class FileSystem {
                     Collection<Supplier<Iterator<FileSystemPlugin>>> 
pluginSuppliers =
                             new ArrayList<>(2);
                     pluginSuppliers.add(
-                            () -> 
ServiceLoader.load(FileSystemPlugin.class).iterator());
+                            () ->
+                                    ServiceLoader.load(
+                                                    FileSystemPlugin.class,
+                                                    
FileSystem.class.getClassLoader())
+                                            .iterator());
 
                     if (pluginManager != null) {
                         pluginSuppliers.add(
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
index 8f226eed..0e77baca 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
@@ -54,7 +54,9 @@ public class LakeStoragePluginSetUp {
     private static Iterator<LakeStoragePlugin> getAllLakeStoragePlugins(
             @Nullable PluginManager pluginManager) {
         final Iterator<LakeStoragePlugin> pluginIteratorSPI =
-                ServiceLoader.load(LakeStoragePlugin.class).iterator();
+                ServiceLoader.load(
+                                LakeStoragePlugin.class, 
LakeStoragePlugin.class.getClassLoader())
+                        .iterator();
         if (pluginManager == null) {
             return pluginIteratorSPI;
         } else {
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
index 8f9ebbcd..8a31dbca 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
@@ -118,7 +118,12 @@ public class AuthenticationFactory {
             String protocol, Class<T> pluginInterface, @Nullable PluginManager 
pluginManager) {
 
         Collection<Supplier<Iterator<AuthenticationPlugin>>> pluginSuppliers = 
new ArrayList<>(2);
-        pluginSuppliers.add(() -> 
ServiceLoader.load(AuthenticationPlugin.class).iterator());
+        pluginSuppliers.add(
+                () ->
+                        ServiceLoader.load(
+                                        AuthenticationPlugin.class,
+                                        
AuthenticationPlugin.class.getClassLoader())
+                                .iterator());
         if (pluginManager != null) {
             pluginSuppliers.add(() -> 
pluginManager.load(AuthenticationPlugin.class));
         }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
index 2d41ef14..f32a0a7a 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
@@ -17,6 +17,8 @@
 
 package com.alibaba.fluss.security.auth.sasl.jaas;
 
+import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,18 +47,21 @@ public class DefaultLogin implements Login {
 
     @Override
     public LoginContext login() throws LoginException {
-        loginContext =
-                new LoginContext(
-                        contextName,
-                        null,
-                        callbacks -> {
-                            // Nothing here until we support some mechanisms 
such as sasl/GSSAPI
-                            // later.
-                            throw new UnsupportedCallbackException(
-                                    callbacks[0], "Unrecognized SASL 
mechanism.");
-                        },
-                        jaasConfig);
-        loginContext.login();
+        try (TemporaryClassLoaderContext ignored =
+                
TemporaryClassLoaderContext.of(DefaultLogin.class.getClassLoader())) {
+            loginContext =
+                    new LoginContext(
+                            contextName,
+                            null,
+                            callbacks -> {
+                                // Nothing here until we support some 
mechanisms such as sasl/GSSAPI
+                                // later.
+                                throw new UnsupportedCallbackException(
+                                        callbacks[0], "Unrecognized SASL 
mechanism.");
+                            },
+                            jaasConfig);
+            loginContext.login();
+        }
         LOG.info("Successfully logged in.");
         return loginContext;
     }
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
index fb21d0d6..3bf5c57f 100644
--- 
a/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
@@ -21,9 +21,13 @@ import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.metadata.ValidationException;
 import 
com.alibaba.fluss.security.auth.TestIdentifierAuthenticationPlugin.TestIdentifierClientAuthenticator;
 import 
com.alibaba.fluss.security.auth.TestIdentifierAuthenticationPlugin.TestIdentifierServerAuthenticator;
+import com.alibaba.fluss.utils.ParentResourceBlockingClassLoader;
+import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
 
 import org.junit.jupiter.api.Test;
 
+import java.net.URL;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -95,4 +99,16 @@ public class AuthenticationFactoryTest {
                                 .get())
                 .isInstanceOf(TestIdentifierServerAuthenticator.class);
     }
+
+    @Test
+    void testNotIncludedInThreadContextClassloader() {
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(new 
ParentResourceBlockingClassLoader(new URL[0]))) {
+            Configuration configuration = new Configuration();
+            configuration.setString("client.security.protocol", "SSL_TEST");
+            configuration.setString("security.protocol.map", "FLUSS:SSL_TEST");
+            
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration).get())
+                    .isInstanceOf(TestIdentifierClientAuthenticator.class);
+        }
+    }
 }
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/utils/ParentResourceBlockingClassLoader.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/utils/ParentResourceBlockingClassLoader.java
new file mode 100644
index 00000000..cd68ebde
--- /dev/null
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/utils/ParentResourceBlockingClassLoader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.utils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Enumeration;
+
+import static org.apache.logging.log4j.util.LoaderUtil.findResources;
+
+/**
+ * A class loader that blocks resource loading from parent class loader. 
Designed to simulate SPI
+ * loading behavior without delegation to parent.
+ */
+public class ParentResourceBlockingClassLoader extends URLClassLoader {
+    public ParentResourceBlockingClassLoader(URL[] urls) {
+        super(urls);
+    }
+
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        synchronized (getClassLoadingLock(name)) {
+            // Reuse a class this loader already defined; otherwise look it up locally only.
+            Class<?> loaded = findLoadedClass(name);
+            Class<?> c = loaded != null ? loaded : findClass(name);
+            if (resolve) {
+                resolveClass(c);
+            }
+            return c;
+        }
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        // Skip the parent for ServiceLoader: inherited findResources searches own URLs only.
+        return findResources(name);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 15f84f5e..3d0e5bec 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -27,7 +27,10 @@ import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.security.acl.FlussPrincipal;
 import com.alibaba.fluss.security.acl.OperationType;
 import com.alibaba.fluss.security.acl.Resource;
+import com.alibaba.fluss.security.auth.sasl.jaas.LoginManager;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.utils.ParentResourceBlockingClassLoader;
+import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -43,6 +46,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.net.URL;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -385,6 +389,28 @@ abstract class FlinkAuthorizationITCase extends 
AbstractTestBase {
                 Arrays.asList("+I[1, beijing, zhangsan]", "+I[2, shanghai, 
lisi]"));
     }
 
+    // This test simulates a jar added via `--jar`, which is not loaded by the app classloader.
+    @Test
+    void testNotIncludedInThreadContextClassloader() throws Exception {
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(new 
ParentResourceBlockingClassLoader(new URL[0]))) {
+            // clear the cache of login context to make sure load the class 
again.
+            LoginManager.closeAll();
+            tEnv.executeSql(
+                            String.format(
+                                    "create catalog test_classloader_catalog 
with ('type' = 'fluss', "
+                                            + "'bootstrap.servers' = '%s',"
+                                            + "'client.security.protocol' = 
'sasl',"
+                                            + 
"'client.security.sasl.mechanism' = 'PLAIN', \n"
+                                            + "'client.security.sasl.username' 
= 'guest', \n"
+                                            + "'client.security.sasl.password' 
= 'password2' \n"
+                                            + ")",
+                                    String.join(
+                                            ",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS))))
+                    .await();
+        }
+    }
+
     void addAcl(Resource resource, OperationType operationType)
             throws ExecutionException, InterruptedException {
         tEnv.executeSql(
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
index e1245d85..9e641239 100644
--- 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
+++ 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
@@ -256,7 +256,10 @@ public final class NettyServer implements RpcServer {
                             protocol.name());
                     protocols.put(protocol.name(), protocol);
                 };
-        
ServiceLoader.load(NetworkProtocolPlugin.class).iterator().forEachRemaining(loadProtocol);
+        ServiceLoader.load(
+                        NetworkProtocolPlugin.class, 
NetworkProtocolPlugin.class.getClassLoader())
+                .iterator()
+                .forEachRemaining(loadProtocol);
 
         if (protocols.containsKey(protocolName)) {
             LOG.info("Protocol plugin {} loaded successfully", protocolName);
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
index d69f131b..08f1f1c5 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
@@ -49,7 +49,12 @@ public class AuthorizerLoader {
         }
         String authorizerType = configuration.get(AUTHORIZER_TYPE);
         Collection<Supplier<Iterator<AuthorizationPlugin>>> pluginSuppliers = 
new ArrayList<>(2);
-        pluginSuppliers.add(() -> 
ServiceLoader.load(AuthorizationPlugin.class).iterator());
+        pluginSuppliers.add(
+                () ->
+                        ServiceLoader.load(
+                                        AuthorizationPlugin.class,
+                                        
AuthorizationPlugin.class.getClassLoader())
+                                .iterator());
 
         if (pluginManager != null) {
             pluginSuppliers.add(() -> 
pluginManager.load(AuthorizationPlugin.class));

Reply via email to