[ 
https://issues.apache.org/jira/browse/KAFKA-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351915#comment-16351915
 ] 

ASF GitHub Bot commented on KAFKA-6288:
---------------------------------------

ewencp closed pull request #4481: KAFKA-6288: Broken symlink interrupts scanning of the plugin path
URL: https://github.com/apache/kafka/pull/4481
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f06155f930a..de1bdfd4b04 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
               
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values).java"/>
+              
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values|PluginUtils).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 0c133b9a891..8a44d4d1207 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -52,6 +52,7 @@
 
 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = 
LoggerFactory.getLogger(DelegatingClassLoader.class);
+    private static final String CLASSPATH_NAME = "classpath";
 
     private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> 
pluginLoaders;
     private final Map<String, String> aliases;
@@ -139,10 +140,23 @@ public Object run() {
     }
 
     protected void initLoaders() {
-        String path = null;
+        for (String configPath : pluginPaths) {
+            initPluginLoader(configPath);
+        }
+        // Finally add parent/system loader.
+        initPluginLoader(CLASSPATH_NAME);
+        addAllAliases();
+    }
+
+    private void initPluginLoader(String path) {
         try {
-            for (String configPath : pluginPaths) {
-                path = configPath;
+            if (CLASSPATH_NAME.equals(path)) {
+                scanUrlsAndAddPlugins(
+                        getParent(),
+                        ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
+                        null
+                );
+            } else {
                 Path pluginPath = Paths.get(path).toAbsolutePath();
                 // Update for exception handling
                 path = pluginPath.toString();
@@ -156,14 +170,6 @@ protected void initLoaders() {
                     registerPlugin(pluginPath);
                 }
             }
-
-            path = "classpath";
-            // Finally add parent/system loader.
-            scanUrlsAndAddPlugins(
-                    getParent(),
-                    ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
-                    null
-            );
         } catch (InvalidPathException | MalformedURLException e) {
             log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
         } catch (IOException e) {
@@ -171,7 +177,6 @@ protected void initLoaders() {
         } catch (InstantiationException | IllegalAccessException e) {
             log.error("Could not instantiate plugins in: {}. Ignoring: {}", 
path, e);
         }
-        addAllAliases();
     }
 
     private void registerPlugin(Path pluginLocation)
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index d85986ea628..d490bde4ed2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -133,7 +133,7 @@
     private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new 
DirectoryStream
             .Filter<Path>() {
         @Override
-        public boolean accept(Path path) throws IOException {
+        public boolean accept(Path path) {
             return Files.isDirectory(path) || isArchive(path) || 
isClassFile(path);
         }
     };
@@ -232,16 +232,29 @@ public static boolean isClassFile(Path path) {
 
                 Path adjacent = neighbors.next();
                 if (Files.isSymbolicLink(adjacent)) {
-                    Path symlink = Files.readSymbolicLink(adjacent);
-                    // if symlink is absolute resolve() returns the absolute 
symlink itself
-                    Path parent = adjacent.getParent();
-                    if (parent == null) {
-                        continue;
-                    }
-                    Path absolute = parent.resolve(symlink).toRealPath();
-                    if (Files.exists(absolute)) {
-                        adjacent = absolute;
-                    } else {
+                    try {
+                        Path symlink = Files.readSymbolicLink(adjacent);
+                        // if symlink is absolute resolve() returns the 
absolute symlink itself
+                        Path parent = adjacent.getParent();
+                        if (parent == null) {
+                            continue;
+                        }
+                        Path absolute = parent.resolve(symlink).toRealPath();
+                        if (Files.exists(absolute)) {
+                            adjacent = absolute;
+                        } else {
+                            continue;
+                        }
+                    } catch (IOException e) {
+                        // See 
https://issues.apache.org/jira/browse/KAFKA-6288 for a reported
+                        // failure. Such a failure at this stage is not easily 
reproducible and
+                        // therefore an exception is caught and ignored after 
issuing a
+                        // warning. This allows class scanning to continue for 
non-broken plugins.
+                        log.warn(
+                                "Resolving symbolic link '{}' failed. Ignoring 
this path.",
+                                adjacent,
+                                e
+                        );
                         continue;
                     }
                 }
@@ -341,8 +354,8 @@ private static String prunePluginName(PluginDesc<?> plugin, 
String suffix) {
     }
 
     private static class DirectoryEntry {
-        DirectoryStream<Path> stream;
-        Iterator<Path> iterator;
+        final DirectoryStream<Path> stream;
+        final Iterator<Path> iterator;
 
         DirectoryEntry(DirectoryStream<Path> stream) {
             this.stream = stream;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broken symlink interrupts scanning the plugin path
> --------------------------------------------------
>
>                 Key: KAFKA-6288
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6288
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0, 0.11.0.2
>            Reporter: Yeva Byzek
>            Assignee: Konstantine Karantasis
>            Priority: Major
>             Fix For: 1.0.1, 0.11.0.3
>
>         Attachments: 6288.v1.txt
>
>
> KAFKA-6087 introduced support for scanning relative symlinks in the plugin 
> path. However, if a relative symlink points to a target that doesn't exist, 
> scanning of the plugin path is interrupted. As a consequence, the connectors 
> in the plugin path that have not yet been scanned may effectively be unusable.
> The desired behavior is that a symlink with a non-existent target is skipped 
> and scanning of the plugin path continues.
> Example of error message:
> {noformat}
> [2017-11-30 20:19:26,226] ERROR Could not get listing for plugin path: 
> /usr/share/java. Ignoring. 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:170)
> java.nio.file.NoSuchFileException: /usr/share/java/name.jar
>       at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>       at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>       at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>       at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837)
>       at 
> org.apache.kafka.connect.runtime.isolation.PluginUtils.pluginUrls(PluginUtils.java:241)
>       at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:181)
>       at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:153)
>       at 
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
>       at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:70)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to