// connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.runtime.isolation;

import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

/**
 * Static helpers for plugin isolation: deciding which class names are loaded in isolation,
 * listing plugin locations on disk, and deriving short names (aliases) for plugin classes.
 */
public class PluginUtils {
    // Compiled once; shouldLoadInIsolation() is evaluated per class name, so recompiling the
    // expression on every call (as String.matches does) is wasted work.
    private static final Pattern BLACKLIST = Pattern.compile("^(?:"
            + "java"
            + "|javax"
            + "|org\\.omg"
            + "|org\\.w3c\\.dom"
            + "|org\\.apache\\.kafka\\.common"
            + "|org\\.apache\\.kafka\\.connect"
            + "|org\\.apache\\.log4j"
            + ")\\..*$");

    // Exceptions to BLACKLIST: names under org.apache.kafka.connect that are still isolated.
    private static final Pattern WHITELIST = Pattern.compile("^org\\.apache\\.kafka\\.connect\\.(?:"
            + "transforms\\.(?!Transformation$).*"
            + "|json\\..*"
            + "|file\\..*"
            + "|converters\\..*"
            + "|storage\\.StringConverter"
            + ")$");

    // Accepts directories and paths whose name ends in ".jar" (case-insensitive).
    private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
            .Filter<Path>() {
        @Override
        public boolean accept(Path path) throws IOException {
            return Files.isDirectory(path) || PluginUtils.isJar(path);
        }
    };

    /**
     * Tells whether a class with the given name should be loaded in isolation.
     *
     * @param name the fully qualified class name to check
     * @return {@code false} only when {@code name} matches the blacklist and does not match the
     *         whitelist; {@code true} otherwise
     */
    public static boolean shouldLoadInIsolation(String name) {
        // Equivalent to !(blacklisted && !whitelisted); matches() is a whole-input match,
        // exactly like String.matches().
        return !BLACKLIST.matcher(name).matches() || WHITELIST.matcher(name).matches();
    }

    /**
     * Tells whether a class can be treated as concrete.
     *
     * @param klass the class to inspect
     * @return {@code true} if the class modifiers are neither abstract nor interface
     */
    public static boolean isConcrete(Class<?> klass) {
        int mod = klass.getModifiers();
        return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
    }

    /**
     * Tells whether a path names a jar file, judged by its name only.
     *
     * @param path the path to check
     * @return {@code true} if the path string ends with ".jar", ignoring case
     */
    public static boolean isJar(Path path) {
        return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
    }

    /**
     * Collects the URLs that make up a single plugin location.
     *
     * @param pluginPath a jar file or a directory
     * @return the URL of {@code pluginPath} itself if it is a jar; the URLs of its immediate
     *         sub-directories and jars if it is a directory; an empty list otherwise
     * @throws IOException if the directory cannot be listed or a path cannot be converted to a URL
     */
    public static List<URL> pluginUrls(Path pluginPath) throws IOException {
        List<URL> urls = new ArrayList<>();
        if (PluginUtils.isJar(pluginPath)) {
            urls.add(pluginPath.toUri().toURL());
        } else if (Files.isDirectory(pluginPath)) {
            try (
                    DirectoryStream<Path> listing = Files.newDirectoryStream(
                            pluginPath,
                            PLUGIN_PATH_FILTER
                    )
            ) {
                for (Path jar : listing) {
                    urls.add(jar.toUri().toURL());
                }
            }
        }
        return urls;
    }

    /**
     * Lists the plugin locations found directly under a top-level plugin path.
     *
     * @param topPath the directory to scan
     * @return the immediate children of {@code topPath} that are directories or jars
     * @throws IOException if {@code topPath} cannot be listed
     */
    public static List<Path> pluginLocations(Path topPath) throws IOException {
        List<Path> locations = new ArrayList<>();
        // Non-recursive for now. Plugin directories or jars need to be exactly under the topPath.
        try (
                DirectoryStream<Path> listing = Files.newDirectoryStream(
                        topPath,
                        PLUGIN_PATH_FILTER
                )
        ) {
            for (Path dir : listing) {
                locations.add(dir);
            }
        }
        return locations;
    }

    /**
     * Returns the simple class name of a plugin.
     *
     * @param plugin the plugin descriptor
     * @return {@code plugin.pluginClass().getSimpleName()}
     */
    public static String simpleName(PluginDesc<?> plugin) {
        return plugin.pluginClass().getSimpleName();
    }

    /**
     * Returns the simple class name of a plugin with its type suffix removed.
     * For SOURCE, SINK and CONNECTOR plugins the suffix is "Connector"; for every other type it
     * is the type's own simple name.
     *
     * @param plugin the plugin descriptor
     * @return the pruned name, or the unchanged simple name if the suffix does not apply
     */
    public static String prunedName(PluginDesc<?> plugin) {
        // It's currently simpler to switch on type than do pattern matching.
        switch (plugin.type()) {
            case SOURCE:
            case SINK:
            case CONNECTOR:
                return prunePluginName(plugin, "Connector");
            default:
                return prunePluginName(plugin, plugin.type().simpleName());
        }
    }

