Repository: kafka Updated Branches: refs/heads/trunk 327400449 -> c7f9bd2a6
KAFKA-3606: Traverse CLASSPATH during herder start ewencp Can you take a quick look? Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1252 from Ishiihara/pre-list-connectors Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7f9bd2a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7f9bd2a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7f9bd2a Branch: refs/heads/trunk Commit: c7f9bd2a68ea7bb604c4dcf2a2f0b030fc019ca7 Parents: 3274004 Author: Liquan Pei <[email protected]> Authored: Thu Apr 21 17:59:23 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Apr 21 17:59:23 2016 -0700 ---------------------------------------------------------------------- .../kafka/connect/runtime/AbstractHerder.java | 49 ++++++++++++++------ 1 file changed, 36 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c7f9bd2a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index a22f15c..bd73589 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -84,6 +84,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); private static List<ConnectorPluginInfo> validConnectorPlugins; + private static final Object LOCK = new Object(); + private Thread classPathTraverser; + public AbstractHerder(Worker worker, String workerId, @@ -101,12 +104,20 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con this.worker.start(); this.statusBackingStore.start(); this.configBackingStore.start(); + traverseClassPath(); } protected void stopServices() { this.statusBackingStore.stop(); this.configBackingStore.stop(); this.worker.stop(); + if (this.classPathTraverser != null) { + try { + this.classPathTraverser.join(); + } catch (InterruptedException e) { + // ignore as it can only happen during shutdown + } + } } @Override @@ -248,22 +259,24 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con } public static List<ConnectorPluginInfo> connectorPlugins() { - if (validConnectorPlugins != null) { - return validConnectorPlugins; - } + synchronized (LOCK) { + if (validConnectorPlugins != null) { + return validConnectorPlugins; + } - Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); - Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class); - connectorClasses.removeAll(SKIPPED_CONNECTORS); - List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>(); - for (Class<? extends Connector> connectorClass: connectorClasses) { - int mod = connectorClass.getModifiers(); - if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) { - connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); + Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); + Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class); + connectorClasses.removeAll(SKIPPED_CONNECTORS); + List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>(); + for (Class<? extends Connector> connectorClass : connectorClasses) { + int mod = connectorClass.getModifiers(); + if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) { + connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); + } } + validConnectorPlugins = connectorPlugins; + return connectorPlugins; } - validConnectorPlugins = connectorPlugins; - return connectorPlugins; } // public for testing @@ -354,4 +367,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return null; } } + + private void traverseClassPath() { + classPathTraverser = new Thread(new Runnable() { + @Override + public void run() { + connectorPlugins(); + } + }, "CLASSPATH traversal thread."); + classPathTraverser.start(); + } }
