This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2694d7aad98 KAFKA-19248: Multiversioning in Kafka Connect - Plugin
Loading Isolation Tests (#18325)
2694d7aad98 is described below
commit 2694d7aad987ca60aabbbc54f52e5503ad1bbe25
Author: snehashisp <[email protected]>
AuthorDate: Thu Jun 5 06:31:18 2025 +0530
KAFKA-19248: Multiversioning in Kafka Connect - Plugin Loading Isolation
Tests (#18325)
This adds tests for
[KIP-891](https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+Connector+plugins).
It primarily focuses on tests for the new additions in plugin loading
isolation. It depends on the actual KIP implementation PRs and
should be merged after https://github.com/apache/kafka/pull/17742.
Reviewers: Greg Harris <[email protected]>
---
.../runtime/isolation/PluginsRecommenders.java | 8 +-
.../runtime/isolation/MultiVersionTest.java | 291 +++++++++++++++++++++
.../runtime/isolation/PluginRecommenderTest.java | 151 +++++++++++
.../connect/runtime/isolation/TestPlugins.java | 28 +-
.../runtime/isolation/VersionedPluginBuilder.java | 114 ++++++++
.../org.apache.kafka.connect.storage.Converter | 16 ++
.../test/plugins/VersionedConverter.java | 66 +++++
...rg.apache.kafka.connect.storage.HeaderConverter | 16 ++
.../test/plugins/VersionedHeaderConverter.java | 69 +++++
...e.kafka.connect.transforms.predicates.Predicate | 16 ++
.../test/plugins/VersionedPredicate.java | 63 +++++
.../org.apache.kafka.connect.sink.SinkConnector | 16 ++
.../test/plugins/VersionedSinkConnector.java | 96 +++++++
...org.apache.kafka.connect.source.SourceConnector | 16 ++
.../test/plugins/VersionedSourceConnector.java | 96 +++++++
....apache.kafka.connect.transforms.Transformation | 16 ++
.../test/plugins/VersionedTransformation.java | 63 +++++
17 files changed, 1131 insertions(+), 10 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
index a1ccf209e69..fa9efeed7d3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -77,12 +77,12 @@ public class PluginsRecommenders {
return headerConverterPluginVersionRecommender;
}
- public TransformationPluginRecommender
transformationPluginRecommender(String classOrAlias) {
- return new TransformationPluginRecommender(classOrAlias);
+ public TransformationPluginRecommender
transformationPluginRecommender(String classOrAliasConfig) {
+ return new TransformationPluginRecommender(classOrAliasConfig);
}
- public PredicatePluginRecommender predicatePluginRecommender(String
classOrAlias) {
- return new PredicatePluginRecommender(classOrAlias);
+ public PredicatePluginRecommender predicatePluginRecommender(String
classOrAliasConfig) {
+ return new PredicatePluginRecommender(classOrAliasConfig);
}
public class ConnectorPluginVersionRecommender implements
ConfigDef.Recommender {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
new file mode 100644
index 00000000000..b770de74b80
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+ private static Plugins setUpPlugins(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+ String pluginPath =
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+ Map<String, String> configs = new HashMap<>();
+ configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+ configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+ return new Plugins(configs);
+ }
+
+ private void assertPluginLoad(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+ throws InvalidVersionSpecificationException,
ClassNotFoundException {
+
+ Plugins plugins = setUpPlugins(artifacts, mode);
+
+ for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry :
artifacts.entrySet()) {
+ String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+ for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
+ Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
+ Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(buildInfo.version(), ((Versioned)
p).version());
+ }
+ }
+ }
+
+ private void assertCorrectLatestPluginVersion(
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+ PluginDiscoveryMode mode,
+ String latestVersion
+ ) {
+ Plugins plugins = setUpPlugins(artifacts, mode);
+ List<String> classes = artifacts.values().stream()
+ .flatMap(List::stream)
+ .map(VersionedPluginBuilder.BuildInfo::plugin)
+ .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+ .distinct()
+ .toList();
+ for (String className : classes) {
+ String version = plugins.latestVersion(className,
PluginType.values());
+ Assertions.assertEquals(latestVersion, version);
+ }
+ }
+
+ private static Map<Path, List<VersionedPluginBuilder.BuildInfo>>
buildIsolatedArtifacts(
+ String[] versions,
+ VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+ ) throws IOException {
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ for (String v : versions) {
+ for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
pluginTypes) {
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+ builder.include(pluginType, v);
+ artifacts.put(builder.build(pluginType + "-" + v),
builder.buildInfos());
+ }
+ }
+ return artifacts;
+ }
+
+ public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_ISOLATED_ARTIFACTS;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_COMBINED_ARTIFACT;
+ public static final Plugins MULTI_VERSION_PLUGINS;
+ public static final Map<VersionedPluginBuilder.VersionedTestPlugin,
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+ static {
+
+ String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0",
"2.3.0", "4.3.0"};
+ try {
+ DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+ defaultIsolatedArtifactsVersions,
VersionedPluginBuilder.VersionedTestPlugin.values()
+ );
+ DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+ DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
k -> "0.0.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
k -> "0.1.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
k -> "0.2.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
k -> "0.3.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
k -> "0.4.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
k -> "0.5.0"));
+ DEFAULT_COMBINED_ARTIFACT =
Collections.singletonMap(builder.build("all_versioned_artifact"),
builder.buildInfos());
+
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ artifacts.putAll(DEFAULT_COMBINED_ARTIFACT);
+ artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS);
+ MULTI_VERSION_PLUGINS = setUpPlugins(artifacts,
PluginDiscoveryMode.SERVICE_LOAD);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testVersionedPluginLoaded() throws
InvalidVersionSpecificationException, ClassNotFoundException {
+ assertPluginLoad(DEFAULT_COMBINED_ARTIFACT,
PluginDiscoveryMode.SERVICE_LOAD);
+ assertPluginLoad(DEFAULT_COMBINED_ARTIFACT,
PluginDiscoveryMode.ONLY_SCAN);
+ }
+
+ @Test
+ public void testMultipleIsolatedVersionedPluginLoading() throws
InvalidVersionSpecificationException, ClassNotFoundException {
+ assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.SERVICE_LOAD);
+ assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.ONLY_SCAN);
+ }
+
+ @Test
+ public void testLatestVersion() {
+ assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.SERVICE_LOAD, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+ assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.ONLY_SCAN, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+ }
+
+ @Test
+ public void testBundledPluginLoading() throws
InvalidVersionSpecificationException, ClassNotFoundException {
+
+ Plugins plugins = MULTI_VERSION_PLUGINS;
+ // get the connector loader of the combined artifact which includes
all plugin types
+ ClassLoader connectorLoader = plugins.pluginLoader(
+
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
+ PluginUtils.connectorVersionRequirement("0.1.0")
+ );
+ Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
+
+ List<VersionedPluginBuilder.VersionedTestPlugin> pluginTypes = List.of(
+ VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+ VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+ VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+ VersionedPluginBuilder.VersionedTestPlugin.PREDICATE
+ );
+ // should match the version used in setUp for creating the combined
artifact
+ List<String> versions =
pluginTypes.stream().map(DEFAULT_COMBINED_ARTIFACT_VERSIONS::get).toList();
+ for (int i = 0; i < 4; i++) {
+ String className = pluginTypes.get(i).className();
+ // when using the connector loader, the version and plugin
returned should be from the ones in the combined artifact
+ String version = plugins.pluginVersion(className, connectorLoader,
PluginType.values());
+ Assertions.assertEquals(versions.get(i), version);
+ Object p = plugins.newPlugin(className, null, connectorLoader);
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(versions.get(i), ((Versioned)
p).version());
+
+ String latestVersion = plugins.latestVersion(className,
PluginType.values());
+ Assertions.assertEquals(DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION,
latestVersion);
+ }
+ }
+
+ @Test
+ public void testCorrectVersionRange() throws IOException,
InvalidVersionSpecificationException, ClassNotFoundException {
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts =
buildIsolatedArtifacts(
+ new String[]{"1.0.0", "1.1.0", "1.1.2", "2.0.0", "2.0.2", "3.0.0",
"4.0.0"},
+ VersionedPluginBuilder.VersionedTestPlugin.values()
+ );
+
+ Plugins plugins = setUpPlugins(artifacts,
PluginDiscoveryMode.SERVICE_LOAD);
+ Map<VersionRange, String> requiredVersions = new HashMap<>();
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("latest"),
"4.0.0");
+ requiredVersions.put(PluginUtils.connectorVersionRequirement(null),
"4.0.0");
+ requiredVersions.put(PluginUtils.connectorVersionRequirement("1.0.0"),
"1.0.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("[2.0.2]"),
"2.0.2");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("[1.1.0,3.0.1]"),
"3.0.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,2.0.0)"),
"1.1.2");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,1.0.0]"),
"1.0.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("[2.0.0,)"),
"4.0.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,2.0.0],[2.0.3,
2.0.4)"), "2.0.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("(2.0.0,3.0.0)"),
"2.0.2");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,1.1.0),[4.1.1,)"),
"1.0.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("[1.1.0,1.1.0]"),
"1.1.0");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,1.1.0),(2.0.0,
2.0.2]"), "2.0.2");
+
requiredVersions.put(PluginUtils.connectorVersionRequirement("[1.1.0,1.1.3)"),
"1.1.2");
+
+ for (Map.Entry<VersionRange, String> entry :
requiredVersions.entrySet()) {
+ for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
VersionedPluginBuilder.VersionedTestPlugin.values()) {
+ Object p = plugins.newPlugin(pluginType.className(),
entry.getKey());
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(entry.getValue(), ((Versioned)
p).version(),
+ String.format("Provided Version Range %s for class %s
should return plugin version %s instead of %s",
+ entry.getKey(), pluginType.className(),
entry.getValue(), ((Versioned) p).version()));
+ }
+ }
+ }
+
+ @Test
+ public void testInvalidVersionRange() throws IOException,
InvalidVersionSpecificationException {
+ String[] validVersions = new String[]{"1.0.0", "1.1.0", "1.1.2",
"2.0.0", "2.0.2", "3.0.0", "4.0.0"};
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts =
buildIsolatedArtifacts(
+ validVersions,
+ VersionedPluginBuilder.VersionedTestPlugin.values()
+ );
+
+ Plugins plugins = setUpPlugins(artifacts,
PluginDiscoveryMode.SERVICE_LOAD);
+ Set<VersionRange> invalidVersions = new HashSet<>();
+ invalidVersions.add(PluginUtils.connectorVersionRequirement("0.9.0"));
+
invalidVersions.add(PluginUtils.connectorVersionRequirement("[4.0.1,)"));
+
invalidVersions.add(PluginUtils.connectorVersionRequirement("(4.0.0,)"));
+
invalidVersions.add(PluginUtils.connectorVersionRequirement("[4.0.1]"));
+ invalidVersions.add(PluginUtils.connectorVersionRequirement("(2.0.0,
2.0.1)"));
+
invalidVersions.add(PluginUtils.connectorVersionRequirement("(,1.0.0)"));
+ invalidVersions.add(PluginUtils.connectorVersionRequirement("(1.1.0,
1.1.2)"));
+ invalidVersions.add(PluginUtils.connectorVersionRequirement("(1.1.0,
1.1.2),[1.1.3, 2.0.0)"));
+
+ for (VersionRange versionRange : invalidVersions) {
+ for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
VersionedPluginBuilder.VersionedTestPlugin.values()) {
+ VersionedPluginLoadingException e =
Assertions.assertThrows(VersionedPluginLoadingException.class, () -> {
+ plugins.newPlugin(pluginType.className(), versionRange);
+ }, String.format("Provided Version Range %s for class %s
should throw VersionedPluginLoadingException", versionRange,
pluginType.className()));
+ Assertions.assertEquals(e.availableVersions(),
List.of(validVersions));
+ }
+ }
+ }
+
+ @Test
+ public void testVersionedConverter() {
+ Plugins plugins = setUpPlugins(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.SERVICE_LOAD);
+ Map<String, String> converterConfig = new HashMap<>();
+ converterConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+ converterConfig.put(WorkerConfig.KEY_CONVERTER_VERSION, "1.1.0");
+ converterConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+ converterConfig.put(WorkerConfig.VALUE_CONVERTER_VERSION, "2.3.0");
+ converterConfig.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+ converterConfig.put(WorkerConfig.HEADER_CONVERTER_VERSION, "4.3.0");
+
+ AbstractConfig config;
+ try (LoaderSwap swap =
plugins.safeLoaderSwapper().apply(plugins.delegatingLoader())) {
+ config = new PluginsTest.TestableWorkerConfig(converterConfig);
+ }
+
+ Converter keyConverter = plugins.newConverter(config,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
+ Assertions.assertEquals(keyConverter.getClass().getName(),
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+ Assertions.assertInstanceOf(Versioned.class, keyConverter);
+ Assertions.assertEquals("1.1.0", ((Versioned) keyConverter).version());
+
+ Converter valueConverter = plugins.newConverter(config,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION);
+ Assertions.assertEquals(valueConverter.getClass().getName(),
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+ Assertions.assertInstanceOf(Versioned.class, valueConverter);
+ Assertions.assertEquals("2.3.0", ((Versioned)
valueConverter).version());
+
+ HeaderConverter headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_VERSION);
+ Assertions.assertEquals(headerConverter.getClass().getName(),
VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+ Assertions.assertInstanceOf(Versioned.class, headerConverter);
+ Assertions.assertEquals("4.3.0", ((Versioned)
headerConverter).version());
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java
new file mode 100644
index 00000000000..35e6f43b3f6
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+import static
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_ISOLATED_ARTIFACTS;
+import static
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.MULTI_VERSION_PLUGINS;
+
+public class PluginRecommenderTest {
+
+ private Set<String> allVersionsOf(String classOrAlias) {
+ Set<String> versions = DEFAULT_ISOLATED_ARTIFACTS.values().stream()
+ .flatMap(List::stream)
+ .filter(b -> b.plugin().className().equals(classOrAlias))
+ .map(VersionedPluginBuilder.BuildInfo::version)
+ .collect(Collectors.toSet());
+
Arrays.stream(VersionedPluginBuilder.VersionedTestPlugin.values()).filter(p ->
p.className().equals(classOrAlias))
+ .forEach(r ->
versions.add(DEFAULT_COMBINED_ARTIFACT_VERSIONS.get(r)));
+ return versions;
+ }
+
+ @Test
+ public void testConnectorVersionRecommenders() {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ for (String connectorClass : Arrays.asList(
+
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
+
VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR.className())
+ ) {
+ Set<String> versions =
recommender.connectorPluginVersionRecommender().validValues(
+ ConnectorConfig.CONNECTOR_CLASS_CONFIG,
Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass)
+ ).stream().map(Object::toString).collect(Collectors.toSet());
+ Set<String> allVersions = allVersionsOf(connectorClass);
+ Assertions.assertEquals(allVersions.size(), versions.size());
+ allVersions.forEach(v ->
Assertions.assertTrue(versions.contains(v), "Missing version " + v + " for
connector " + connectorClass));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testConverterVersionRecommenders() throws
ClassNotFoundException {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ Map<String, Object> config = new HashMap<>();
+ Class converterClass =
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+ config.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, converterClass);
+ config.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
converterClass);
+ Set<String> allVersions =
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+ for (ConfigDef.Recommender r :
Arrays.asList(recommender.keyConverterPluginVersionRecommender(),
recommender.valueConverterPluginVersionRecommender())) {
+ Set<String> versions = r.validValues(null,
config).stream().map(Object::toString).collect(Collectors.toSet());
+ Assertions.assertEquals(allVersions.size(), versions.size());
+ allVersions.forEach(v ->
Assertions.assertTrue(versions.contains(v), "Missing version " + v + " for
converter"));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testHeaderConverterVersionRecommenders() throws
ClassNotFoundException {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ Map<String, Object> config = new HashMap<>();
+ Class headerConverterClass =
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+ config.put(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG,
headerConverterClass);
+ Set<String> versions =
recommender.headerConverterPluginVersionRecommender().validValues(null,
config).stream().map(Object::toString).collect(Collectors.toSet());
+ Set<String> allVersions =
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+ Assertions.assertEquals(allVersions.size(), versions.size());
+ allVersions.forEach(v -> Assertions.assertTrue(versions.contains(v),
"Missing version " + v + " for header converter"));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testTransformationVersionRecommenders() throws
ClassNotFoundException {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ Class transformationClass =
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className());
+ Set<String> versions =
recommender.transformationPluginRecommender("transforms.t1.type")
+ .validValues("transforms.t1.type",
Collections.singletonMap("transforms.t1.type", transformationClass))
+ .stream().map(Object::toString).collect(Collectors.toSet());
+ Set<String> allVersions =
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className());
+ Assertions.assertEquals(allVersions.size(), versions.size());
+ allVersions.forEach(v -> Assertions.assertTrue(versions.contains(v),
"Missing version " + v + " for transformation"));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testPredicateVersionRecommenders() throws
ClassNotFoundException {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ Class predicateClass =
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className());
+ Set<String> versions =
recommender.predicatePluginRecommender("predicates.p1.type")
+ .validValues("predicates.p1.type",
Collections.singletonMap("predicates.p1.type", predicateClass))
+ .stream().map(Object::toString).collect(Collectors.toSet());
+ Set<String> allVersions =
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className());
+ Assertions.assertEquals(allVersions.size(), versions.size());
+ allVersions.forEach(v -> Assertions.assertTrue(versions.contains(v),
"Missing version " + v + " for predicate"));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testConverterPluginRecommender() {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ Set<String> converters =
recommender.converterPluginRecommender().validValues(null, null)
+ .stream().map(c -> ((Class)
c).getName()).collect(Collectors.toSet());
+
Assertions.assertTrue(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className()));
+ // some sanity checks to ensure that other plugin types are not
included
+
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className()));
+
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR.className()));
+
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className()));
+
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className()));
+
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className()));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testHeaderConverterPluginRecommender() {
+ PluginsRecommenders recommender = new
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+ Set<String> headerConverters =
recommender.headerConverterPluginRecommender().validValues(null, null)
+ .stream().map(c -> ((Class)
c).getName()).collect(Collectors.toSet());
+
Assertions.assertTrue(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className()));
+ // some sanity checks to ensure that other plugin types are not
included
+
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className()));
+
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR.className()));
+
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className()));
+
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className()));
+
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className()));
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index d4da3987a64..14a90197a8d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -298,7 +298,7 @@ public class TestPlugins {
if (pluginJars.containsKey(testPackage)) {
log.debug("Skipping recompilation of {}",
testPackage.resourceDir());
}
- pluginJars.put(testPackage,
createPluginJar(testPackage.resourceDir(), testPackage.removeRuntimeClasses()));
+ pluginJars.put(testPackage,
createPluginJar(testPackage.resourceDir(), testPackage.removeRuntimeClasses(),
Collections.emptyMap()));
}
} catch (Throwable e) {
log.error("Could not set up plugin test jars", e);
@@ -385,10 +385,11 @@ public class TestPlugins {
.toArray(TestPlugin[]::new);
}
- private static Path createPluginJar(String resourceDir, Predicate<String>
removeRuntimeClasses) throws IOException {
+
+ static Path createPluginJar(String resourceDir, Predicate<String>
removeRuntimeClasses, Map<String, String> replacements) throws IOException {
Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
- compileJavaSources(inputDir, binDir);
+ compileJavaSources(inputDir, binDir, replacements);
Path jarFile = Files.createTempFile(resourceDir + ".", ".jar");
try (JarOutputStream jar = openJarFile(jarFile)) {
writeJar(jar, inputDir, removeRuntimeClasses);
@@ -448,7 +449,7 @@ public class TestPlugins {
* @param sourceDir Directory containing java source files
* @throws IOException if the files cannot be compiled
*/
- private static void compileJavaSources(Path sourceDir, Path binDir) throws
IOException {
+ private static void compileJavaSources(Path sourceDir, Path binDir,
Map<String, String> replacements) throws IOException {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
List<File> sourceFiles;
try (Stream<Path> stream = Files.walk(sourceDir)) {
@@ -456,13 +457,14 @@ public class TestPlugins {
.filter(Files::isRegularFile)
.map(Path::toFile)
.filter(file -> file.getName().endsWith(".java"))
+ .map(file -> replacements.isEmpty() ? file :
copyAndReplace(file, replacements))
.collect(Collectors.toList());
}
+
StringWriter writer = new StringWriter();
List<String> options = Arrays.asList(
"-d", binDir.toString() // Write class output to a different
directory.
);
-
try (StandardJavaFileManager fileManager =
compiler.getStandardFileManager(null, null, null)) {
boolean success = compiler.getTask(
writer,
@@ -478,6 +480,21 @@ public class TestPlugins {
}
}
+    /**
+     * Creates a copy of a source file in which every key of {@code replacements} is substituted by its value.
+     * <p>
+     * The copy keeps the simple name of {@code source} and is written into its own freshly created temporary
+     * directory, so copies of equally named sources, or concurrent builds of the same source, cannot overwrite
+     * each other. Both the copy and its directory are registered for deletion on JVM exit.
+     *
+     * @param source       the source file to copy
+     * @param replacements literal substrings to search for, mapped to the text that replaces them
+     * @return the rewritten copy of {@code source}
+     * @throws RuntimeException if the source cannot be read or the copy cannot be written
+     */
+    private static File copyAndReplace(File source, Map<String, String> replacements) throws RuntimeException {
+        try {
+            String content = Files.readString(source.toPath());
+            for (Map.Entry<String, String> entry : replacements.entrySet()) {
+                content = content.replace(entry.getKey(), entry.getValue());
+            }
+            // Keep the original file name (javac rejects a public class declared in a differently named file)
+            // but isolate the copy in a unique directory instead of sharing java.io.tmpdir directly.
+            Path tmpDir = Files.createTempDirectory("test-plugin-src.");
+            Path tmpFile = tmpDir.resolve(source.getName());
+            Files.writeString(tmpFile, content);
+            // Deletion on exit happens in reverse registration order, so register the directory first.
+            tmpDir.toFile().deleteOnExit();
+            tmpFile.toFile().deleteOnExit();
+            return tmpFile.toFile();
+        } catch (IOException e) {
+            throw new RuntimeException("Could not copy and replace file: " + source, e);
+        }
+    }
+
private static void writeJar(JarOutputStream jar, Path inputDir,
Predicate<String> removeRuntimeClasses) throws IOException {
List<Path> paths;
try (Stream<Path> stream = Files.walk(inputDir)) {
@@ -503,5 +520,4 @@ public class TestPlugins {
}
}
}
-
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java
new file mode 100644
index 00000000000..d2f349c2011
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test utility that compiles the versioned test plugins into jars, stamping each jar with a requested version.
+ * <p>
+ * Every included plugin is built with {@link TestPlugins#createPluginJar}, passing a replacement of the string
+ * {@code PLACEHOLDER_FOR_VERSION} by the requested version. The resulting jars are collected in the
+ * {@code lib} subdirectory of a freshly created temporary plugin directory.
+ */
+public class VersionedPluginBuilder {
+
+    private static final String VERSION_PLACEHOLDER = "PLACEHOLDER_FOR_VERSION";
+
+    /**
+     * The versioned test plugins that can be built, each identified by the resource directory holding its
+     * sources and by the name of its plugin class.
+     */
+    public enum VersionedTestPlugin {
+
+        SINK_CONNECTOR("versioned-sink-connector", "test.plugins.VersionedSinkConnector"),
+        SOURCE_CONNECTOR("versioned-source-connector", "test.plugins.VersionedSourceConnector"),
+        CONVERTER("versioned-converter", "test.plugins.VersionedConverter"),
+        HEADER_CONVERTER("versioned-header-converter", "test.plugins.VersionedHeaderConverter"),
+        TRANSFORMATION("versioned-transformation", "test.plugins.VersionedTransformation"),
+        PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate");
+
+        private final String resourceDir;
+        private final String className;
+
+        VersionedTestPlugin(String resourceDir, String className) {
+            this.resourceDir = resourceDir;
+            this.className = className;
+        }
+
+        public String resourceDir() {
+            return resourceDir;
+        }
+
+        public String className() {
+            return className;
+        }
+    }
+
+    /**
+     * A plugin/version pair requested through {@link #include(VersionedTestPlugin, String)}.
+     * The location is {@code null} until {@link #build(String)} has moved the jar into place.
+     */
+    public static class BuildInfo {
+
+        private final VersionedTestPlugin plugin;
+        private final String version;
+        private String location;
+
+        private BuildInfo(VersionedTestPlugin plugin, String version) {
+            this.plugin = plugin;
+            this.version = version;
+        }
+
+        private void setLocation(String location) {
+            this.location = location;
+        }
+
+        public VersionedTestPlugin plugin() {
+            return plugin;
+        }
+
+        public String version() {
+            return version;
+        }
+
+        public String location() {
+            return location;
+        }
+    }
+
+    private final List<BuildInfo> pluginBuilds;
+
+    public VersionedPluginBuilder() {
+        pluginBuilds = new ArrayList<>();
+    }
+
+    /**
+     * Registers a plugin to be built with the given version.
+     * Synchronized like {@link #build(String)}, which iterates the same list.
+     *
+     * @param plugin  the test plugin to build
+     * @param version the version that replaces the placeholder in the plugin sources
+     * @return this builder
+     */
+    public synchronized VersionedPluginBuilder include(VersionedTestPlugin plugin, String version) {
+        pluginBuilds.add(new BuildInfo(plugin, version));
+        return this;
+    }
+
+    /**
+     * Builds a jar for every included plugin and places it in the {@code lib} subdirectory of a new temporary
+     * directory. The directories and jars are registered for deletion on JVM exit.
+     *
+     * @param pluginDir prefix for the name of the temporary plugin directory
+     * @return the absolute path of the temporary plugin directory
+     * @throws IOException if a directory cannot be created or a jar cannot be built or moved
+     */
+    public synchronized Path build(String pluginDir) throws IOException {
+        Path pluginDirPath = Files.createTempDirectory(pluginDir);
+        pluginDirPath.toFile().deleteOnExit();
+        Path subDir = Files.createDirectory(pluginDirPath.resolve("lib"));
+        subDir.toFile().deleteOnExit();
+        for (BuildInfo buildInfo : pluginBuilds) {
+            Path jarFile = TestPlugins.createPluginJar(buildInfo.plugin.resourceDir(), ignored -> false,
+                    Collections.singletonMap(VERSION_PLACEHOLDER, buildInfo.version));
+            Path targetJar = subDir.resolve(jarFile.getFileName()).toAbsolutePath();
+            Files.move(jarFile, targetJar);
+            targetJar.toFile().deleteOnExit();
+            // Record the location only once the jar actually exists there.
+            buildInfo.setLocation(targetJar.toString());
+        }
+        return pluginDirPath.toAbsolutePath();
+    }
+
+    /**
+     * Returns the included plugin builds in inclusion order. This is the builder's own list, not a copy.
+     *
+     * @return the list of build infos
+     */
+    public synchronized List<BuildInfo> buildInfos() {
+        return pluginBuilds;
+    }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..d37bb90859a
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedConverter
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
new file mode 100644
index 00000000000..766f29330ef
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Converter used to test multi-versioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation.
+ */
+public class VersionedConverter implements Converter, Versioned {
+
+    // Replaced with the actual version during plugin compilation.
+    private static final String VERSION = "PLACEHOLDER_FOR_VERSION";
+
+    public VersionedConverter() {
+    }
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public ConfigDef config() {
+        ConfigDef configDef = new ConfigDef();
+        // The version-specific config defaults to the plugin version, which helps with testing
+        // different ConfigDefs for different versions of the converter.
+        configDef.define("version-specific-config", ConfigDef.Type.STRING, VERSION, ConfigDef.Importance.HIGH, "version specific docs");
+        configDef.define("other-config", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "other docs");
+        return configDef;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+        return null;
+    }
+
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
new file mode 100644
index 00000000000..25e4b7665be
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedHeaderConverter
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
new file mode 100644
index 00000000000..c0ef947e669
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.util.Map;
+
+/**
+ * Header converter used to test multi-versioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation.
+ */
+public class VersionedHeaderConverter implements HeaderConverter, Versioned {
+
+    // Replaced with the actual version during plugin compilation.
+    private static final String VERSION = "PLACEHOLDER_FOR_VERSION";
+
+    public VersionedHeaderConverter() {
+    }
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public ConfigDef config() {
+        ConfigDef configDef = new ConfigDef();
+        // The version-specific config defaults to the plugin version, which helps with testing
+        // different ConfigDefs for different versions of the header converter.
+        configDef.define("version-specific-config", ConfigDef.Type.STRING, VERSION, ConfigDef.Importance.HIGH, "version specific docs");
+        configDef.define("other-config", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "other docs");
+        return configDef;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+        return null;
+    }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
new file mode 100644
index 00000000000..af841817aba
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedPredicate
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
new file mode 100644
index 00000000000..2e92c79c351
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.Map;
+
+/**
+ * Predicate used to test multi-versioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation.
+ */
+public class VersionedPredicate<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
+
+    // Replaced with the actual version during plugin compilation.
+    private static final String VERSION = "PLACEHOLDER_FOR_VERSION";
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
+    public ConfigDef config() {
+        ConfigDef configDef = new ConfigDef();
+        // The version-specific config defaults to the plugin version, which helps with testing
+        // different ConfigDefs for different versions of the predicate.
+        configDef.define("version-specific-config", ConfigDef.Type.STRING, VERSION, ConfigDef.Importance.HIGH, "version specific docs");
+        configDef.define("other-config", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "other docs");
+        return configDef;
+    }
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public boolean test(R record) {
+        return false;
+    }
+
+    @Override
+    public void close() {
+    }
+}
\ No newline at end of file
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
new file mode 100644
index 00000000000..a5c560853f0
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedSinkConnector
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/test/plugins/VersionedSinkConnector.java
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/test/plugins/VersionedSinkConnector.java
new file mode 100644
index 00000000000..9710099368b
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/test/plugins/VersionedSinkConnector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+/**
+ * Sink connector used to test multi-versioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation.
+ */
+public class VersionedSinkConnector extends SinkConnector {
+
+    // Replaced with the actual version during plugin compilation.
+    private static final String VERSION = "PLACEHOLDER_FOR_VERSION";
+
+    public VersionedSinkConnector() {
+    }
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public ConfigDef config() {
+        ConfigDef configDef = new ConfigDef();
+        // The version-specific config defaults to the plugin version, which helps with testing
+        // different ConfigDefs for different versions of the connector.
+        configDef.define("version-specific-config", ConfigDef.Type.STRING, VERSION, ConfigDef.Importance.HIGH, "version specific docs");
+        configDef.define("other-config", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "other docs");
+        return configDef;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VersionedSinkConnectorTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // One single-entry config per task, each carrying the connector version.
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        for (int remaining = maxTasks; remaining > 0; remaining--) {
+            taskConfigs.add(Collections.singletonMap("task-config-version", VERSION));
+        }
+        return taskConfigs;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    /**
+     * No-op sink task that reports the same version as its connector.
+     */
+    public static class VersionedSinkConnectorTask extends SinkTask {
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/META-INF/services/org.apache.kafka.connect.source.SourceConnector
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..efee272749d
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedSourceConnector
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/test/plugins/VersionedSourceConnector.java
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/test/plugins/VersionedSourceConnector.java
new file mode 100644
index 00000000000..4ef066b4c8c
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/test/plugins/VersionedSourceConnector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+/**
+ * Source connector used to test multi-versioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation.
+ */
+public class VersionedSourceConnector extends SourceConnector {
+
+    // Replaced with the actual version during plugin compilation.
+    private static final String VERSION = "PLACEHOLDER_FOR_VERSION";
+
+    public VersionedSourceConnector() {
+    }
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public ConfigDef config() {
+        ConfigDef configDef = new ConfigDef();
+        // The version-specific config defaults to the plugin version, which helps with testing
+        // different ConfigDefs for different versions of the connector.
+        configDef.define("version-specific-config", ConfigDef.Type.STRING, VERSION, ConfigDef.Importance.HIGH, "version specific docs");
+        configDef.define("other-config", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "other docs");
+        return configDef;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VersionedSourceConnectorTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // One single-entry config per task, each carrying the connector version.
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        for (int remaining = maxTasks; remaining > 0; remaining--) {
+            taskConfigs.add(Collections.singletonMap("task-config-version", VERSION));
+        }
+        return taskConfigs;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    /**
+     * Source task that never produces records and reports the same version as its connector.
+     */
+    public static class VersionedSourceConnectorTask extends SourceTask {
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/META-INF/services/org.apache.kafka.connect.transforms.Transformation
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/META-INF/services/org.apache.kafka.connect.transforms.Transformation
new file mode 100644
index 00000000000..7fed78370ff
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/META-INF/services/org.apache.kafka.connect.transforms.Transformation
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedTransformation
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
new file mode 100644
index 00000000000..0422834d027
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+
+/**
+ * Transformation used to test multi-versioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation.
+ */
+public class VersionedTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
+
+    // Replaced with the actual version during plugin compilation.
+    private static final String VERSION = "PLACEHOLDER_FOR_VERSION";
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
+    public ConfigDef config() {
+        ConfigDef configDef = new ConfigDef();
+        // The version-specific config defaults to the plugin version, which helps with testing
+        // different ConfigDefs for different versions of the transformation.
+        configDef.define("version-specific-config", ConfigDef.Type.STRING, VERSION, ConfigDef.Importance.HIGH, "version specific docs");
+        configDef.define("other-config", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "other docs");
+        return configDef;
+    }
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public R apply(R record) {
+        return null;
+    }
+
+    @Override
+    public void close() {
+    }
+}