This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2694d7aad98 KAFKA-19248: Multiversioning in Kafka Connect - Plugin 
Loading Isolation Tests (#18325)
2694d7aad98 is described below

commit 2694d7aad987ca60aabbbc54f52e5503ad1bbe25
Author: snehashisp <[email protected]>
AuthorDate: Thu Jun 5 06:31:18 2025 +0530

    KAFKA-19248: Multiversioning in Kafka Connect - Plugin Loading Isolation 
Tests (#18325)
    
    This adds tests for 
[KIP-891](https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+Connector+plugins).
    It primarily focuses on tests for the new additions in plugin loading
    isolation. It has dependency on the actual KIP implementation PRs and
    should be merged post https://github.com/apache/kafka/pull/17742
    
    Reviewers: Greg Harris <[email protected]>
---
 .../runtime/isolation/PluginsRecommenders.java     |   8 +-
 .../runtime/isolation/MultiVersionTest.java        | 291 +++++++++++++++++++++
 .../runtime/isolation/PluginRecommenderTest.java   | 151 +++++++++++
 .../connect/runtime/isolation/TestPlugins.java     |  28 +-
 .../runtime/isolation/VersionedPluginBuilder.java  | 114 ++++++++
 .../org.apache.kafka.connect.storage.Converter     |  16 ++
 .../test/plugins/VersionedConverter.java           |  66 +++++
 ...rg.apache.kafka.connect.storage.HeaderConverter |  16 ++
 .../test/plugins/VersionedHeaderConverter.java     |  69 +++++
 ...e.kafka.connect.transforms.predicates.Predicate |  16 ++
 .../test/plugins/VersionedPredicate.java           |  63 +++++
 .../org.apache.kafka.connect.sink.SinkConnector    |  16 ++
 .../test/plugins/VersionedSinkConnector.java       |  96 +++++++
 ...org.apache.kafka.connect.source.SourceConnector |  16 ++
 .../test/plugins/VersionedSourceConnector.java     |  96 +++++++
 ....apache.kafka.connect.transforms.Transformation |  16 ++
 .../test/plugins/VersionedTransformation.java      |  63 +++++
 17 files changed, 1131 insertions(+), 10 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
index a1ccf209e69..fa9efeed7d3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -77,12 +77,12 @@ public class PluginsRecommenders {
         return headerConverterPluginVersionRecommender;
     }
 
-    public TransformationPluginRecommender 
transformationPluginRecommender(String classOrAlias) {
-        return new TransformationPluginRecommender(classOrAlias);
+    public TransformationPluginRecommender 
transformationPluginRecommender(String classOrAliasConfig) {
+        return new TransformationPluginRecommender(classOrAliasConfig);
     }
 
-    public PredicatePluginRecommender predicatePluginRecommender(String 
classOrAlias) {
-        return new PredicatePluginRecommender(classOrAlias);
+    public PredicatePluginRecommender predicatePluginRecommender(String 
classOrAliasConfig) {
+        return new PredicatePluginRecommender(classOrAliasConfig);
     }
 
     public class ConnectorPluginVersionRecommender implements 
ConfigDef.Recommender {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
new file mode 100644
index 00000000000..b770de74b80
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+    private static Plugins setUpPlugins(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+        String pluginPath = 
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+        Map<String, String> configs = new HashMap<>();
+        configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+        configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+        return new Plugins(configs);
+    }
+
+    private void assertPluginLoad(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+            throws InvalidVersionSpecificationException, 
ClassNotFoundException {
+
+        Plugins plugins = setUpPlugins(artifacts, mode);
+
+        for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : 
artifacts.entrySet()) {
+            String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+            for (VersionedPluginBuilder.BuildInfo buildInfo : 
entry.getValue()) {
+                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
+                Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
+                Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(Versioned.class, p);
+                Assertions.assertEquals(buildInfo.version(), ((Versioned) 
p).version());
+            }
+        }
+    }
+
+    private void assertCorrectLatestPluginVersion(
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+            PluginDiscoveryMode mode,
+            String latestVersion
+    ) {
+        Plugins plugins = setUpPlugins(artifacts, mode);
+        List<String> classes = artifacts.values().stream()
+                .flatMap(List::stream)
+                .map(VersionedPluginBuilder.BuildInfo::plugin)
+                .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+                .distinct()
+                .toList();
+        for (String className : classes) {
+            String version = plugins.latestVersion(className, 
PluginType.values());
+            Assertions.assertEquals(latestVersion, version);
+        }
+    }
+
+    private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
buildIsolatedArtifacts(
+            String[] versions,
+            VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+    ) throws IOException {
+        Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+        for (String v : versions) {
+            for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
pluginTypes) {
+                VersionedPluginBuilder builder = new VersionedPluginBuilder();
+                builder.include(pluginType, v);
+                artifacts.put(builder.build(pluginType + "-" + v), 
builder.buildInfos());
+            }
+        }
+        return artifacts;
+    }
+
+    public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_ISOLATED_ARTIFACTS;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_COMBINED_ARTIFACT;
+    public static final Plugins MULTI_VERSION_PLUGINS;
+    public static final Map<VersionedPluginBuilder.VersionedTestPlugin, 
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+    static {
+
+        String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", 
"2.3.0", "4.3.0"};
+        try {
+            DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+                defaultIsolatedArtifactsVersions, 
VersionedPluginBuilder.VersionedTestPlugin.values()
+            );
+            DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+            DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+            VersionedPluginBuilder builder = new VersionedPluginBuilder();
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
 k -> "0.0.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
 k -> "0.1.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
 k -> "0.2.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
 k -> "0.3.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
 k -> "0.4.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
 k -> "0.5.0"));
+            DEFAULT_COMBINED_ARTIFACT = 
Collections.singletonMap(builder.build("all_versioned_artifact"), 
builder.buildInfos());
+
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+            artifacts.putAll(DEFAULT_COMBINED_ARTIFACT);
+            artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS);
+            MULTI_VERSION_PLUGINS = setUpPlugins(artifacts, 
PluginDiscoveryMode.SERVICE_LOAD);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testVersionedPluginLoaded() throws 
InvalidVersionSpecificationException, ClassNotFoundException {
+        assertPluginLoad(DEFAULT_COMBINED_ARTIFACT, 
PluginDiscoveryMode.SERVICE_LOAD);
+        assertPluginLoad(DEFAULT_COMBINED_ARTIFACT, 
PluginDiscoveryMode.ONLY_SCAN);
+    }
+
+    @Test
+    public void testMultipleIsolatedVersionedPluginLoading() throws 
InvalidVersionSpecificationException, ClassNotFoundException {
+        assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.SERVICE_LOAD);
+        assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.ONLY_SCAN);
+    }
+
+    @Test
+    public void testLatestVersion() {
+        assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.SERVICE_LOAD, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+        assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.ONLY_SCAN, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+    }
+
+    @Test
+    public void testBundledPluginLoading() throws 
InvalidVersionSpecificationException, ClassNotFoundException {
+
+        Plugins plugins = MULTI_VERSION_PLUGINS;
+        // get the connector loader of the combined artifact which includes 
all plugin types
+        ClassLoader connectorLoader = plugins.pluginLoader(
+            
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
+            PluginUtils.connectorVersionRequirement("0.1.0")
+        );
+        Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
+
+        List<VersionedPluginBuilder.VersionedTestPlugin> pluginTypes = List.of(
+            VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+            VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+            VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+            VersionedPluginBuilder.VersionedTestPlugin.PREDICATE
+        );
+        // should match the version used in setUp for creating the combined 
artifact
+        List<String> versions = 
pluginTypes.stream().map(DEFAULT_COMBINED_ARTIFACT_VERSIONS::get).toList();
+        for (int i = 0; i < 4; i++) {
+            String className = pluginTypes.get(i).className();
+            // when using the connector loader, the version and plugin 
returned should be from the ones in the combined artifact
+            String version = plugins.pluginVersion(className, connectorLoader, 
PluginType.values());
+            Assertions.assertEquals(versions.get(i), version);
+            Object p = plugins.newPlugin(className, null, connectorLoader);
+            Assertions.assertInstanceOf(Versioned.class, p);
+            Assertions.assertEquals(versions.get(i), ((Versioned) 
p).version());
+
+            String latestVersion = plugins.latestVersion(className, 
PluginType.values());
+            Assertions.assertEquals(DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION, 
latestVersion);
+        }
+    }
+
+    @Test
+    public void testCorrectVersionRange() throws IOException, 
InvalidVersionSpecificationException, ClassNotFoundException {
+        Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = 
buildIsolatedArtifacts(
+            new String[]{"1.0.0", "1.1.0", "1.1.2", "2.0.0", "2.0.2", "3.0.0", 
"4.0.0"},
+            VersionedPluginBuilder.VersionedTestPlugin.values()
+        );
+
+        Plugins plugins = setUpPlugins(artifacts, 
PluginDiscoveryMode.SERVICE_LOAD);
+        Map<VersionRange, String> requiredVersions = new HashMap<>();
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("latest"), 
"4.0.0");
+        requiredVersions.put(PluginUtils.connectorVersionRequirement(null), 
"4.0.0");
+        requiredVersions.put(PluginUtils.connectorVersionRequirement("1.0.0"), 
"1.0.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("[2.0.2]"), 
"2.0.2");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("[1.1.0,3.0.1]"), 
"3.0.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,2.0.0)"), 
"1.1.2");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,1.0.0]"), 
"1.0.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("[2.0.0,)"), 
"4.0.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,2.0.0],[2.0.3, 
2.0.4)"), "2.0.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("(2.0.0,3.0.0)"), 
"2.0.2");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,1.1.0),[4.1.1,)"),
 "1.0.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("[1.1.0,1.1.0]"), 
