This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a40eda0262822a7c4c9b556b72822dedcd1179eb
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 3 13:37:21 2021 +0200

    [Functions] Fix classloader leaks (#12973)
    
    * Fix classloader leak in FunctionCommon.getClassLoaderFromPackage
    
    * Fix classloader leak in SinksImpl and SourcesImpl
    
    * Fix logic for shouldCloseClassLoader
    
    (cherry picked from commit cab946b4ca68e1ffc6dee3932bc4c0fc7e7da66e)
---
 .../pulsar/common/util/ClassLoaderUtils.java       |  14 ++
 .../pulsar/functions/utils/FunctionCommon.java     | 151 ++++++++++++---------
 .../functions/worker/rest/api/SinksImpl.java       |  32 +++--
 .../functions/worker/rest/api/SourcesImpl.java     |  33 +++--
 4 files changed, 140 insertions(+), 90 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 0e1e188..69e4c63 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -18,16 +18,20 @@
  */
 package org.apache.pulsar.common.util;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Helper methods wrt Classloading.
  */
+@Slf4j
 public class ClassLoaderUtils {
     /**
      * Load a jar.
@@ -76,4 +80,14 @@ public class ClassLoaderUtils {
                     String.format("%s does not implement %s", className, klass.getName()));
         }
     }
+
+    public static void closeClassLoader(ClassLoader classLoader) {
+        if (classLoader instanceof Closeable) {
+            try {
+                ((Closeable) classLoader).close();
+            } catch (IOException e) {
+                log.error("Error closing classloader {}", classLoader, e);
+            }
+        }
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index a13695e..f72814d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -382,97 +382,114 @@ public class FunctionCommon {
             String narExtractionDirectory) {
         String connectorClassName = className;
         ClassLoader jarClassLoader = null;
+        boolean keepJarClassLoader = false;
         ClassLoader narClassLoader = null;
+        boolean keepNarClassLoader = false;
 
         Exception jarClassLoaderException = null;
         Exception narClassLoaderException = null;
 
         try {
-            jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
-        } catch (Exception e) {
-            jarClassLoaderException = e;
-        }
-        try {
-            narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, 
narExtractionDirectory);
-        } catch (Exception e) {
-            narClassLoaderException = e;
-        }
-
-        // if connector class name is not provided, we can only try to load 
archive as a NAR
-        if (isEmpty(connectorClassName)) {
-            if (narClassLoader == null) {
-                throw new IllegalArgumentException(String.format("%s package 
does not have the correct format. " +
-                                "Pulsar cannot determine if the package is a 
NAR package or JAR package. " +
-                                "%s classname is not provided and attempts to 
load it as a NAR package produced the following error.",
-                        capFirstLetter(componentType), 
capFirstLetter(componentType)),
-                        narClassLoaderException);
-            }
             try {
-                if (componentType == 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE)
 {
-                    connectorClassName = 
ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
-                } else {
-                    connectorClassName = 
ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(String.format("Failed to 
extract %s class from archive",
-                        componentType.toString().toLowerCase()), e);
+                jarClassLoader = 
ClassLoaderUtils.extractClassLoader(packageFile);
+            } catch (Exception e) {
+                jarClassLoaderException = e;
             }
-
             try {
-                narClassLoader.loadClass(connectorClassName);
-                return narClassLoader;
-            } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                throw new IllegalArgumentException(
-                        String.format("%s class %s must be in class path", 
capFirstLetter(componentType), connectorClassName), e);
+                narClassLoader = 
FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
+            } catch (Exception e) {
+                narClassLoaderException = e;
             }
 
-        } else {
-            // if connector class name is provided, we need to try to load it 
as a JAR and as a NAR.
-            if (jarClassLoader != null) {
+            // if connector class name is not provided, we can only try to 
load archive as a NAR
+            if (isEmpty(connectorClassName)) {
+                if (narClassLoader == null) {
+                    throw new IllegalArgumentException(String.format("%s 
package does not have the correct format. " +
+                                    "Pulsar cannot determine if the package is 
a NAR package or JAR package. " +
+                                    "%s classname is not provided and attempts 
to load it as a NAR package produced " +
+                                    "the following error.",
+                            capFirstLetter(componentType), 
capFirstLetter(componentType)),
+                            narClassLoaderException);
+                }
                 try {
-                    jarClassLoader.loadClass(connectorClassName);
-                    return jarClassLoader;
-                } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                    // class not found in JAR try loading as a NAR and 
searching for the class
-                    if (narClassLoader != null) {
-
-                        try {
-                            narClassLoader.loadClass(connectorClassName);
-                            return narClassLoader;
-                        } catch (ClassNotFoundException | NoClassDefFoundError 
e1) {
-                            throw new IllegalArgumentException(
-                                    String.format("%s class %s must be in 
class path",
-                                            capFirstLetter(componentType), 
connectorClassName), e1);
-                        }
+                    if (componentType == 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE)
 {
+                        connectorClassName = 
ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
                     } else {
-                        throw new IllegalArgumentException(
-                                String.format("%s class %s must be in class 
path", capFirstLetter(componentType),
-                                        connectorClassName), e);
+                        connectorClassName = 
ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
                     }
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("Failed 
to extract %s class from archive",
+                            componentType.toString().toLowerCase()), e);
                 }
-            } else if (narClassLoader != null) {
+
                 try {
                     narClassLoader.loadClass(connectorClassName);
+                    keepNarClassLoader = true;
                     return narClassLoader;
-                } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                } catch (ClassNotFoundException | NoClassDefFoundError e) {
                     throw new IllegalArgumentException(
-                            String.format("%s class %s must be in class path",
-                                    capFirstLetter(componentType), 
connectorClassName), e1);
+                            String.format("%s class %s must be in class path", 
capFirstLetter(componentType),
+                                    connectorClassName), e);
                 }
+
             } else {
-                StringBuilder errorMsg = new 
StringBuilder(capFirstLetter(componentType)
-                        + " package does not have the correct format."
-                        + " Pulsar cannot determine if the package is a NAR 
package or JAR package.");
+                // if connector class name is provided, we need to try to load 
it as a JAR and as a NAR.
+                if (jarClassLoader != null) {
+                    try {
+                        jarClassLoader.loadClass(connectorClassName);
+                        keepJarClassLoader = true;
+                        return jarClassLoader;
+                    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+                        // class not found in JAR try loading as a NAR and 
searching for the class
+                        if (narClassLoader != null) {
+
+                            try {
+                                narClassLoader.loadClass(connectorClassName);
+                                keepNarClassLoader = true;
+                                return narClassLoader;
+                            } catch (ClassNotFoundException | 
NoClassDefFoundError e1) {
+                                throw new IllegalArgumentException(
+                                        String.format("%s class %s must be in 
class path",
+                                                capFirstLetter(componentType), 
connectorClassName), e1);
+                            }
+                        } else {
+                            throw new IllegalArgumentException(
+                                    String.format("%s class %s must be in 
class path", capFirstLetter(componentType),
+                                            connectorClassName), e);
+                        }
+                    }
+                } else if (narClassLoader != null) {
+                    try {
+                        narClassLoader.loadClass(connectorClassName);
+                        keepNarClassLoader = true;
+                        return narClassLoader;
+                    } catch (ClassNotFoundException | NoClassDefFoundError e1) 
{
+                        throw new IllegalArgumentException(
+                                String.format("%s class %s must be in class 
path",
+                                        capFirstLetter(componentType), 
connectorClassName), e1);
+                    }
+                } else {
+                    StringBuilder errorMsg = new 
StringBuilder(capFirstLetter(componentType)
+                            + " package does not have the correct format."
+                            + " Pulsar cannot determine if the package is a 
NAR package or JAR package.");
 
-                if (jarClassLoaderException != null) {
-                    errorMsg.append(" Attempts to load it as a JAR package 
produced error: " + jarClassLoaderException.getMessage());
-                }
+                    if (jarClassLoaderException != null) {
+                        errorMsg.append(" Attempts to load it as a JAR package 
produced error: " + jarClassLoaderException.getMessage());
+                    }
 
-                if (narClassLoaderException != null) {
-                    errorMsg.append(" Attempts to load it as a NAR package 
produced error: " + narClassLoaderException.getMessage());
-                }
+                    if (narClassLoaderException != null) {
+                        errorMsg.append(" Attempts to load it as a NAR package 
produced error: " + narClassLoaderException.getMessage());
+                    }
 
-                throw new IllegalArgumentException(errorMsg.toString());
+                    throw new IllegalArgumentException(errorMsg.toString());
+                }
+            }
+        } finally {
+            if (!keepJarClassLoader) {
+                ClassLoaderUtils.closeClassLoader(jarClassLoader);
+            }
+            if (!keepNarClassLoader) {
+                ClassLoaderUtils.closeClassLoader(narClassLoader);
             }
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 31a7234..bbb1680 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -734,19 +735,28 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             }
         }
 
-        // if sink is not builtin, attempt to extract classloader from package 
file if it exists
-        if (classLoader == null && sinkPackageFile != null) {
-            classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
-                    sinkPackageFile, 
worker().getWorkerConfig().getNarExtractionDirectory());
-        }
+        boolean shouldCloseClassLoader = false;
+        try {
 
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Sink package is not provided");
-        }
+            // if sink is not builtin, attempt to extract classloader from 
package file if it exists
+            if (classLoader == null && sinkPackageFile != null) {
+                classLoader = 
getClassLoaderFromPackage(sinkConfig.getClassName(),
+                        sinkPackageFile, 
worker().getWorkerConfig().getNarExtractionDirectory());
+                shouldCloseClassLoader = true;
+            }
 
-        SinkConfigUtils.ExtractedSinkDetails sinkDetails = 
SinkConfigUtils.validateAndExtractDetails(
-                sinkConfig, classLoader, 
worker().getWorkerConfig().getValidateConnectorConfig());
-        return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Sink package is not 
provided");
+            }
+
+            SinkConfigUtils.ExtractedSinkDetails sinkDetails = 
SinkConfigUtils.validateAndExtractDetails(
+                    sinkConfig, classLoader, 
worker().getWorkerConfig().getValidateConnectorConfig());
+            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+        } finally {
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
+        }
     }
 
     private File downloadPackageFile(String packageName) throws IOException, 
PulsarAdminException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 1e9148b..82a818d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -730,20 +731,28 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             }
         }
 
-        // if source is not builtin, attempt to extract classloader from 
package file if it exists
-        if (classLoader == null && sourcePackageFile != null) {
-            classLoader = 
getClassLoaderFromPackage(sourceConfig.getClassName(),
-                    sourcePackageFile, 
worker().getWorkerConfig().getNarExtractionDirectory());
-        }
+        boolean shouldCloseClassLoader = false;
+        try {
+            // if source is not builtin, attempt to extract classloader from 
package file if it exists
+            if (classLoader == null && sourcePackageFile != null) {
+                classLoader = 
getClassLoaderFromPackage(sourceConfig.getClassName(),
+                        sourcePackageFile, 
worker().getWorkerConfig().getNarExtractionDirectory());
+                shouldCloseClassLoader = true;
+            }
 
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Source package is not 
provided");
-        }
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Source package is not 
provided");
+            }
 
-        SourceConfigUtils.ExtractedSourceDetails sourceDetails
-                = SourceConfigUtils.validateAndExtractDetails(
-                        sourceConfig, classLoader, 
worker().getWorkerConfig().getValidateConnectorConfig());
-        return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+            SourceConfigUtils.ExtractedSourceDetails sourceDetails
+                    = SourceConfigUtils.validateAndExtractDetails(
+                            sourceConfig, classLoader, 
worker().getWorkerConfig().getValidateConnectorConfig());
+            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+        } finally {
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
+        }
     }
 
     private File downloadPackageFile(String packageName) throws IOException, 
PulsarAdminException {

Reply via email to