This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3af1396 KAFKA-6503: Parallelize plugin scanning 3af1396 is described below commit 3af13967db089b9a8320b539f5d5d218488ce467 Author: Robert Yokota <rayok...@gmail.com> AuthorDate: Wed Feb 14 16:24:05 2018 -0800 KAFKA-6503: Parallelize plugin scanning This is a small change to parallelize plugin scanning. This may help in some environments where otherwise plugin scanning is slow. Author: Robert Yokota <rayok...@gmail.com> Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io> Closes #4561 from rayokota/K6503-improve-plugin-scanning --- .../runtime/isolation/DelegatingClassLoader.java | 29 +++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 345d7ef..b21cdcb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -20,7 +20,10 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; +import org.reflections.Configuration; import org.reflections.Reflections; +import org.reflections.ReflectionsException; +import org.reflections.scanners.SubTypesScanner; import org.reflections.util.ClasspathHelper; import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; @@ -269,7 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader { ConfigurationBuilder builder = new ConfigurationBuilder(); builder.setClassLoaders(new ClassLoader[]{loader}); 
builder.addUrls(urls); - Reflections reflections = new Reflections(builder); + builder.setScanners(new SubTypesScanner()); + builder.setExpandSuperTypes(false); + builder.useParallelExecutor(); + Reflections reflections = new InternalReflections(builder); return new PluginScanResult( getPluginDesc(reflections, Connector.class, loader), @@ -353,4 +359,25 @@ public class DelegatingClassLoader extends URLClassLoader { } } } + + private static class InternalReflections extends Reflections { + + public InternalReflections(Configuration configuration) { + super(configuration); + } + + // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException + // as RuntimeException. Override the scan behavior to emulate the single-threaded logic. + @Override + protected void scan(URL url) { + try { + super.scan(url); + } catch (ReflectionsException e) { + Logger log = Reflections.log; + if (log != null && log.isWarnEnabled()) { + log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e); + } + } + } + } } -- To stop receiving notification emails like this one, please contact ewe...@apache.org.