"1.1.0");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("(,1.1.0),(2.0.0, 
2.0.2]"), "2.0.2");
+        
requiredVersions.put(PluginUtils.connectorVersionRequirement("[1.1.0,1.1.3)"), 
"1.1.2");
+
+        for (Map.Entry<VersionRange, String> entry : 
requiredVersions.entrySet()) {
+            for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
VersionedPluginBuilder.VersionedTestPlugin.values()) {
+                Object p = plugins.newPlugin(pluginType.className(), 
entry.getKey());
+                Assertions.assertInstanceOf(Versioned.class, p);
+                Assertions.assertEquals(entry.getValue(), ((Versioned) 
p).version(),
+                    String.format("Provided Version Range %s for class %s 
should return plugin version %s instead of %s",
+                        entry.getKey(), pluginType.className(), 
entry.getValue(), ((Versioned) p).version()));
+            }
+        }
+    }
+
+    @Test
+    public void testInvalidVersionRange() throws IOException, 
InvalidVersionSpecificationException {
+        String[] validVersions = new String[]{"1.0.0", "1.1.0", "1.1.2", 
"2.0.0", "2.0.2", "3.0.0", "4.0.0"};
+        Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = 
buildIsolatedArtifacts(
+            validVersions,
+            VersionedPluginBuilder.VersionedTestPlugin.values()
+        );
+
+        Plugins plugins = setUpPlugins(artifacts, 
PluginDiscoveryMode.SERVICE_LOAD);
+        Set<VersionRange> invalidVersions = new HashSet<>();
+        invalidVersions.add(PluginUtils.connectorVersionRequirement("0.9.0"));
+        
invalidVersions.add(PluginUtils.connectorVersionRequirement("[4.0.1,)"));
+        
invalidVersions.add(PluginUtils.connectorVersionRequirement("(4.0.0,)"));
+        
invalidVersions.add(PluginUtils.connectorVersionRequirement("[4.0.1]"));
+        invalidVersions.add(PluginUtils.connectorVersionRequirement("(2.0.0, 
2.0.1)"));
+        
invalidVersions.add(PluginUtils.connectorVersionRequirement("(,1.0.0)"));
+        invalidVersions.add(PluginUtils.connectorVersionRequirement("(1.1.0, 
1.1.2)"));
+        invalidVersions.add(PluginUtils.connectorVersionRequirement("(1.1.0, 
1.1.2),[1.1.3, 2.0.0)"));
+
+        for (VersionRange versionRange : invalidVersions) {
+            for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
VersionedPluginBuilder.VersionedTestPlugin.values()) {
+                VersionedPluginLoadingException e = 
Assertions.assertThrows(VersionedPluginLoadingException.class, () -> {
+                    plugins.newPlugin(pluginType.className(), versionRange);
+                }, String.format("Provided Version Range %s for class %s 
should throw VersionedPluginLoadingException", versionRange, 
pluginType.className()));
+                Assertions.assertEquals(e.availableVersions(), 
List.of(validVersions));
+            }
+        }
+    }
+
+    @Test
+    public void testVersionedConverter() {
+        Plugins plugins = setUpPlugins(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.SERVICE_LOAD);
+        Map<String, String> converterConfig = new HashMap<>();
+        converterConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+        converterConfig.put(WorkerConfig.KEY_CONVERTER_VERSION, "1.1.0");
+        converterConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+        converterConfig.put(WorkerConfig.VALUE_CONVERTER_VERSION, "2.3.0");
+        converterConfig.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+        converterConfig.put(WorkerConfig.HEADER_CONVERTER_VERSION, "4.3.0");
+
+        AbstractConfig config;
+        try (LoaderSwap swap = 
plugins.safeLoaderSwapper().apply(plugins.delegatingLoader())) {
+            config = new PluginsTest.TestableWorkerConfig(converterConfig);
+        }
+
+        Converter keyConverter = plugins.newConverter(config, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
+        Assertions.assertEquals(keyConverter.getClass().getName(), 
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+        Assertions.assertInstanceOf(Versioned.class, keyConverter);
+        Assertions.assertEquals("1.1.0", ((Versioned) keyConverter).version());
+
+        Converter valueConverter = plugins.newConverter(config, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
WorkerConfig.VALUE_CONVERTER_VERSION);
+        Assertions.assertEquals(valueConverter.getClass().getName(), 
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+        Assertions.assertInstanceOf(Versioned.class, valueConverter);
+        Assertions.assertEquals("2.3.0", ((Versioned) 
valueConverter).version());
+
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
WorkerConfig.HEADER_CONVERTER_VERSION);
+        Assertions.assertEquals(headerConverter.getClass().getName(), 
VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+        Assertions.assertInstanceOf(Versioned.class, headerConverter);
+        Assertions.assertEquals("4.3.0", ((Versioned) 
headerConverter).version());
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java
new file mode 100644
index 00000000000..35e6f43b3f6
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+import static 
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_ISOLATED_ARTIFACTS;
+import static 
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.MULTI_VERSION_PLUGINS;
+
+public class PluginRecommenderTest {
+
+    private Set<String> allVersionsOf(String classOrAlias) {
+        Set<String> versions = DEFAULT_ISOLATED_ARTIFACTS.values().stream()
+            .flatMap(List::stream)
+            .filter(b -> b.plugin().className().equals(classOrAlias))
+            .map(VersionedPluginBuilder.BuildInfo::version)
+            .collect(Collectors.toSet());
+        
Arrays.stream(VersionedPluginBuilder.VersionedTestPlugin.values()).filter(p -> 
p.className().equals(classOrAlias))
+            .forEach(r -> 
versions.add(DEFAULT_COMBINED_ARTIFACT_VERSIONS.get(r)));
+        return versions;
+    }
+
+    @Test
+    public void testConnectorVersionRecommenders() {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        for (String connectorClass : Arrays.asList(
+            
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
+            
VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR.className())
+        ) {
+            Set<String> versions = 
recommender.connectorPluginVersionRecommender().validValues(
+                ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass)
+            ).stream().map(Object::toString).collect(Collectors.toSet());
+            Set<String> allVersions = allVersionsOf(connectorClass);
+            Assertions.assertEquals(allVersions.size(), versions.size());
+            allVersions.forEach(v -> 
Assertions.assertTrue(versions.contains(v), "Missing version " + v + " for 
connector " + connectorClass));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testConverterVersionRecommenders() throws 
ClassNotFoundException {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        Map<String, Object> config = new HashMap<>();
+        Class converterClass = 
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+        config.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, converterClass);
+        config.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, 
converterClass);
+        Set<String> allVersions = 
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
+        for (ConfigDef.Recommender r : 
Arrays.asList(recommender.keyConverterPluginVersionRecommender(), 
recommender.valueConverterPluginVersionRecommender())) {
+            Set<String> versions = r.validValues(null, 
config).stream().map(Object::toString).collect(Collectors.toSet());
+            Assertions.assertEquals(allVersions.size(), versions.size());
+            allVersions.forEach(v -> 
Assertions.assertTrue(versions.contains(v), "Missing version " + v + " for 
converter"));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testHeaderConverterVersionRecommenders() throws 
ClassNotFoundException {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        Map<String, Object> config = new HashMap<>();
+        Class headerConverterClass = 
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+        config.put(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG, 
headerConverterClass);
+        Set<String> versions = 
recommender.headerConverterPluginVersionRecommender().validValues(null, 
config).stream().map(Object::toString).collect(Collectors.toSet());
+        Set<String> allVersions = 
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
+        Assertions.assertEquals(allVersions.size(), versions.size());
+        allVersions.forEach(v -> Assertions.assertTrue(versions.contains(v), 
"Missing version " + v + " for header converter"));
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testTransformationVersionRecommenders() throws 
ClassNotFoundException {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        Class transformationClass = 
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className());
+        Set<String> versions = 
recommender.transformationPluginRecommender("transforms.t1.type")
+            .validValues("transforms.t1.type", 
Collections.singletonMap("transforms.t1.type", transformationClass))
+            .stream().map(Object::toString).collect(Collectors.toSet());
+        Set<String> allVersions = 
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className());
+        Assertions.assertEquals(allVersions.size(), versions.size());
+        allVersions.forEach(v -> Assertions.assertTrue(versions.contains(v), 
"Missing version " + v + " for transformation"));
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testPredicateVersionRecommenders() throws 
ClassNotFoundException {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        Class predicateClass = 
MULTI_VERSION_PLUGINS.pluginClass(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className());
+        Set<String> versions = 
recommender.predicatePluginRecommender("predicates.p1.type")
+            .validValues("predicates.p1.type", 
Collections.singletonMap("predicates.p1.type", predicateClass))
+            .stream().map(Object::toString).collect(Collectors.toSet());
+        Set<String> allVersions = 
allVersionsOf(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className());
+        Assertions.assertEquals(allVersions.size(), versions.size());
+        allVersions.forEach(v -> Assertions.assertTrue(versions.contains(v), 
"Missing version " + v + " for predicate"));
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testConverterPluginRecommender() {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        Set<String> converters = 
recommender.converterPluginRecommender().validValues(null, null)
+            .stream().map(c -> ((Class) 
c).getName()).collect(Collectors.toSet());
+        
Assertions.assertTrue(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className()));
+        // some sanity checks to ensure that other plugin types are not 
included
+        
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className()));
+        
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR.className()));
+        
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className()));
+        
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className()));
+        
Assertions.assertFalse(converters.contains(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className()));
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testHeaderConverterPluginRecommender() {
+        PluginsRecommenders recommender = new 
PluginsRecommenders(MULTI_VERSION_PLUGINS);
+        Set<String> headerConverters = 
recommender.headerConverterPluginRecommender().validValues(null, null)
+            .stream().map(c -> ((Class) 
c).getName()).collect(Collectors.toSet());
+        
Assertions.assertTrue(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className()));
+        // some sanity checks to ensure that other plugin types are not 
included
+        
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className()));
+        
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR.className()));
+        
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className()));
+        
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION.className()));
+        
Assertions.assertFalse(headerConverters.contains(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE.className()));
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index d4da3987a64..14a90197a8d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -298,7 +298,7 @@ public class TestPlugins {
                 if (pluginJars.containsKey(testPackage)) {
                     log.debug("Skipping recompilation of {}", 
testPackage.resourceDir());
                 }
-                pluginJars.put(testPackage, 
createPluginJar(testPackage.resourceDir(), testPackage.removeRuntimeClasses()));
+                pluginJars.put(testPackage, 
createPluginJar(testPackage.resourceDir(), testPackage.removeRuntimeClasses(), 
Collections.emptyMap()));
             }
         } catch (Throwable e) {
             log.error("Could not set up plugin test jars", e);
@@ -385,10 +385,11 @@ public class TestPlugins {
                 .toArray(TestPlugin[]::new);
     }
 
-    private static Path createPluginJar(String resourceDir, Predicate<String> 
removeRuntimeClasses) throws IOException {
+
+    static Path createPluginJar(String resourceDir, Predicate<String> 
removeRuntimeClasses, Map<String, String> replacements) throws IOException {
         Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
         Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
-        compileJavaSources(inputDir, binDir);
+        compileJavaSources(inputDir, binDir, replacements);
         Path jarFile = Files.createTempFile(resourceDir + ".", ".jar");
         try (JarOutputStream jar = openJarFile(jarFile)) {
             writeJar(jar, inputDir, removeRuntimeClasses);
@@ -448,7 +449,7 @@ public class TestPlugins {
      * @param sourceDir Directory containing java source files
      * @throws IOException if the files cannot be compiled
      */
-    private static void compileJavaSources(Path sourceDir, Path binDir) throws 
IOException {
+    private static void compileJavaSources(Path sourceDir, Path binDir, 
Map<String, String> replacements) throws IOException {
         JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
         List<File> sourceFiles;
         try (Stream<Path> stream = Files.walk(sourceDir)) {
@@ -456,13 +457,14 @@ public class TestPlugins {
                     .filter(Files::isRegularFile)
                     .map(Path::toFile)
                     .filter(file -> file.getName().endsWith(".java"))
+                    .map(file -> replacements.isEmpty() ? file : 
copyAndReplace(file, replacements))
                     .collect(Collectors.toList());
         }
+
         StringWriter writer = new StringWriter();
         List<String> options = Arrays.asList(
             "-d", binDir.toString() // Write class output to a different 
directory.
         );
-
         try (StandardJavaFileManager fileManager = 
compiler.getStandardFileManager(null, null, null)) {
             boolean success = compiler.getTask(
                 writer,
@@ -478,6 +480,21 @@ public class TestPlugins {
         }
     }
 
+    private static File copyAndReplace(File source, Map<String, String> 
replacements) throws RuntimeException {
+        try {
+            String content = Files.readString(source.toPath());
+            for (Map.Entry<String, String> entry : replacements.entrySet()) {
+                content = content.replace(entry.getKey(), entry.getValue());
+            }
+            File tmpFile = new File(System.getProperty("java.io.tmpdir") + 
File.separator + source.getName());
+            Files.writeString(tmpFile.toPath(), content);
+            tmpFile.deleteOnExit();
+            return tmpFile;
+        } catch (IOException e) {
+            throw new RuntimeException("Could not copy and replace file: " + 
source, e);
+        }
+    }
+
     private static void writeJar(JarOutputStream jar, Path inputDir, 
Predicate<String> removeRuntimeClasses) throws IOException {
         List<Path> paths;
         try (Stream<Path> stream = Files.walk(inputDir)) {
@@ -503,5 +520,4 @@ public class TestPlugins {
             }
         }
     }
-
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java
new file mode 100644
index 00000000000..d2f349c2011
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+    private static final String VERSION_PLACEHOLDER = 
"PLACEHOLDER_FOR_VERSION";
+
+    public enum VersionedTestPlugin {
+
+        SINK_CONNECTOR("versioned-sink-connector", 
"test.plugins.VersionedSinkConnector"),
+        SOURCE_CONNECTOR("versioned-source-connector", 
"test.plugins.VersionedSourceConnector"),
+        CONVERTER("versioned-converter", "test.plugins.VersionedConverter"),
+        HEADER_CONVERTER("versioned-header-converter", 
"test.plugins.VersionedHeaderConverter"),
+        TRANSFORMATION("versioned-transformation", 
"test.plugins.VersionedTransformation"),
+        PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate");
+
+        private final String resourceDir;
+        private final String className;
+
+        VersionedTestPlugin(String resourceDir, String className) {
+            this.resourceDir = resourceDir;
+            this.className = className;
+        }
+
+        public String resourceDir() {
+            return resourceDir;
+        }
+
+        public String className() {
+            return className;
+        }
+    }
+
+    public static class BuildInfo {
+
+        private final VersionedTestPlugin plugin;
+        private final String version;
+        private String location;
+
+        private BuildInfo(VersionedTestPlugin plugin, String version) {
+            this.plugin = plugin;
+            this.version = version;
+        }
+
+        private void setLocation(String location) {
+            this.location = location;
+        }
+
+        public VersionedTestPlugin plugin() {
+            return plugin;
+        }
+
+        public String version() {
+            return version;
+        }
+
+        public String location() {
+            return location;
+        }
+    }
+
+    private final List<BuildInfo> pluginBuilds;
+
+    public VersionedPluginBuilder() {
+        pluginBuilds = new ArrayList<>();
+    }
+
+    public VersionedPluginBuilder include(VersionedTestPlugin plugin, String 
version) {
+        pluginBuilds.add(new BuildInfo(plugin, version));
+        return this;
+    }
+
+    public synchronized Path build(String pluginDir) throws IOException {
+        Path pluginDirPath = Files.createTempDirectory(pluginDir);
+        pluginDirPath.toFile().deleteOnExit();
+        Path subDir = Files.createDirectory(pluginDirPath.resolve("lib"));
+        subDir.toFile().deleteOnExit();
+        for (BuildInfo buildInfo : pluginBuilds) {
+            Path jarFile = 
TestPlugins.createPluginJar(buildInfo.plugin.resourceDir(), ignored -> false, 
Collections.singletonMap(VERSION_PLACEHOLDER, buildInfo.version));
+            Path targetJar = 
subDir.resolve(jarFile.getFileName()).toAbsolutePath();
+            buildInfo.setLocation(targetJar.toString());
+            targetJar.toFile().deleteOnExit();
+            Files.move(jarFile, targetJar);
+        }
+        return pluginDirPath.toAbsolutePath();
+    }
+
+    public List<BuildInfo> buildInfos() {
+        return pluginBuilds;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
 
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..d37bb90859a
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedConverter
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
new file mode 100644
index 00000000000..766f29330ef
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Converter to test multiverioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedConverter implements Converter, Versioned {
+
+    public VersionedConverter() {
+        super();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, 
final Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] 
value) {
+        return null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                // version specific config will have the defaul value 
(PLACEHOLDER_FOR_VERSION) replaced with the actual version during plugin 
compilation
+                // this will help with testing differnt configdef for 
different version of converter
+                .define("version-specific-config", ConfigDef.Type.STRING, 
"PLACEHOLDER_FOR_VERSION", ConfigDef.Importance.HIGH, "version specific docs")
+                .define("other-config", ConfigDef.Type.STRING, "defaultVal", 
ConfigDef.Importance.HIGH, "other docs");
+    }
+
+    @Override
+    public String version() {
+        return "PLACEHOLDER_FOR_VERSION";
+    }
+
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
 
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
new file mode 100644
index 00000000000..25e4b7665be
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedHeaderConverter
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
new file mode 100644
index 00000000000..c0ef947e669
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.util.Map;
+
+/**
+ * Header Converter to test multiverioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedHeaderConverter implements HeaderConverter, Versioned {
+
+    public VersionedHeaderConverter() {
+        super();
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, 
byte[] value) {
+        return null;
+    }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema 
schema, Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                // version specific config will have the defaul value 
(PLACEHOLDER_FOR_VERSION) replaced with the actual version during plugin 
compilation
+                // this will help with testing differnt configdef for 
different version of header converter
+                .define("version-specific-config", ConfigDef.Type.STRING, 
"PLACEHOLDER_FOR_VERSION", ConfigDef.Importance.HIGH, "version specific docs")
+                .define("other-config", ConfigDef.Type.STRING, "defaultVal", 
ConfigDef.Importance.HIGH, "other docs");
+    }
+
+    @Override
+    public String version() {
+        return "PLACEHOLDER_FOR_VERSION";
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
 
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
new file mode 100644
index 00000000000..af841817aba
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedPredicate
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
new file mode 100644
index 00000000000..2e92c79c351
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.Map;
+
+/**
+ /**
+ * Predicate to test multiverioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedPredicate<R extends ConnectRecord<R>> implements 
Predicate<R>, Versioned {
+
+    @Override
+    public String version() {
+        return "PLACEHOLDER_FOR_VERSION";
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                // version specific config will have the defaul value 
(PLACEHOLDER_FOR_VERSION) replaced with the actual version during plugin 
compilation
+                // this will help with testing differnt configdef for 
different version of the predicate
+                .define("version-specific-config", ConfigDef.Type.STRING, 
"PLACEHOLDER_FOR_VERSION", ConfigDef.Importance.HIGH, "version specific docs")
+                .define("other-config", ConfigDef.Type.STRING, "defaultVal", 
ConfigDef.Importance.HIGH, "other docs");
+    }
+
+    @Override
+    public boolean test(R record) {
+        return false;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+}
\ No newline at end of file
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
 
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
new file mode 100644
index 00000000000..a5c560853f0
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedSinkConnector
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/test/plugins/VersionedSinkConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/test/plugins/VersionedSinkConnector.java
new file mode 100644
index 00000000000..9710099368b
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-sink-connector/test/plugins/VersionedSinkConnector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+/**
+ * VersionedSamplingSourceConnector is a test connector that extends 
SamplingConnector and overrides the version method.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedSinkConnector extends SinkConnector {
+
+    public VersionedSinkConnector() {
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VersionedSinkConnectorTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            configs.add(Collections.singletonMap("task-config-version", 
"PLACEHOLDER_FOR_VERSION"));
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                // version specific config will have the defaul value 
(PLACEHOLDER_FOR_VERSION) replaced with the actual version during plugin 
compilation
+                // this will help with testing differnt configdef for 
different version of connector
+                .define("version-specific-config", ConfigDef.Type.STRING, 
"PLACEHOLDER_FOR_VERSION", ConfigDef.Importance.HIGH, "version specific docs")
+                .define("other-config", ConfigDef.Type.STRING, "defaultVal", 
ConfigDef.Importance.HIGH, "other docs");
+    }
+
+    @Override
+    public String version() {
+        return "PLACEHOLDER_FOR_VERSION";
+    }
+
+    public static class VersionedSinkConnectorTask extends SinkTask {
+
+        @Override
+        public String version() {
+            return "PLACEHOLDER_FOR_VERSION";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..efee272749d
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedSourceConnector
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/test/plugins/VersionedSourceConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/test/plugins/VersionedSourceConnector.java
new file mode 100644
index 00000000000..4ef066b4c8c
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-source-connector/test/plugins/VersionedSourceConnector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+/**
+ * VersionedSamplingSourceConnector is a test connector that extends 
SamplingConnector and overrides the version method.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedSourceConnector extends SourceConnector {
+
+    public VersionedSourceConnector() {
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VersionedSourceConnectorTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            configs.add(Collections.singletonMap("task-config-version", 
"PLACEHOLDER_FOR_VERSION"));
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                // version specific config will have the defaul value 
(PLACEHOLDER_FOR_VERSION) replaced with the actual version during plugin 
compilation
+                // this will help with testing differnt configdef for 
different version of connector
+                .define("version-specific-config", ConfigDef.Type.STRING, 
"PLACEHOLDER_FOR_VERSION", ConfigDef.Importance.HIGH, "version specific docs")
+                .define("other-config", ConfigDef.Type.STRING, "defaultVal", 
ConfigDef.Importance.HIGH, "other docs");
+    }
+
+    @Override
+    public String version() {
+        return "PLACEHOLDER_FOR_VERSION";
+    }
+
+    public static class VersionedSourceConnectorTask extends SourceTask {
+
+        @Override
+        public String version() {
+            return "PLACEHOLDER_FOR_VERSION";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/META-INF/services/org.apache.kafka.connect.transforms.Transformation
 
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/META-INF/services/org.apache.kafka.connect.transforms.Transformation
new file mode 100644
index 00000000000..7fed78370ff
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/META-INF/services/org.apache.kafka.connect.transforms.Transformation
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.VersionedTransformation
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
new file mode 100644
index 00000000000..0422834d027
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+
+/**
+ * Transformation to test multiverioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedTransformation<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+
+
+    @Override
+    public R apply(R record) {
+        return null;
+    }
+
+    @Override
+    public String version() {
+        return "PLACEHOLDER_FOR_VERSION";
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                // version specific config will have the defaul value 
(PLACEHOLDER_FOR_VERSION) replaced with the actual version during plugin 
compilation
+                // this will help with testing differnt configdef for 
different version of the transformation
+                .define("version-specific-config", ConfigDef.Type.STRING, 
"PLACEHOLDER_FOR_VERSION", ConfigDef.Importance.HIGH, "version specific docs")
+                .define("other-config", ConfigDef.Type.STRING, "defaultVal", 
ConfigDef.Importance.HIGH, "other docs");
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+}

Reply via email to