    /**
     * Checks that at most one plugin in a collection shares a simple or pruned name with the
     * given plugin.
     *
     * @param alias   the plugin whose names are being checked
     * @param plugins the plugins to compare against
     * @param <U>     the plugin base type
     * @return {@code false} if two or more entries of {@code plugins} have the same simple name
     *         or the same pruned name as {@code alias}; {@code true} otherwise
     */
    public static <U> boolean isAliasUnique(
            PluginDesc<U> alias,
            Collection<PluginDesc<U>> plugins
    ) {
        boolean matched = false;
        for (PluginDesc<U> plugin : plugins) {
            if (simpleName(alias).equals(simpleName(plugin))
                    || prunedName(alias).equals(prunedName(plugin))) {
                if (matched) {
                    // Second match: the alias is ambiguous.
                    return false;
                }
                matched = true;
            }
        }
        return true;
    }

    /**
     * Cuts the plugin's simple class name at the last occurrence of {@code suffix}.
     * The name is returned unchanged when the suffix is absent or only occurs at index 0, so a
     * class named exactly like the suffix keeps its name.
     */
    private static String prunePluginName(PluginDesc<?> plugin, String suffix) {
        String simple = plugin.pluginClass().getSimpleName();
        int pos = simple.lastIndexOf(suffix);
        if (pos > 0) {
            return simple.substring(0, pos);
        }
        return simple;
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java new file mode 100644 index 0000000..654f485 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class Plugins { + private static final Logger log = LoggerFactory.getLogger(Plugins.class); + private final DelegatingClassLoader delegatingLoader; + + public Plugins(Map<String, String> props) { + List<String> pluginLocations = WorkerConfig.pluginLocations(props); + delegatingLoader = newDelegatingClassLoader(pluginLocations); + delegatingLoader.initLoaders(); + } + + private static DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) { + return (DelegatingClassLoader) AccessController.doPrivileged( + new PrivilegedAction() { + @Override + public Object run() { + return new DelegatingClassLoader(paths); + } + } + ); + } + + private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) { + return Utils.join(plugins, ", "); + } + + protected static <T> T newPlugin(Class<T> klass) { + try { + return Utils.newInstance(klass); + } catch (Throwable t) { + throw new ConnectException("Instantiation error", t); + } + } + + protected static <T> T newConfiguredPlugin(AbstractConfig config, Class<T> klass) { + T plugin = Utils.newInstance(klass); + if (plugin instanceof Configurable) { + 
((Configurable) plugin).configure(config.originals()); + } + return plugin; + } + + @SuppressWarnings("unchecked") + protected static <U> Class<? extends U> pluginClass( + DelegatingClassLoader loader, + String classOrAlias, + Class<U> pluginClass + ) throws ClassNotFoundException { + Class<?> klass = loader.loadClass(classOrAlias, false); + if (pluginClass.isAssignableFrom(klass)) { + return (Class<? extends U>) klass; + } + + throw new ClassNotFoundException( + "Requested class: " + + classOrAlias + + " does not extend " + pluginClass.getSimpleName() + ); + } + + public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { + ClassLoader current = Thread.currentThread().getContextClassLoader(); + if (!current.equals(loader)) { + Thread.currentThread().setContextClassLoader(loader); + } + return current; + } + + public ClassLoader currentThreadLoader() { + return Thread.currentThread().getContextClassLoader(); + } + + public ClassLoader compareAndSwapWithDelegatingLoader() { + ClassLoader current = Thread.currentThread().getContextClassLoader(); + if (!current.equals(delegatingLoader)) { + Thread.currentThread().setContextClassLoader(delegatingLoader); + } + return current; + } + + public ClassLoader compareAndSwapLoaders(Connector connector) { + ClassLoader connectorLoader = delegatingLoader.connectorLoader(connector); + return compareAndSwapLoaders(connectorLoader); + } + + public DelegatingClassLoader delegatingLoader() { + return delegatingLoader; + } + + public Set<PluginDesc<Connector>> connectors() { + return delegatingLoader.connectors(); + } + + public Set<PluginDesc<Converter>> converters() { + return delegatingLoader.converters(); + } + + public Set<PluginDesc<Transformation>> transformations() { + return delegatingLoader.transformations(); + } + + @SuppressWarnings("unchecked") + public Connector newConnector(String connectorClassOrAlias) { + Class<? 
extends Connector> klass; + try { + klass = pluginClass( + delegatingLoader, + connectorClassOrAlias, + Connector.class + ); + } catch (ClassNotFoundException e) { + List<PluginDesc<Connector>> matches = new ArrayList<>(); + for (PluginDesc<Connector> plugin : delegatingLoader.connectors()) { + Class<?> pluginClass = plugin.pluginClass(); + String simpleName = pluginClass.getSimpleName(); + if (simpleName.equals(connectorClassOrAlias) + || simpleName.equals(connectorClassOrAlias + "Connector")) { + matches.add(plugin); + } + } + + if (matches.isEmpty()) { + throw new ConnectException( + "Failed to find any class that implements Connector and which name matches " + + connectorClassOrAlias + + ", available connectors are: " + + pluginNames(delegatingLoader.connectors()) + ); + } + if (matches.size() > 1) { + throw new ConnectException( + "More than one connector matches alias " + + connectorClassOrAlias + + + ". Please use full package and class name instead. Classes found: " + + pluginNames(matches) + ); + } + + PluginDesc<Connector> entry = matches.get(0); + klass = entry.pluginClass(); + } + return newPlugin(klass); + } + + public Task newTask(Class<? extends Task> taskClass) { + return newPlugin(taskClass); + } + + public Converter newConverter(String converterClassOrAlias) { + return newConverter(converterClassOrAlias, null); + } + + public Converter newConverter(String converterClassOrAlias, AbstractConfig config) { + Class<? extends Converter> klass; + try { + klass = pluginClass( + delegatingLoader, + converterClassOrAlias, + Converter.class + ); + } catch (ClassNotFoundException e) { + throw new ConnectException( + "Failed to find any class that implements Converter and which name matches " + + converterClassOrAlias + + ", available connectors are: " + + pluginNames(delegatingLoader.converters()) + ); + } + return config != null ? 
newConfiguredPlugin(config, klass) : newPlugin(klass); + } + + public <R extends ConnectRecord<R>> Transformation<R> newTranformations( + String transformationClassOrAlias + ) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java index ff3c30d..36b896f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java @@ -18,22 +18,12 @@ package org.apache.kafka.connect.runtime.rest.entities; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.apache.kafka.connect.connector.Connector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; public class ConnectorPluginInfo { - - private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class); - - private static final Map<Class<? extends Connector>, String> - VERSIONS = new ConcurrentHashMap<>(); - private String className; private ConnectorType type; private String version; @@ -49,29 +39,8 @@ public class ConnectorPluginInfo { this.version = version; } - public ConnectorPluginInfo(Class<? extends Connector> klass) { - this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass)); - } - - private static String getVersion(Class<? 
extends Connector> klass) { - if (!VERSIONS.containsKey(klass)) { - synchronized (VERSIONS) { - if (!VERSIONS.containsKey(klass)) { - try { - VERSIONS.put(klass, klass.newInstance().version()); - } catch ( - ExceptionInInitializerError - | InstantiationException - | IllegalAccessException - | SecurityException e - ) { - log.warn("Unable to instantiate connector", e); - VERSIONS.put(klass, "unknown"); - } - } - } - } - return VERSIONS.get(klass); + public ConnectorPluginInfo(PluginDesc<Connector> plugin) { + this(plugin.className(), ConnectorType.from(plugin.pluginClass()), plugin.version()); } @JsonProperty("class") http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 37e0f01..24eb93b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -16,14 +16,18 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.PluginDiscovery; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; - -import java.util.List; -import java.util.Map; +import org.apache.kafka.connect.tools.MockConnector; +import 
org.apache.kafka.connect.tools.MockSinkConnector; +import org.apache.kafka.connect.tools.MockSourceConnector; +import org.apache.kafka.connect.tools.SchemaSourceConnector; +import org.apache.kafka.connect.tools.VerifiableSinkConnector; +import org.apache.kafka.connect.tools.VerifiableSourceConnector; import javax.ws.rs.BadRequestException; import javax.ws.rs.Consumes; @@ -33,6 +37,11 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; @Path("/connector-plugins") @Produces(MediaType.APPLICATION_JSON) @@ -41,9 +50,17 @@ public class ConnectorPluginsResource { private static final String ALIAS_SUFFIX = "Connector"; private final Herder herder; + private final List<ConnectorPluginInfo> connectorPlugins; + + private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList( + VerifiableSourceConnector.class, VerifiableSinkConnector.class, + MockConnector.class, MockSourceConnector.class, MockSinkConnector.class, + SchemaSourceConnector.class + ); public ConnectorPluginsResource(Herder herder) { this.herder = herder; + this.connectorPlugins = new ArrayList<>(); } @PUT @@ -67,7 +84,20 @@ public class ConnectorPluginsResource { @GET @Path("/") public List<ConnectorPluginInfo> listConnectorPlugins() { - return PluginDiscovery.connectorPlugins(); + return getConnectorPlugins(); + } + + // TODO: improve once plugins are allowed to be added/removed during runtime. 
+ private synchronized List<ConnectorPluginInfo> getConnectorPlugins() { + if (connectorPlugins.isEmpty()) { + for (PluginDesc<Connector> plugin : herder.plugins().connectors()) { + if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { + connectorPlugins.add(new ConnectorPluginInfo(plugin)); + } + } + } + + return Collections.unmodifiableList(connectorPlugins); } private String normalizedPluginName(String pluginName) { http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 9c8c7ae..d57e75f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -251,12 +251,12 @@ public class StandaloneHerder extends AbstractHerder { ConnectorConfig connConfig; if (worker.isSinkConnector(connName)) { - connConfig = new SinkConnectorConfig(config); + connConfig = new SinkConnectorConfig(plugins(), config); return worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG)); } else { - connConfig = new SourceConnectorConfig(config); + connConfig = new SourceConnectorConfig(plugins(), config); return worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), null); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index 375b9c0..f8c4fd6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -20,12 +20,16 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.transforms.Transformation; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -33,6 +37,13 @@ import static org.junit.Assert.fail; public class ConnectorConfigTest<R extends ConnectRecord<R>> { + private static final Plugins MOCK_PLUGINS = new Plugins(new HashMap<String, String>()) { + @Override + public Set<PluginDesc<Transformation>> transformations() { + return Collections.emptySet(); + } + }; + public static abstract class TestConnector extends Connector { } @@ -67,7 +78,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { Map<String, String> props = new HashMap<>(); props.put("name", "test"); props.put("connector.class", TestConnector.class.getName()); - new ConnectorConfig(props); + new ConnectorConfig(MOCK_PLUGINS, props); } @Test(expected = ConfigException.class) @@ -76,7 +87,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("name", "test"); props.put("connector.class", TestConnector.class.getName()); props.put("transforms", "dangler"); - new 
ConnectorConfig(props); + new ConnectorConfig(MOCK_PLUGINS, props); } @Test(expected = ConfigException.class) @@ -86,7 +97,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("connector.class", TestConnector.class.getName()); props.put("transforms", "a"); props.put("transforms.a.type", "uninstantiable"); - new ConnectorConfig(props); + new ConnectorConfig(MOCK_PLUGINS, props); } @Test(expected = ConfigException.class) @@ -96,7 +107,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("connector.class", TestConnector.class.getName()); props.put("transforms", "a"); props.put("transforms.a.type", SimpleTransformation.class.getName()); - new ConnectorConfig(props); + new ConnectorConfig(MOCK_PLUGINS, props); } @Test @@ -108,7 +119,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("transforms.a.type", SimpleTransformation.class.getName()); props.put("transforms.a.magic.number", "40"); try { - new ConnectorConfig(props); + new ConnectorConfig(MOCK_PLUGINS, props); fail(); } catch (ConfigException e) { assertTrue(e.getMessage().contains("Value must be at least 42")); @@ -123,7 +134,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("transforms", "a"); props.put("transforms.a.type", SimpleTransformation.class.getName()); props.put("transforms.a.magic.number", "42"); - final ConnectorConfig config = new ConnectorConfig(props); + final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); final List<Transformation<R>> transformations = config.transformations(); assertEquals(1, transformations.size()); final SimpleTransformation xform = (SimpleTransformation) transformations.get(0); @@ -138,7 +149,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("transforms", "a, b"); props.put("transforms.a.type", SimpleTransformation.class.getName()); props.put("transforms.a.magic.number", "42"); - new ConnectorConfig(props); + new 
ConnectorConfig(MOCK_PLUGINS, props); } @Test @@ -151,7 +162,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("transforms.a.magic.number", "42"); props.put("transforms.b.type", SimpleTransformation.class.getName()); props.put("transforms.b.magic.number", "84"); - final ConnectorConfig config = new ConnectorConfig(props); + final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); final List<Transformation<R>> transformations = config.transformations(); assertEquals(2, transformations.size()); assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index 9e77198..11b05ee 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -18,10 +18,12 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; import org.easymock.Mock; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,12 +41,18 @@ public class WorkerConnectorTest extends EasyMockSupport { CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR); } - public static final ConnectorConfig CONNECTOR_CONFIG = new ConnectorConfig(CONFIG); + 
public ConnectorConfig connectorConfig; + @Mock Plugins plugins; @Mock Connector connector; @Mock ConnectorContext ctx; @Mock ConnectorStatus.Listener listener; + @Before + public void setup() { + connectorConfig = new ConnectorConfig(plugins, CONFIG); + } + @Test public void testInitializeFailure() { RuntimeException exception = new RuntimeException(); @@ -62,7 +70,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.shutdown(); verifyAll(); @@ -87,7 +95,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.shutdown(); @@ -115,7 +123,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.shutdown(); @@ -146,7 +154,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.transitionTo(TargetState.PAUSED); workerConnector.shutdown(); @@ -178,7 +186,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + 
workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.PAUSED); workerConnector.transitionTo(TargetState.STARTED); workerConnector.shutdown(); @@ -203,7 +211,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.PAUSED); workerConnector.shutdown(); @@ -230,7 +238,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.shutdown(); @@ -260,7 +268,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.shutdown(); @@ -289,7 +297,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.transitionTo(TargetState.STARTED); workerConnector.shutdown(); @@ -321,7 +329,7 @@ public class WorkerConnectorTest extends EasyMockSupport { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener); - workerConnector.initialize(CONNECTOR_CONFIG); + workerConnector.initialize(connectorConfig); workerConnector.transitionTo(TargetState.STARTED); workerConnector.transitionTo(TargetState.PAUSED); 
workerConnector.transitionTo(TargetState.PAUSED); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 26ac486..eb5f25c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -104,6 +105,8 @@ public class WorkerSinkTaskTest { private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture(); private WorkerConfig workerConfig; @Mock + private PluginClassLoader pluginLoader; + @Mock private Converter keyConverter; @Mock private Converter valueConverter; @@ -129,9 +132,10 @@ public class WorkerSinkTaskTest { workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerConfig = new StandaloneConfig(workerProps); + pluginLoader = PowerMock.createMock(PluginClassLoader.class); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, time); + taskId, sinkTask, statusListener, 
initialState, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time); recordsReturned = 0; } @@ -140,7 +144,7 @@ public class WorkerSinkTaskTest { public void testStartPaused() throws Exception { workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, time); + taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time); expectInitializeTask(); expectPollInitialAssignment(); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index fb7cf4f..29a6b52 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -101,6 +102,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Mock private SinkTask sinkTask; private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture(); private WorkerConfig workerConfig; + 
@Mock + private PluginClassLoader pluginLoader; @Mock private Converter keyConverter; @Mock private Converter valueConverter; @Mock private TransformationChain transformationChain; @@ -125,10 +128,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { workerProps.put("internal.key.converter.schemas.enable", "false"); workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + pluginLoader = PowerMock.createMock(PluginClassLoader.class); workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, TransformationChain.<SinkRecord>noOp(), time); + taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, + valueConverter, TransformationChain.<SinkRecord>noOp(), pluginLoader, time); recordsReturned = 0; } http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 31204f0..a3ddb3e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import 
org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -82,6 +83,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private ExecutorService executor = Executors.newSingleThreadExecutor(); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private WorkerConfig config; + private Plugins plugins; @Mock private SourceTask sourceTask; @Mock private Converter keyConverter; @Mock private Converter valueConverter; @@ -116,6 +118,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerProps.put("internal.key.converter.schemas.enable", "false"); workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + plugins = new Plugins(workerProps); config = new StandaloneConfig(workerProps); producerCallbacks = EasyMock.newCapture(); } @@ -126,7 +129,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private void createWorkerTask(TargetState initialState) { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain, - producer, offsetReader, offsetWriter, config, Time.SYSTEM); + producer, offsetReader, offsetWriter, config, plugins.delegatingLoader(), Time.SYSTEM); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 6c2fc4d..871c887 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -44,10 +44,16 @@ public class WorkerTaskTest { ConnectorTaskId taskId = 
new ConnectorTaskId("foo", 0); TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class); + ClassLoader loader = EasyMock.createMock(ClassLoader.class); WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class) - .withArgs(taskId, statusListener, TargetState.STARTED) + .withConstructor( + ConnectorTaskId.class, + TaskStatus.Listener.class, + TargetState.class, + ClassLoader.class + ) + .withArgs(taskId, statusListener, TargetState.STARTED, loader) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -83,10 +89,16 @@ public class WorkerTaskTest { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class); + ClassLoader loader = EasyMock.createMock(ClassLoader.class); WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class) - .withArgs(taskId, statusListener, TargetState.STARTED) + .withConstructor( + ConnectorTaskId.class, + TaskStatus.Listener.class, + TargetState.class, + ClassLoader.class + ) + .withArgs(taskId, statusListener, TargetState.STARTED, loader) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -115,10 +127,16 @@ public class WorkerTaskTest { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class); + ClassLoader loader = EasyMock.createMock(ClassLoader.class); WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class) - .withArgs(taskId, statusListener, TargetState.STARTED) + .withConstructor( + ConnectorTaskId.class, + TaskStatus.Listener.class, + TargetState.class, + ClassLoader.class + ) + .withArgs(taskId, statusListener, 
TargetState.STARTED, loader) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 47dfcef..ccc7e15 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; @@ -26,6 +27,9 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceRecord; @@ -39,6 +43,7 @@ import org.apache.kafka.connect.util.MockTime; import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.Mock; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -59,7 +64,7 @@ import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) -@PrepareForTest({Worker.class}) +@PrepareForTest({Worker.class, Plugins.class}) @PowerMockIgnore("javax.management.*") public class WorkerTest extends ThreadedTest { @@ -69,7 +74,14 @@ public class WorkerTest extends ThreadedTest { private WorkerConfig config; private Worker worker; - private ConnectorFactory connectorFactory = PowerMock.createMock(ConnectorFactory.class); + + @Mock + private Plugins plugins = PowerMock.createMock(Plugins.class); + @Mock + private PluginClassLoader pluginLoader = PowerMock.createMock(PluginClassLoader.class); + @Mock + private DelegatingClassLoader delegatingLoader = + PowerMock.createMock(DelegatingClassLoader.class); private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class); private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class); private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class); @@ -87,17 +99,22 @@ public class WorkerTest extends ThreadedTest { workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); config = new StandaloneConfig(workerProps); + + PowerMock.mockStatic(Plugins.class); } @Test public void testStartAndStopConnector() throws Exception { + expectConverters(); expectStartStorage(); // Create Connector connector = PowerMock.createMock(Connector.class); ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); - EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); + EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) + .andReturn(connector); EasyMock.expect(connector.version()).andReturn("1.0"); Map<String, String> 
props = new HashMap<>(); @@ -106,11 +123,17 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); + EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + .andReturn(delegatingLoader) + .times(2); connector.initialize(EasyMock.anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); connector.start(props); EasyMock.expectLastCall(); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) + .andReturn(pluginLoader).times(2); + connectorStatusListener.onStartup(CONNECTOR_ID); EasyMock.expectLastCall(); @@ -125,7 +148,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -147,20 +170,33 @@ public class WorkerTest extends ThreadedTest { @Test public void testStartConnectorFailure() throws Exception { + expectConverters(); expectStartStorage(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); - worker.start(); - Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "java.util.HashMap"); // Bad connector class name - connectorStatusListener.onFailure(EasyMock.eq(CONNECTOR_ID), EasyMock.<Throwable>anyObject()); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())) + .andThrow(new ConnectException("Failed to find Connector")); + + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) + 
.andReturn(pluginLoader); + + connectorStatusListener.onFailure( + EasyMock.eq(CONNECTOR_ID), + EasyMock.<ConnectException>anyObject() + ); EasyMock.expectLastCall(); + PowerMock.replayAll(); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker.start(); + assertFalse(worker.startConnector(CONNECTOR_ID, props, PowerMock.createMock(ConnectorContext.class), connectorStatusListener, TargetState.STARTED)); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -170,13 +206,15 @@ public class WorkerTest extends ThreadedTest { @Test public void testAddConnectorByAlias() throws Exception { + expectConverters(); expectStartStorage(); // Create Connector connector = PowerMock.createMock(Connector.class); ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); - EasyMock.expect(connectorFactory.newConnector("WorkerTestConnector")).andReturn(connector); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); + EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector); EasyMock.expect(connector.version()).andReturn("1.0"); Map<String, String> props = new HashMap<>(); @@ -185,11 +223,18 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); + EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + .andReturn(delegatingLoader) + .times(2); connector.initialize(EasyMock.anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); connector.start(props); EasyMock.expectLastCall(); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) + .andReturn(pluginLoader) + .times(2); + connectorStatusListener.onStartup(CONNECTOR_ID); EasyMock.expectLastCall(); @@ -204,7 +249,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, 
offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -221,13 +266,15 @@ public class WorkerTest extends ThreadedTest { @Test public void testAddConnectorByShortAlias() throws Exception { + expectConverters(); expectStartStorage(); // Create Connector connector = PowerMock.createMock(Connector.class); ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); - EasyMock.expect(connectorFactory.newConnector("WorkerTest")).andReturn(connector); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); + EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector); EasyMock.expect(connector.version()).andReturn("1.0"); Map<String, String> props = new HashMap<>(); @@ -236,11 +283,18 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); + EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + .andReturn(delegatingLoader) + .times(2); connector.initialize(EasyMock.anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); connector.start(props); EasyMock.expectLastCall(); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) + .andReturn(pluginLoader) + .times(2); + connectorStatusListener.onStartup(CONNECTOR_ID); EasyMock.expectLastCall(); @@ -255,7 +309,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -272,11 +326,12 @@ public class WorkerTest extends ThreadedTest { @Test public void testStopInvalidConnector() { + expectConverters(); expectStartStorage(); 
PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); worker.stopConnector(CONNECTOR_ID); @@ -284,13 +339,16 @@ public class WorkerTest extends ThreadedTest { @Test public void testReconfigureConnectorTasks() throws Exception { + expectConverters(); expectStartStorage(); // Create Connector connector = PowerMock.createMock(Connector.class); ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); - EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3); + EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) + .andReturn(connector); EasyMock.expect(connector.version()).andReturn("1.0"); Map<String, String> props = new HashMap<>(); @@ -299,11 +357,18 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); + EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + .andReturn(delegatingLoader) + .times(3); connector.initialize(EasyMock.anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); connector.start(props); EasyMock.expectLastCall(); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) + .andReturn(pluginLoader) + .times(3); + connectorStatusListener.onStartup(CONNECTOR_ID); EasyMock.expectLastCall(); @@ -324,7 +389,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -355,16 +420,16 @@ 
public class WorkerTest extends ThreadedTest { @Test public void testAddRemoveTask() throws Exception { + expectConverters(); expectStartStorage(); // Create TestSourceTask task = PowerMock.createMock(TestSourceTask.class); WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class); EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); - - EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task); EasyMock.expect(task.version()).andReturn("1.0"); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); PowerMock.expectNew( WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task), @@ -376,16 +441,30 @@ public class WorkerTest extends ThreadedTest { EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), - EasyMock.anyObject(WorkerConfig.class), + EasyMock.eq(config), + EasyMock.anyObject(ClassLoader.class), EasyMock.anyObject(Time.class)) .andReturn(workerTask); Map<String, String> origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); + + EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); workerTask.initialize(new TaskConfig(origProps)); EasyMock.expectLastCall(); workerTask.run(); EasyMock.expectLastCall(); + EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); + EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) + .andReturn(pluginLoader); + + EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) + .times(2); + + EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) + .times(2); + // Remove workerTask.stop(); EasyMock.expectLastCall(); @@ -396,7 +475,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new 
Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.taskIds()); worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); @@ -411,24 +490,39 @@ public class WorkerTest extends ThreadedTest { @Test public void testStartTaskFailure() throws Exception { + expectConverters(); expectStartStorage(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); - worker.start(); - Map<String, String> origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath"); - assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED)); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader); + EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); + EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) + .andReturn(pluginLoader); + + EasyMock.expect(pluginLoader.loadClass(origProps.get(TaskConfig.TASK_CLASS_CONFIG))) + .andThrow(new ClassNotFoundException()); + + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) + .andReturn(pluginLoader); - taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<Throwable>anyObject()); + taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject()); EasyMock.expectLastCall(); + PowerMock.replayAll(); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker.start(); + + assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED)); + assertEquals(Collections.emptySet(), worker.taskIds()); } @Test public void testCleanupTasksOnStop() throws Exception { + expectConverters(); 
expectStartStorage(); // Create @@ -436,9 +530,10 @@ public class WorkerTest extends ThreadedTest { WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class); EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); - EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task); + EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); EasyMock.expect(task.version()).andReturn("1.0"); - + + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); PowerMock.expectNew( WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task), @@ -451,6 +546,7 @@ public class WorkerTest extends ThreadedTest { EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(WorkerConfig.class), + EasyMock.eq(pluginLoader), EasyMock.anyObject(Time.class)) .andReturn(workerTask); Map<String, String> origProps = new HashMap<>(); @@ -460,6 +556,17 @@ public class WorkerTest extends ThreadedTest { workerTask.run(); EasyMock.expectLastCall(); + EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); + EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) + .andReturn(pluginLoader); + + EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) + .times(2); + + EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) + .times(2); + // Remove on Worker.stop() workerTask.stop(); EasyMock.expectLastCall(); @@ -472,7 +579,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, 
TargetState.STARTED); worker.stop(); @@ -482,18 +589,20 @@ public class WorkerTest extends ThreadedTest { @Test public void testConverterOverrides() throws Exception { + expectConverters(); expectStartStorage(); TestSourceTask task = PowerMock.createMock(TestSourceTask.class); WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class); EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); - EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task); + EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); EasyMock.expect(task.version()).andReturn("1.0"); Capture<TestConverter> keyConverter = EasyMock.newCapture(); Capture<TestConverter> valueConverter = EasyMock.newCapture(); + EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); PowerMock.expectNew( WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task), @@ -506,15 +615,27 @@ public class WorkerTest extends ThreadedTest { EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(WorkerConfig.class), + EasyMock.eq(pluginLoader), EasyMock.anyObject(Time.class)) .andReturn(workerTask); Map<String, String> origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); + workerTask.initialize(new TaskConfig(origProps)); EasyMock.expectLastCall(); workerTask.run(); EasyMock.expectLastCall(); + EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); + EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) + .andReturn(pluginLoader); + + EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) + .times(2); + + EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) + .times(2); // Remove workerTask.stop(); EasyMock.expectLastCall(); @@ -525,7 +646,7 @@ 
public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.taskIds()); Map<String, String> connProps = anyConnectorConfigMap(); @@ -559,6 +680,51 @@ public class WorkerTest extends ThreadedTest { EasyMock.expectLastCall(); } + private void expectConverters() { + expectConverters(JsonConverter.class); + } + + private void expectConverters(Class<? extends Converter> converterClass) { + // connector default + Converter keyConverter = PowerMock.createMock(converterClass); + Converter valueConverter = PowerMock.createMock(converterClass); + //internal + Converter internalKeyConverter = PowerMock.createMock(converterClass); + Converter internalValueConverter = PowerMock.createMock(converterClass); + + // Instantiate and configure default + EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) + .andReturn(keyConverter); + keyConverter.configure( + EasyMock.<Map<String, ?>>anyObject(), + EasyMock.anyBoolean() + ); + EasyMock.expectLastCall(); + EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) + .andReturn(valueConverter); + valueConverter.configure( + EasyMock.<Map<String, ?>>anyObject(), + EasyMock.anyBoolean() + ); + EasyMock.expectLastCall(); + + // Instantiate and configure internal + EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) + .andReturn(internalKeyConverter); + internalKeyConverter.configure( + EasyMock.<Map<String, ?>>anyObject(), + EasyMock.anyBoolean() + ); + EasyMock.expectLastCall(); + EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config)) + .andReturn(internalValueConverter); + internalValueConverter.configure( + EasyMock.<Map<String, ?>>anyObject(), + EasyMock.anyBoolean() + ); + 
EasyMock.expectLastCall(); + } + private Map<String, String> anyConnectorConfigMap() { Map<String, String> props = new HashMap<>(); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 70d0736..18d83c5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -26,13 +26,15 @@ import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.ConnectorFactory; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; @@ -73,7 +75,7 @@ import static org.junit.Assert.assertTrue; import 
static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) -@PrepareForTest(DistributedHerder.class) +@PrepareForTest({DistributedHerder.class, Plugins.class}) @PowerMockIgnore("javax.management.*") public class DistributedHerderTest { private static final Map<String, String> HERDER_CONFIG = new HashMap<>(); @@ -150,6 +152,12 @@ public class DistributedHerderTest { private DistributedHerder herder; @Mock private Worker worker; @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback; + @Mock + private Plugins plugins; + @Mock + private PluginClassLoader pluginLoader; + @Mock + private DelegatingClassLoader delegatingLoader; private ConfigBackingStore.UpdateListener configUpdateListener; private WorkerRebalanceListener rebalanceListener; @@ -165,7 +173,10 @@ public class DistributedHerderTest { configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(); - + plugins = PowerMock.createMock(Plugins.class); + pluginLoader = PowerMock.createMock(PluginClassLoader.class); + delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); + PowerMock.mockStatic(Plugins.class); PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); } @@ -173,6 +184,7 @@ public class DistributedHerderTest { public void testJoinAssignment() throws Exception { // Join group and get assignment EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), @@ -198,6 +210,7 @@ public class DistributedHerderTest { public void testRebalance() throws Exception { // Join group and get assignment EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); 
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), @@ -211,6 +224,7 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList()); @@ -235,6 +249,7 @@ public class DistributedHerderTest { public void testRebalanceFailedConnector() throws Exception { // Join group and get assignment EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), @@ -304,12 +319,13 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // config validation - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList())); + 
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); // CONN2 is new, should succeed configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG); @@ -342,12 +358,11 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // config validation - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); - EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); - EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList())); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); // CONN2 creation should fail @@ -380,10 +395,10 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // config validation - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); ConfigDef configDef = new ConfigDef(); configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
"foo.bar doc"); @@ -392,6 +407,7 @@ public class DistributedHerderTest { ConfigValue validatedValue = new ConfigValue("foo.bar"); validatedValue.addErrorMessage("Failed foo.bar validation"); EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(singletonList(validatedValue))); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); // CONN2 creation should fail @@ -427,12 +443,13 @@ public class DistributedHerderTest { config.put(ConnectorConfig.NAME_CONFIG, "test-group"); // config validation - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(SinkConnector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList())); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with // the consumer group id we would use for this sink @@ -483,6 +500,7 @@ public class DistributedHerderTest { public void testDestroyConnector() throws Exception { EasyMock.expect(member.memberId()).andStubReturn("leader"); // Start with one connector + EasyMock.expect(worker.getPlugins()).andReturn(plugins); expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); worker.startConnector(EasyMock.eq(CONN1), 
EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), @@ -516,6 +534,7 @@ public class DistributedHerderTest { // get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); expectRebalance(1, singletonList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); member.poll(EasyMock.anyInt()); @@ -535,6 +554,7 @@ public class DistributedHerderTest { worker.stopConnector(CONN1); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -839,6 +859,7 @@ public class DistributedHerderTest { worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -867,6 +888,7 @@ public class DistributedHerderTest { worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -882,6 +904,7 @@ public class DistributedHerderTest { 
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -909,6 +932,7 @@ public class DistributedHerderTest { worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -948,6 +972,7 @@ public class DistributedHerderTest { worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -1155,6 +1180,7 @@ public class DistributedHerderTest { worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED)); @@ -1236,12 +1262,13 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // config validation - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList())); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED); PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
