This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 37e3861  KAFKA-6288: Broken symlink interrupts scanning of the plugin 
path
37e3861 is described below

commit 37e38611ccebff1bf14b0e6435fadcd74e406192
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Sun Feb 4 14:57:25 2018 -0800

    KAFKA-6288: Broken symlink interrupts scanning of the plugin path
    
    Submitting a fail safe fix for rare IOExceptions on symbolic links.
    
    The fix is submitted without a test case since it does seem easy to 
reproduce such type of failures (just having a broken symbolic link does not 
reproduce the issue) and it's considered pretty low risk.
    
    If accepted, needs to be ported at least to 1.0, if not 0.11
    
    Author: Konstantine Karantasis <konstant...@confluent.io>
    
    Reviewers: Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava 
<e...@confluent.io>
    
    Closes #4481 from 
kkonstantine/KAFKA-6288-Broken-symlink-interrupts-scanning-the-plugin-path
    
    (cherry picked from commit 17aaff3606393b42d4e8ef5299141b5bb21300b0)
    Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../runtime/isolation/DelegatingClassLoader.java   | 29 +++++++++-------
 .../connect/runtime/isolation/PluginUtils.java     | 39 ++++++++++++++--------
 3 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6fd5f97..9825671 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
               files="AbstractRequest.java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes).java"/>
+              
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|PluginUtils).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index ac0530e..886b415 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -49,6 +49,7 @@ import java.util.TreeSet;
 
 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = 
LoggerFactory.getLogger(DelegatingClassLoader.class);
+    private static final String CLASSPATH_NAME = "classpath";
 
     private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> 
pluginLoaders;
     private final Map<String, String> aliases;
@@ -136,10 +137,23 @@ public class DelegatingClassLoader extends URLClassLoader 
{
     }
 
     protected void initLoaders() {
-        String path = null;
+        for (String configPath : pluginPaths) {
+            initPluginLoader(configPath);
+        }
+        // Finally add parent/system loader.
+        initPluginLoader(CLASSPATH_NAME);
+        addAllAliases();
+    }
+
+    private void initPluginLoader(String path) {
         try {
-            for (String configPath : pluginPaths) {
-                path = configPath;
+            if (CLASSPATH_NAME.equals(path)) {
+                scanUrlsAndAddPlugins(
+                        getParent(),
+                        ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
+                        null
+                );
+            } else {
                 Path pluginPath = Paths.get(path).toAbsolutePath();
                 // Update for exception handling
                 path = pluginPath.toString();
@@ -153,14 +167,6 @@ public class DelegatingClassLoader extends URLClassLoader {
                     registerPlugin(pluginPath);
                 }
             }
-
-            path = "classpath";
-            // Finally add parent/system loader.
-            scanUrlsAndAddPlugins(
-                    getParent(),
-                    ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
-                    null
-            );
         } catch (InvalidPathException | MalformedURLException e) {
             log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
         } catch (IOException e) {
@@ -168,7 +174,6 @@ public class DelegatingClassLoader extends URLClassLoader {
         } catch (InstantiationException | IllegalAccessException e) {
             log.error("Could not instantiate plugins in: {}. Ignoring: {}", 
path, e);
         }
-        addAllAliases();
     }
 
     private void registerPlugin(Path pluginLocation)
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index d85986e..d490bde 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -133,7 +133,7 @@ public class PluginUtils {
     private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new 
DirectoryStream
             .Filter<Path>() {
         @Override
-        public boolean accept(Path path) throws IOException {
+        public boolean accept(Path path) {
             return Files.isDirectory(path) || isArchive(path) || 
isClassFile(path);
         }
     };
@@ -232,16 +232,29 @@ public class PluginUtils {
 
                 Path adjacent = neighbors.next();
                 if (Files.isSymbolicLink(adjacent)) {
-                    Path symlink = Files.readSymbolicLink(adjacent);
-                    // if symlink is absolute resolve() returns the absolute 
symlink itself
-                    Path parent = adjacent.getParent();
-                    if (parent == null) {
-                        continue;
-                    }
-                    Path absolute = parent.resolve(symlink).toRealPath();
-                    if (Files.exists(absolute)) {
-                        adjacent = absolute;
-                    } else {
+                    try {
+                        Path symlink = Files.readSymbolicLink(adjacent);
+                        // if symlink is absolute resolve() returns the 
absolute symlink itself
+                        Path parent = adjacent.getParent();
+                        if (parent == null) {
+                            continue;
+                        }
+                        Path absolute = parent.resolve(symlink).toRealPath();
+                        if (Files.exists(absolute)) {
+                            adjacent = absolute;
+                        } else {
+                            continue;
+                        }
+                    } catch (IOException e) {
+                        // See 
https://issues.apache.org/jira/browse/KAFKA-6288 for a reported
+                        // failure. Such a failure at this stage is not easily 
reproducible and
+                        // therefore an exception is caught and ignored after 
issuing a
+                        // warning. This allows class scanning to continue for 
non-broken plugins.
+                        log.warn(
+                                "Resolving symbolic link '{}' failed. Ignoring 
this path.",
+                                adjacent,
+                                e
+                        );
                         continue;
                     }
                 }
@@ -341,8 +354,8 @@ public class PluginUtils {
     }
 
     private static class DirectoryEntry {
-        DirectoryStream<Path> stream;
-        Iterator<Path> iterator;
+        final DirectoryStream<Path> stream;
+        final Iterator<Path> iterator;
 
         DirectoryEntry(DirectoryStream<Path> stream) {
             this.stream = stream;

-- 
To stop receiving notification emails like this one, please contact
ewe...@apache.org.

Reply via email to