This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a40eda0262822a7c4c9b556b72822dedcd1179eb Author: Lari Hotari <[email protected]> AuthorDate: Fri Dec 3 13:37:21 2021 +0200 [Functions] Fix classloader leaks (#12973) * Fix classloader leak in FunctionCommon.getClassLoaderFromPackage * Fix classloader leak in SinksImpl and SourcesImpl * Fix logic for shouldCloseClassLoader (cherry picked from commit cab946b4ca68e1ffc6dee3932bc4c0fc7e7da66e) --- .../pulsar/common/util/ClassLoaderUtils.java | 14 ++ .../pulsar/functions/utils/FunctionCommon.java | 151 ++++++++++++--------- .../functions/worker/rest/api/SinksImpl.java | 32 +++-- .../functions/worker/rest/api/SourcesImpl.java | 33 +++-- 4 files changed, 140 insertions(+), 90 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java index 0e1e188..69e4c63 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java @@ -18,16 +18,20 @@ */ package org.apache.pulsar.common.util; +import java.io.Closeable; import java.io.File; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.security.AccessController; import java.security.PrivilegedAction; +import lombok.extern.slf4j.Slf4j; /** * Helper methods wrt Classloading. */ +@Slf4j public class ClassLoaderUtils { /** * Load a jar. @@ -76,4 +80,14 @@ public class ClassLoaderUtils { String.format("%s does not implement %s", className, klass.getName())); } } + + public static void closeClassLoader(ClassLoader classLoader) { + if (classLoader instanceof Closeable) { + try { + ((Closeable) classLoader).close(); + } catch (IOException e) { + log.error("Error closing classloader {}", classLoader, e); + } + } + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index a13695e..f72814d 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -382,97 +382,114 @@ public class FunctionCommon { String narExtractionDirectory) { String connectorClassName = className; ClassLoader jarClassLoader = null; + boolean keepJarClassLoader = false; ClassLoader narClassLoader = null; + boolean keepNarClassLoader = false; Exception jarClassLoaderException = null; Exception narClassLoaderException = null; try { - jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile); - } catch (Exception e) { - jarClassLoaderException = e; - } - try { - narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory); - } catch (Exception e) { - narClassLoaderException = e; - } - - // if connector class name is not provided, we can only try to load archive as a NAR - if (isEmpty(connectorClassName)) { - if (narClassLoader == null) { - throw new IllegalArgumentException(String.format("%s package does not have the correct format. " + - "Pulsar cannot determine if the package is a NAR package or JAR package. " + - "%s classname is not provided and attempts to load it as a NAR package produced the following error.", - capFirstLetter(componentType), capFirstLetter(componentType)), - narClassLoaderException); - } try { - if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { - connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader); - } else { - connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader); - } - } catch (IOException e) { - throw new IllegalArgumentException(String.format("Failed to extract %s class from archive", - componentType.toString().toLowerCase()), e); + jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile); + } catch (Exception e) { + jarClassLoaderException = e; } - try { - narClassLoader.loadClass(connectorClassName); - return narClassLoader; - } catch (ClassNotFoundException | NoClassDefFoundError e) { - throw new IllegalArgumentException( - String.format("%s class %s must be in class path", capFirstLetter(componentType), connectorClassName), e); + narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory); + } catch (Exception e) { + narClassLoaderException = e; } - } else { - // if connector class name is provided, we need to try to load it as a JAR and as a NAR. - if (jarClassLoader != null) { + // if connector class name is not provided, we can only try to load archive as a NAR + if (isEmpty(connectorClassName)) { + if (narClassLoader == null) { + throw new IllegalArgumentException(String.format("%s package does not have the correct format. " + + "Pulsar cannot determine if the package is a NAR package or JAR package. " + + "%s classname is not provided and attempts to load it as a NAR package produced " + + "the following error.", + capFirstLetter(componentType), capFirstLetter(componentType)), + narClassLoaderException); + } try { - jarClassLoader.loadClass(connectorClassName); - return jarClassLoader; - } catch (ClassNotFoundException | NoClassDefFoundError e) { - // class not found in JAR try loading as a NAR and searching for the class - if (narClassLoader != null) { - - try { - narClassLoader.loadClass(connectorClassName); - return narClassLoader; - } catch (ClassNotFoundException | NoClassDefFoundError e1) { - throw new IllegalArgumentException( - String.format("%s class %s must be in class path", - capFirstLetter(componentType), connectorClassName), e1); - } + if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { + connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader); } else { - throw new IllegalArgumentException( - String.format("%s class %s must be in class path", capFirstLetter(componentType), - connectorClassName), e); + connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader); } + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Failed to extract %s class from archive", + componentType.toString().toLowerCase()), e); } - } else if (narClassLoader != null) { + try { narClassLoader.loadClass(connectorClassName); + keepNarClassLoader = true; return narClassLoader; - } catch (ClassNotFoundException | NoClassDefFoundError e1) { + } catch (ClassNotFoundException | NoClassDefFoundError e) { throw new IllegalArgumentException( - String.format("%s class %s must be in class path", - capFirstLetter(componentType), connectorClassName), e1); + String.format("%s class %s must be in class path", capFirstLetter(componentType), + connectorClassName), e); } + } else { - StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType) - + " package does not have the correct format." - + " Pulsar cannot determine if the package is a NAR package or JAR package."); + // if connector class name is provided, we need to try to load it as a JAR and as a NAR. + if (jarClassLoader != null) { + try { + jarClassLoader.loadClass(connectorClassName); + keepJarClassLoader = true; + return jarClassLoader; + } catch (ClassNotFoundException | NoClassDefFoundError e) { + // class not found in JAR try loading as a NAR and searching for the class + if (narClassLoader != null) { + + try { + narClassLoader.loadClass(connectorClassName); + keepNarClassLoader = true; + return narClassLoader; + } catch (ClassNotFoundException | NoClassDefFoundError e1) { + throw new IllegalArgumentException( + String.format("%s class %s must be in class path", + capFirstLetter(componentType), connectorClassName), e1); + } + } else { + throw new IllegalArgumentException( + String.format("%s class %s must be in class path", capFirstLetter(componentType), + connectorClassName), e); + } + } + } else if (narClassLoader != null) { + try { + narClassLoader.loadClass(connectorClassName); + keepNarClassLoader = true; + return narClassLoader; + } catch (ClassNotFoundException | NoClassDefFoundError e1) { + throw new IllegalArgumentException( + String.format("%s class %s must be in class path", + capFirstLetter(componentType), connectorClassName), e1); + } + } else { + StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType) + + " package does not have the correct format." + + " Pulsar cannot determine if the package is a NAR package or JAR package."); - if (jarClassLoaderException != null) { - errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage()); - } + if (jarClassLoaderException != null) { + errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage()); + } - if (narClassLoaderException != null) { - errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage()); - } + if (narClassLoaderException != null) { + errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage()); + } - throw new IllegalArgumentException(errorMsg.toString()); + throw new IllegalArgumentException(errorMsg.toString()); + } + } + } finally { + if (!keepJarClassLoader) { + ClassLoaderUtils.closeClassLoader(jarClassLoader); + } + if (!keepNarClassLoader) { + ClassLoaderUtils.closeClassLoader(narClassLoader); } } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 31a7234..bbb1680 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.policies.data.ExceptionInformation; import org.apache.pulsar.common.policies.data.SinkStatus; +import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.functions.auth.FunctionAuthData; import org.apache.pulsar.functions.instance.InstanceUtils; @@ -734,19 +735,28 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic } } - // if sink is not builtin, attempt to extract classloader from package file if it exists - if (classLoader == null && sinkPackageFile != null) { - classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(), - sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory()); - } + boolean shouldCloseClassLoader = false; + try { - if (classLoader == null) { - throw new IllegalArgumentException("Sink package is not provided"); - } + // if sink is not builtin, attempt to extract classloader from package file if it exists + if (classLoader == null && sinkPackageFile != null) { + classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(), + sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory()); + shouldCloseClassLoader = true; + } - SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails( - sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig()); - return SinkConfigUtils.convert(sinkConfig, sinkDetails); + if (classLoader == null) { + throw new IllegalArgumentException("Sink package is not provided"); + } + + SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails( + sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig()); + return SinkConfigUtils.convert(sinkConfig, sinkDetails); + } finally { + if (shouldCloseClassLoader) { + ClassLoaderUtils.closeClassLoader(classLoader); + } + } } private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 1e9148b..82a818d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.policies.data.ExceptionInformation; import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.functions.auth.FunctionAuthData; import org.apache.pulsar.functions.instance.InstanceUtils; @@ -730,20 +731,28 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe } } - // if source is not builtin, attempt to extract classloader from package file if it exists - if (classLoader == null && sourcePackageFile != null) { - classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(), - sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory()); - } + boolean shouldCloseClassLoader = false; + try { + // if source is not builtin, attempt to extract classloader from package file if it exists + if (classLoader == null && sourcePackageFile != null) { + classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(), + sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory()); + shouldCloseClassLoader = true; + } - if (classLoader == null) { - throw new IllegalArgumentException("Source package is not provided"); - } + if (classLoader == null) { + throw new IllegalArgumentException("Source package is not provided"); + } - SourceConfigUtils.ExtractedSourceDetails sourceDetails - = SourceConfigUtils.validateAndExtractDetails( - sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig()); - return SourceConfigUtils.convert(sourceConfig, sourceDetails); + SourceConfigUtils.ExtractedSourceDetails sourceDetails + = SourceConfigUtils.validateAndExtractDetails( + sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig()); + return SourceConfigUtils.convert(sourceConfig, sourceDetails); + } finally { + if (shouldCloseClassLoader) { + ClassLoaderUtils.closeClassLoader(classLoader); + } + } } private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
