Repository: kafka Updated Branches: refs/heads/trunk 5aaaba7ff -> 45f226176
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java new file mode 100644 index 0000000..c943863 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PluginUtilsTest { + @Rule + public TemporaryFolder rootDir = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + } + + @Test + public void testJavaLibraryClasses() throws Exception { + assertFalse(PluginUtils.shouldLoadInIsolation("java.")); + assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.Object")); + assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.String")); + assertFalse(PluginUtils.shouldLoadInIsolation("java.util.HashMap$Entry")); + assertFalse(PluginUtils.shouldLoadInIsolation("java.io.Serializable")); + assertFalse(PluginUtils.shouldLoadInIsolation("javax.")); + assertFalse(PluginUtils.shouldLoadInIsolation( + "javax.management.loading.ClassLoaderRepository") + ); + assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.")); + assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA.Object")); + assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.")); + assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.traversal.TreeWalker")); + } + + @Test + public void testThirdPartyClasses() throws Exception { + assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.")); + assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.Level")); + } + + @Test + public void testConnectFrameworkClasses() throws Exception { + assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.common.")); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.common.config.AbstractConfig") + ); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.common.config.ConfigDef$Type") + ); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.common.serialization.Deserializer") + ); + assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.")); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.connector.Connector") + ); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.source.SourceConnector") + ); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.sink.SinkConnector") + ); + assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.connector.Task")); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.source.SourceTask") + ); + assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.sink.SinkTask")); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.transforms.Transformation") + ); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.storage.Converter") + ); + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.storage.OffsetBackingStore") + ); + } + + @Test + public void testAllowedConnectFrameworkClasses() throws Exception { + assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms.")); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.transforms.ExtractField") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.transforms.ExtractField$Key") + ); + assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json.")); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.json.JsonConverter") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.json.JsonConverter$21") + ); + assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file.")); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.file.FileStreamSourceTask") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.file.FileStreamSinkConnector") + ); + assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters.")); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.converters.ByteArrayConverter") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.storage.StringConverter") + ); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 966098c..2d0448e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -31,6 +30,9 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; @@ -57,7 +59,8 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.io.IOException; +import javax.ws.rs.BadRequestException; +import java.net.URL; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -66,8 +69,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - -import javax.ws.rs.BadRequestException; +import java.util.TreeSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -88,12 +90,14 @@ public class ConnectorPluginsResourceTest { props = new HashMap<>(partialProps); props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName()); + props.put("plugin.path", null); } private static final ConfigInfos CONFIG_INFOS; private static final ConfigInfos PARTIAL_CONFIG_INFOS; private static final int ERROR_COUNT = 0; private static final int PARTIAL_CONFIG_ERROR_COUNT = 1; + private static final Set<PluginDesc<Connector>> CONNECTOR_PLUGINS = new TreeSet<>(); static { List<ConfigInfo> configs = new LinkedList<>(); @@ -133,19 +137,58 @@ public class ConnectorPluginsResourceTest { CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs); PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs); + + Class<?>[] abstractConnectorClasses = { + Connector.class, + SourceConnector.class, + SinkConnector.class + }; + + Class<?>[] connectorClasses = { + VerifiableSourceConnector.class, + VerifiableSinkConnector.class, + MockSourceConnector.class, + MockSinkConnector.class, + MockConnector.class, + SchemaSourceConnector.class, + ConnectorPluginsResourceTestConnector.class + }; + + try { + for (Class<?> klass : abstractConnectorClasses) { + CONNECTOR_PLUGINS.add( + new MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0")); + } + for (Class<?> klass : connectorClasses) { + CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc((Class<? extends Connector>) klass)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } } @Mock private Herder herder; + @Mock + private Plugins plugins; private ConnectorPluginsResource connectorPluginsResource; @Before - public void setUp() throws NoSuchMethodException { + public void setUp() throws Exception { PowerMock.mockStatic(RestServer.class, RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class)); + + plugins = PowerMock.createMock(Plugins.class); + herder = PowerMock.createMock(AbstractHerder.class); connectorPluginsResource = new ConnectorPluginsResource(herder); } + private void expectPlugins() { + EasyMock.expect(herder.plugins()).andReturn(plugins); + EasyMock.expect(plugins.connectors()).andReturn(CONNECTOR_PLUGINS); + PowerMock.replayAll(); + } + @Test public void testValidateConfigWithSingleErrorDueToMissingConnectorClassname() throws Throwable { herder.validateConnectorConfig(EasyMock.eq(partialProps)); @@ -359,27 +402,30 @@ public class ConnectorPluginsResourceTest { } @Test - public void testListConnectorPlugins() { + public void testListConnectorPlugins() throws Exception { + expectPlugins(); Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins()); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class))); - assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class))); + assertFalse(connectorPlugins.contains(newInfo(Connector.class, "0.0"))); + assertFalse(connectorPlugins.contains(newInfo(SourceConnector.class, "0.0"))); + assertFalse(connectorPlugins.contains(newInfo(SinkConnector.class, "0.0"))); + assertFalse(connectorPlugins.contains(newInfo(VerifiableSourceConnector.class))); + assertFalse(connectorPlugins.contains(newInfo(VerifiableSinkConnector.class))); + assertFalse(connectorPlugins.contains(newInfo(MockSourceConnector.class))); + assertFalse(connectorPlugins.contains(newInfo(MockSinkConnector.class))); + assertFalse(connectorPlugins.contains(newInfo(MockConnector.class))); + assertFalse(connectorPlugins.contains(newInfo(SchemaSourceConnector.class))); + assertTrue(connectorPlugins.contains(newInfo(ConnectorPluginsResourceTestConnector.class))); + PowerMock.verifyAll(); } @Test - public void testConnectorPluginsIncludesTypeAndVersionInformation() - throws IOException { - ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class); - ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class); + public void testConnectorPluginsIncludesTypeAndVersionInformation() throws Exception { + expectPlugins(); + ConnectorPluginInfo sinkInfo = newInfo(TestSinkConnector.class); + ConnectorPluginInfo sourceInfo = + newInfo(TestSourceConnector.class); ConnectorPluginInfo unkownInfo = - new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class); + newInfo(ConnectorPluginsResourceTestConnector.class); assertEquals(ConnectorType.SINK, sinkInfo.type()); assertEquals(ConnectorType.SOURCE, sourceInfo.type()); assertEquals(ConnectorType.UNKNOWN, unkownInfo.type()); @@ -407,6 +453,46 @@ public class ConnectorPluginsResourceTest { ); } + protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass, String version) + throws Exception { + return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass, version)); + } + + protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass) + throws Exception { + return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass)); + } + + public static class MockPluginClassLoader extends PluginClassLoader { + public MockPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) { + super(pluginLocation, urls, parent); + } + + public MockPluginClassLoader(URL pluginLocation, URL[] urls) { + super(pluginLocation, urls); + } + + @Override + public String location() { + return "/tmp/mockpath"; + } + } + + public static class MockConnectorPluginDesc extends PluginDesc<Connector> { + public MockConnectorPluginDesc(Class<? extends Connector> klass, String version) + throws Exception { + super(klass, version, new MockPluginClassLoader(null, new URL[0])); + } + + public MockConnectorPluginDesc(Class<? extends Connector> klass) throws Exception { + super( + klass, + klass.newInstance().version(), + new MockPluginClassLoader(null, new URL[0]) + ); + } + } + public static class TestSinkConnector extends SinkConnector { static final String VERSION = "some great version"; http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index da1edbc..1c3dddb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.ConnectorFactory; import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.HerderConnectorContext; @@ -35,6 +34,9 @@ import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; @@ -54,6 +56,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; @@ -75,6 +78,7 @@ import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @SuppressWarnings("unchecked") +@PrepareForTest({StandaloneHerder.class, Plugins.class}) public class StandaloneHerderTest { private static final String CONNECTOR_NAME = "test"; private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2"); @@ -90,12 +94,21 @@ public class StandaloneHerderTest { private Connector connector; @Mock protected Worker worker; + @Mock private Plugins plugins; + @Mock + private PluginClassLoader pluginLoader; + @Mock + private DelegatingClassLoader delegatingLoader; @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback; @Mock protected StatusBackingStore statusBackingStore; @Before public void setup() { herder = new StandaloneHerder(worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore()); + plugins = PowerMock.createMock(Plugins.class); + pluginLoader = PowerMock.createMock(PluginClassLoader.class); + delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); + PowerMock.mockStatic(Plugins.class); } @Test @@ -120,12 +133,12 @@ public class StandaloneHerderTest { Map<String, String> config = connectorConfig(SourceSink.SOURCE); config.remove(ConnectorConfig.NAME_CONFIG); - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); - EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList())); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull()); PowerMock.expectLastCall(); @@ -141,11 +154,10 @@ public class StandaloneHerderTest { public void testCreateConnectorFailedCustomValidation() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); - Map<String, String> config = connectorConfig(SourceSink.SOURCE); - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); ConfigDef configDef = new ConfigDef(); configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc"); @@ -153,7 +165,9 @@ public class StandaloneHerderTest { ConfigValue validatedValue = new ConfigValue("foo.bar"); validatedValue.addErrorMessage("Failed foo.bar validation"); + Map<String, String> config = connectorConfig(SourceSink.SOURCE); EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue))); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull()); PowerMock.expectLastCall(); @@ -172,8 +186,13 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - expectConfigValidation(config, config); + Connector connectorMock = PowerMock.createMock(Connector.class); + expectConfigValidation(connectorMock, true, config, config); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + // No new connector is created + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); // Second should fail createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull()); PowerMock.expectLastCall(); @@ -435,7 +454,8 @@ public class StandaloneHerderTest { // Create connector = PowerMock.createMock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); - expectConfigValidation(connConfig, newConnConfig); + Connector connectorMock = PowerMock.createMock(Connector.class); + expectConfigValidation(connectorMock, true, connConfig); // Should get first config connectorConfigCb.onCompletion(null, connConfig); @@ -457,6 +477,7 @@ public class StandaloneHerderTest { putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo)); EasyMock.expectLastCall(); // Should get new config + expectConfigValidation(connectorMock, false, newConnConfig); connectorConfigCb.onCompletion(null, newConnConfig); EasyMock.expectLastCall(); @@ -501,11 +522,12 @@ public class StandaloneHerderTest { ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andStubReturn(configDef); - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())) - .andReturn(connectorMock); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class); Capture<BadRequestException> capture = Capture.newInstance(); callback.onCompletion( @@ -588,15 +610,27 @@ public class StandaloneHerderTest { private void expectConfigValidation(Map<String, String> ... configs) { - // config validation - ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); - EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock); + expectConfigValidation(connectorMock, true, configs); + } + + private void expectConfigValidation( + Connector connectorMock, + boolean shouldCreateConnector, + Map<String, String>... configs + ) { + // config validation + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); + if (shouldCreateConnector) { + EasyMock.expect(worker.getPlugins()).andReturn(plugins); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + } EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); for (Map<String, String> config : configs) EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList())); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); } // We need to use a real class here due to some issue with mocking java.lang.Class http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index cdd8c23..64ea09e 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -71,6 +71,7 @@ versions += [ zkclient: "0.10", zookeeper: "3.4.10", jfreechart: "1.0.0", + mavenArtifact: "3.5.0", ] libs += [ @@ -112,5 +113,6 @@ libs += [ snappy: "org.xerial.snappy:snappy-java:$versions.snappy", zkclient: "com.101tec:zkclient:$versions.zkclient", zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", - jfreechart: "jfreechart:jfreechart:$versions.jfreechart" + jfreechart: "jfreechart:jfreechart:$versions.jfreechart", + mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact" ]