This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f35c72c5dee KAFKA-20079: Improve Connect configurable components 
discoverability (#21330)
f35c72c5dee is described below

commit f35c72c5deefd9c18a66ba2fb2ea2cf37e9a76c5
Author: Fiore Mario Vitale <[email protected]>
AuthorDate: Fri Mar 13 10:18:40 2026 +0100

    KAFKA-20079: Improve Connect configurable components discoverability 
(#21330)
    
    This implements 
[KIP-1273](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1273%3A+Improve+Connect+configurable+components+discoverability)
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../kafka/connect/components/ConnectPlugin.java    | 45 ++++++++++++++++++++++
 .../kafka/connect/components/package-info.java     |  2 +-
 .../apache/kafka/connect/connector/Connector.java  |  5 ++-
 .../ConnectorClientConfigOverridePolicy.java       | 24 +++++++++++-
 .../kafka/connect/rest/ConnectRestExtension.java   | 15 +++++++-
 .../apache/kafka/connect/storage/Converter.java    |  9 ++++-
 .../kafka/connect/storage/HeaderConverter.java     | 14 ++++---
 .../connect/storage/SimpleHeaderConverter.java     |  3 +-
 .../kafka/connect/storage/StringConverter.java     |  3 +-
 .../kafka/connect/transforms/Transformation.java   | 15 ++++++--
 .../connect/transforms/predicates/Predicate.java   | 14 ++++---
 .../apache/kafka/connect/json/JsonConverter.java   |  3 +-
 ...bstractConnectorClientConfigOverridePolicy.java |  3 +-
 .../kafka/connect/converters/BooleanConverter.java |  3 +-
 .../connect/converters/ByteArrayConverter.java     |  3 +-
 .../kafka/connect/converters/NumberConverter.java  |  3 +-
 .../kafka/connect/runtime/isolation/Plugins.java   |  6 +++
 .../ConnectorValidationIntegrationTest.java        |  5 +++
 .../integration/ErrorHandlingIntegrationTest.java  |  3 +-
 .../kafka/connect/runtime/ConnectMetricsTest.java  | 15 ++++++++
 .../kafka/connect/runtime/ConnectorConfigTest.java | 19 ++++-----
 .../connect/runtime/ErrorHandlingTaskTest.java     |  5 +--
 .../runtime/SampleConverterWithHeaders.java        |  3 +-
 .../connect/runtime/SampleHeaderConverter.java     |  3 +-
 .../kafka/connect/runtime/SamplePredicate.java     |  3 +-
 .../connect/runtime/SampleTransformation.java      |  3 +-
 .../runtime/isolation/MultiVersionTest.java        | 28 +++++++-------
 .../connect/runtime/isolation/PluginUtilsTest.java |  7 ++--
 .../connect/runtime/isolation/PluginsTest.java     | 15 +-------
 .../plugins/DefaultConstructorThrowsConverter.java |  6 +++
 .../plugins/NoDefaultConstructorConverter.java     |  6 +++
 .../NoDefaultConstructorOverridePolicy.java        |  5 +++
 .../connect/converters/ByteArrayConverter.java     |  3 +-
 .../test/plugins/NonMigratedConverter.java         |  5 +++
 .../test/plugins/NonMigratedHeaderConverter.java   |  5 +++
 .../test/plugins/NonMigratedMultiPlugin.java       |  5 +++
 .../test/plugins/NonMigratedPredicate.java         |  5 +++
 .../test/plugins/NonMigratedTransformation.java    |  5 +++
 .../test/plugins/ReadVersionFromResource.java      |  3 +-
 .../test/plugins/ReadVersionFromResource.java      |  3 +-
 .../test/plugins/VersionedConverter.java           |  3 +-
 .../test/plugins/VersionedHeaderConverter.java     |  3 +-
 .../test/plugins/VersionedPredicate.java           |  3 +-
 .../test/plugins/VersionedTransformation.java      |  3 +-
 .../org/apache/kafka/connect/transforms/Cast.java  |  3 +-
 .../kafka/connect/transforms/DropHeaders.java      |  3 +-
 .../kafka/connect/transforms/ExtractField.java     |  3 +-
 .../apache/kafka/connect/transforms/Filter.java    |  3 +-
 .../apache/kafka/connect/transforms/Flatten.java   |  3 +-
 .../kafka/connect/transforms/HeaderFrom.java       |  3 +-
 .../kafka/connect/transforms/HoistField.java       |  3 +-
 .../kafka/connect/transforms/InsertField.java      |  3 +-
 .../kafka/connect/transforms/InsertHeader.java     |  3 +-
 .../apache/kafka/connect/transforms/MaskField.java |  3 +-
 .../kafka/connect/transforms/RegexRouter.java      |  3 +-
 .../kafka/connect/transforms/ReplaceField.java     |  3 +-
 .../connect/transforms/SetSchemaMetadata.java      |  3 +-
 .../connect/transforms/TimestampConverter.java     |  3 +-
 .../kafka/connect/transforms/TimestampRouter.java  |  3 +-
 .../kafka/connect/transforms/ValueToKey.java       |  3 +-
 .../transforms/predicates/HasHeaderKey.java        |  3 +-
 .../transforms/predicates/RecordIsTombstone.java   |  3 +-
 .../transforms/predicates/TopicNameMatches.java    |  3 +-
 63 files changed, 256 insertions(+), 143 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/components/ConnectPlugin.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/components/ConnectPlugin.java
new file mode 100644
index 00000000000..d538df4505b
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/components/ConnectPlugin.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.components;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * Interface for components that provide version and configuration 
specifications.
+ * This interface establishes a common contract for all Kafka Connect 
components
+ * that define a version and expose configurable properties, enabling uniform 
discovery and introspection
+ * of component configurations.
+ *
+ * <p>Components implementing this interface declare their version and 
configuration requirements
+ * through a {@link ConfigDef} object, which describes the configuration 
properties
+ * including their names, types, default values, validators, and documentation.
+ *
+ */
+public interface ConnectPlugin extends Versioned {
+
+    /**
+     * Returns the configuration specification for this component.
+     *
+     * <p>The returned {@link ConfigDef} object describes all configuration 
properties
+     * that this component accepts, including their types, default values, 
validators,
+     * importance levels, and documentation strings.
+     *
+     * @return the configuration definition for this component; never null
+     */
+    ConfigDef config();
+}
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
index 9f32051ae23..06268165b56 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
@@ -15,6 +15,6 @@
  * limitations under the License.
  */
 /**
- * Provides common interfaces used to describe pluggable components.
+ * Provides common interfaces used to describe pluggable, configurable and 
versioned components.
  */
 package org.apache.kafka.connect.components;
\ No newline at end of file
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java 
b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 927e4170f77..a618ce072b0 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -19,7 +19,7 @@ package org.apache.kafka.connect.connector;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
-import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.errors.ConnectException;
 
 import java.util.List;
@@ -43,7 +43,7 @@ import java.util.Map;
  * Tasks.
  * </p>
  */
-public abstract class Connector implements Versioned {
+public abstract class Connector implements ConnectPlugin {
 
     protected ConnectorContext context;
 
@@ -149,5 +149,6 @@ public abstract class Connector implements Versioned {
      * Define the configuration for the connector.
      * @return The ConfigDef for this connector; may not be null.
      */
+    @Override
     public abstract ConfigDef config();
 }
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
index 25abb06bde1..beb758273b6 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
@@ -18,7 +18,9 @@
 package org.apache.kafka.connect.connector.policy;
 
 import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.components.ConnectPlugin;
 
 import java.util.List;
 
@@ -36,7 +38,7 @@ import java.util.List;
  * <code>connector.client.config.override.policy</code>, and 
<code>class</code> set to the
  * ConnectorClientConfigOverridePolicy class name.
  */
-public interface ConnectorClientConfigOverridePolicy extends Configurable, 
AutoCloseable {
+public interface ConnectorClientConfigOverridePolicy extends Configurable, 
AutoCloseable, ConnectPlugin {
 
 
     /**
@@ -52,4 +54,24 @@ public interface ConnectorClientConfigOverridePolicy extends 
Configurable, AutoC
                {@link ConfigValue#errorMessages() error} if the configuration 
is not allowed by the policy; never null
      */
     List<ConfigValue> validate(ConnectorClientConfigRequest 
connectorClientConfigRequest);
+
+    /**
+     * Configuration specification for this policy override.
+     *
+     * @return the configuration definition for this policy override; never 
null
+     */
+    @Override
+    default ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    /**
+     * Get the version of this component.
+     *
+     * @return the version, formatted as a String. The version may not be 
{@code null} or empty.
+     */
+    @Override
+    default String version() {
+        return "undefined";
+    }
 }
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
index 5110a832459..eed69f861d4 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
@@ -18,7 +18,8 @@
 package org.apache.kafka.connect.rest;
 
 import org.apache.kafka.common.Configurable;
-import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.health.ConnectClusterState;
 
 import java.io.Closeable;
@@ -48,7 +49,7 @@ import java.util.Map;
  * The following tags are automatically added to all metrics registered: 
<code>config</code> set to
  * <code>rest.extension.classes</code>, and <code>class</code> set to the 
ConnectRestExtension class name.
  */
-public interface ConnectRestExtension extends Configurable, Versioned, 
Closeable {
+public interface ConnectRestExtension extends Configurable, ConnectPlugin, 
Closeable {
 
     /**
      * ConnectRestExtension implementations can register custom JAX-RS 
resources via this method. The Connect framework
@@ -60,4 +61,14 @@ public interface ConnectRestExtension extends Configurable, 
Versioned, Closeable
      *                          ConnectRestExtensionContext#configurable()}
      */
     void register(ConnectRestExtensionContext restPluginContext);
+
+    /**
+     * Configuration specification for this rest extension.
+     *
+     * @return the configuration definition for this rest extension; never null
+     */
+    @Override
+    default ConfigDef config() {
+        return new ConfigDef();
+    }
 }
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
index 1b2300e2330..2dfc2cb96d2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
@@ -37,7 +38,7 @@ import java.util.Map;
  * The following tags are automatically added to all metrics registered: 
<code>connector</code> set to connector name,
  * <code>task</code> set to the task id and <code>converter</code> set to 
either <code>key</code> or <code>value</code>.
  */
-public interface Converter extends Closeable {
+public interface Converter extends Closeable, ConnectPlugin {
 
     /**
      * Configure this class.
@@ -101,6 +102,7 @@ public interface Converter extends Closeable {
      * Configuration specification for this converter.
      * @return the configuration specification; may not be null
      */
+    @Override
     default ConfigDef config() {
         return new ConfigDef();
     }
@@ -109,4 +111,9 @@ public interface Converter extends Closeable {
     default void close() throws IOException  {
         // no op
     }
+
+    @Override
+    default String version() {
+        return "undefined";
+    }
 }
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
index 810905c0959..af99800431c 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.header.Header;
@@ -36,7 +36,7 @@ import java.io.Closeable;
  * The following tags are automatically added to all metrics registered: 
<code>connector</code> set to connector name,
  * <code>task</code> set to the task id and <code>converter</code> set to 
<code>header</code>.
  */
-public interface HeaderConverter extends Configurable, Closeable {
+public interface HeaderConverter extends Configurable, Closeable, 
ConnectPlugin {
 
     /**
      * Convert the header name and byte array value into a {@link Header} 
object.
@@ -58,8 +58,12 @@ public interface HeaderConverter extends Configurable, 
Closeable {
     byte[] fromConnectHeader(String topic, String headerKey, Schema schema, 
Object value);
 
     /**
-     * Configuration specification for this set of header converters.
-     * @return the configuration specification; may not be null
+     * Get the version of this component.
+     *
+     * @return the version, formatted as a String. The version may not be 
{@code null} or empty.
      */
-    ConfigDef config();
+    @Override
+    default String version() {
+        return "undefined";
+    }
 }
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
index 3589beb5087..7a3b2343602 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Values;
@@ -37,7 +36,7 @@ import java.util.NoSuchElementException;
  * A {@link HeaderConverter} that serializes header values as strings and that 
deserializes header values to the most appropriate
  * numeric, boolean, array, or map representation. Schemas are not serialized, 
but are inferred upon deserialization when possible.
  */
-public class SimpleHeaderConverter implements HeaderConverter, Versioned {
+public class SimpleHeaderConverter implements HeaderConverter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SimpleHeaderConverter.class);
     private static final ConfigDef CONFIG_DEF = new ConfigDef();
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
index 35322669e40..3c025aafd66 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -22,7 +22,6 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -42,7 +41,7 @@ import java.util.Map;
  * <p>
  * This implementation currently does nothing with the topic names or header 
keys.
  */
-public class StringConverter implements Converter, HeaderConverter, Versioned {
+public class StringConverter implements Converter, HeaderConverter {
 
     private final StringSerializer serializer = new StringSerializer();
     private final StringDeserializer deserializer = new StringDeserializer();
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
index 1902061089c..ce05318d421 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.connector.ConnectRecord;
 
 import java.io.Closeable;
@@ -36,7 +36,7 @@ import java.io.Closeable;
  *
  * @param <R> The type of record (must be an implementation of {@link 
ConnectRecord})
  */
-public interface Transformation<R extends ConnectRecord<R>> extends 
Configurable, Closeable {
+public interface Transformation<R extends ConnectRecord<R>> extends 
Configurable, Closeable, ConnectPlugin {
 
     /**
      * Apply transformation to the {@code record} and return another record 
object (which may be {@code record} itself)
@@ -54,8 +54,15 @@ public interface Transformation<R extends ConnectRecord<R>> 
extends Configurable
      */
     R apply(R record);
 
-    /** Configuration specification for this transformation. */
-    ConfigDef config();
+    /**
+     * Get the version of this component.
+     *
+     * @return the version, formatted as a String. The version may not be 
{@code null} or empty.
+     */
+    @Override
+    default String version() {
+        return "undefined";
+    }
 
     /** Signal that this transformation instance will no longer will be used. 
*/
     @Override
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
index c2942e8a630..c957364690f 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.connect.transforms.predicates;
 
 import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.connector.ConnectRecord;
 
 /**
@@ -37,14 +37,18 @@ import org.apache.kafka.connect.connector.ConnectRecord;
  *
  * @param <R> The type of record.
  */
-public interface Predicate<R extends ConnectRecord<R>> extends Configurable, 
AutoCloseable {
+public interface Predicate<R extends ConnectRecord<R>> extends Configurable, 
AutoCloseable, ConnectPlugin {
+
 
     /**
-     * Configuration specification for this predicate.
+     * Get the version of this component.
      *
-     * @return the configuration definition for this predicate; never null
+     * @return the version, formatted as a String. The version may not be 
{@code null} or empty.
      */
-    ConfigDef config();
+    @Override
+    default String version() {
+        return "undefined";
+    }
 
     /**
      * Returns whether the given record satisfies this predicate.
diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index dac2ce56741..9f5b62997a2 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
@@ -63,7 +62,7 @@ import java.util.Set;
  * <p>
  * This implementation currently does nothing with the topic names or header 
keys.
  */
-public class JsonConverter implements Converter, HeaderConverter, Versioned {
+public class JsonConverter implements Converter, HeaderConverter {
 
     private static final Map<Schema.Type, JsonToConnectTypeConverter> 
TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
index 3c419663a42..fdbaba3e840 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
@@ -19,14 +19,13 @@ package org.apache.kafka.connect.connector.policy;
 
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public abstract class AbstractConnectorClientConfigOverridePolicy implements 
ConnectorClientConfigOverridePolicy, Versioned {
+public abstract class AbstractConnectorClientConfigOverridePolicy implements 
ConnectorClientConfigOverridePolicy {
 
     @Override
     public String version() {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
index 3c030713021..5c94256cf91 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
@@ -22,7 +22,6 @@ import 
org.apache.kafka.common.serialization.BooleanDeserializer;
 import org.apache.kafka.common.serialization.BooleanSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -42,7 +41,7 @@ import java.util.Map;
  * When converting from bytes to Kafka Connect format, the converter will 
always return an optional
  * BOOLEAN schema.
  */
-public class BooleanConverter implements Converter, HeaderConverter, Versioned 
{
+public class BooleanConverter implements Converter, HeaderConverter {
 
     private final BooleanSerializer serializer = new BooleanSerializer();
     private final BooleanDeserializer deserializer = new BooleanDeserializer();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
index ec934ad56cc..83838e91a1e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.converters;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -35,7 +34,7 @@ import java.util.Map;
  * <p>
  * This implementation currently does nothing with the topic names or header 
keys.
  */
-public class ByteArrayConverter implements Converter, HeaderConverter, 
Versioned {
+public class ByteArrayConverter implements Converter, HeaderConverter {
 
     private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
index 9ab569bc4fe..0deb37d14e3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -41,7 +40,7 @@ import java.util.Map;
  * <p>
  * This implementation currently does nothing with the topic names or header 
keys.
  */
-abstract class NumberConverter<T extends Number> implements Converter, 
HeaderConverter, Versioned {
+abstract class NumberConverter<T extends Number> implements Converter, 
HeaderConverter {
 
     private final Serializer<T> serializer;
     private final Deserializer<T> deserializer;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index cd0be18f71a..0a546c01e02 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
@@ -668,6 +669,11 @@ public class Plugins {
                     throw new ConnectException("Version not defined for '" + 
klassName + "'");
                 }
             }
+            if (plugin instanceof ConnectPlugin connectPlugin) {
+                if (Utils.isBlank(connectPlugin.version())) {
+                    throw new ConnectException("Version not defined for '" + 
klassName + "'");
+                }
+            }
             if (plugin instanceof Configurable) {
                 ((Configurable) plugin).configure(config.originals());
             }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
index eb8b59de015..703c4015609 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
@@ -497,6 +497,11 @@ public class ConnectorValidationIntegrationTest {
 
     public abstract static class TestConverter implements Converter, 
HeaderConverter {
 
+        @Override
+        public String version() {
+            return "";
+        }
+
         // Defined by both Converter and HeaderConverter interfaces
         @Override
         public ConfigDef config() {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 58caffa2b2f..fd7f14d122d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -288,7 +287,7 @@ public class ErrorHandlingIntegrationTest {
         assertEquals(expected, new String(actual));
     }
 
-    public static class FaultyPassthrough<R extends ConnectRecord<R>> 
implements Transformation<R>, Versioned {
+    public static class FaultyPassthrough<R extends ConnectRecord<R>> 
implements Transformation<R> {
 
         static final ConfigDef CONFIG_DEF = new ConfigDef();
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 8ba0316e20c..699cba17a39 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -243,6 +243,11 @@ public class ConnectMetricsTest {
             return null;
         }
 
+        @Override
+        public String version() {
+            return "test";
+        }
+
         @Override
         public ConfigDef config() {
             return Converter.super.config();
@@ -329,6 +334,11 @@ public class ConnectMetricsTest {
             return null;
         }
 
+        @Override
+        public String version() {
+            return "test";
+        }
+
         @Override
         public ConfigDef config() {
             return null;
@@ -373,6 +383,11 @@ public class ConnectMetricsTest {
         @Override
         public void configure(Map<String, ?> configs) { }
 
+        @Override
+        public String version() {
+            return "test";
+        }
+
         @Override
         public ConfigDef config() {
             return null;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 65b37892143..6e71e37d82c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -61,7 +60,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
     public abstract static class TestConnector extends Connector {
     }
 
-    public static class SimpleTransformation<R extends ConnectRecord<R>> 
implements Transformation<R>, Versioned  {
+    public static class SimpleTransformation<R extends ConnectRecord<R>> 
implements Transformation<R>  {
 
         int magicNumber = 0;
 
@@ -399,7 +398,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         }
     }
 
-    public abstract static class AbstractTestPredicate<R extends 
ConnectRecord<R>> implements Predicate<R>, Versioned {
+    public abstract static class AbstractTestPredicate<R extends 
ConnectRecord<R>> implements Predicate<R> {
 
         @Override
         public String version() {
@@ -410,7 +409,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
 
     }
 
-    public abstract static class AbstractTransformation<R extends 
ConnectRecord<R>> implements Transformation<R>, Versioned  {
+    public abstract static class AbstractTransformation<R extends 
ConnectRecord<R>> implements Transformation<R>  {
 
         @Override
         public String version() {
@@ -419,7 +418,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
 
     }
 
-    public abstract static class AbstractKeyValueTransformation<R extends 
ConnectRecord<R>> implements Transformation<R>, Versioned  {
+    public abstract static class AbstractKeyValueTransformation<R extends 
ConnectRecord<R>> implements Transformation<R>  {
         @Override
         public R apply(R record) {
             return null;
@@ -446,14 +445,10 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         }
 
 
-        public static class Key<R extends ConnectRecord<R>> extends 
AbstractKeyValueTransformation<R> implements Versioned {
-
-            @Override
-            public String version() {
-                return "1.0";
-            }
+        public static class Key<R extends ConnectRecord<R>> extends 
AbstractKeyValueTransformation<R> {
 
         }
+
         public static class Value<R extends ConnectRecord<R>> extends 
AbstractKeyValueTransformation<R> {
 
         }
@@ -485,7 +480,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         assertEquals(expectedType, configKey.type, prefix + keyName + "' 
config should be a " + expectedType);
     }
 
-    public static class HasDuplicateConfigTransformation<R extends 
ConnectRecord<R>> implements Transformation<R>, Versioned {
+    public static class HasDuplicateConfigTransformation<R extends 
ConnectRecord<R>> implements Transformation<R> {
         private static final String MUST_EXIST_KEY = "must.exist.key";
         private static final ConfigDef CONFIG_DEF = new ConfigDef()
                 // this configDef is duplicate. It should be removed 
automatically so as to avoid duplicate config error.
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index a8e001544b3..a9e5f289732 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -488,7 +487,7 @@ public class ErrorHandlingTaskTest {
     }
 
     // Public to allow plugin discovery to complete without errors
-    public static class FaultyConverter extends JsonConverter implements 
Versioned {
+    public static class FaultyConverter extends JsonConverter {
         private static final Logger log = 
LoggerFactory.getLogger(FaultyConverter.class);
         private int invocations = 0;
 
@@ -513,7 +512,7 @@ public class ErrorHandlingTaskTest {
     }
 
     // Public to allow plugin discovery to complete without errors
-    public static class FaultyPassthrough<R extends ConnectRecord<R>> 
implements Transformation<R>, Versioned {
+    public static class FaultyPassthrough<R extends ConnectRecord<R>> 
implements Transformation<R> {
 
         private static final Logger log = 
LoggerFactory.getLogger(FaultyPassthrough.class);
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
index 19320736dc9..a110aabbc0c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -31,7 +30,7 @@ import java.util.Map;
 /**
  * This is a simple Converter implementation that uses "encoding" header to 
encode/decode strings via provided charset name
  */
-public class SampleConverterWithHeaders implements Converter, Versioned {
+public class SampleConverterWithHeaders implements Converter {
     private static final String HEADER_ENCODING = "encoding";
 
     @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
index 3491fde3988..5a62a9cde55 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.HeaderConverter;
@@ -26,7 +25,7 @@ import org.apache.kafka.connect.storage.HeaderConverter;
 import java.io.IOException;
 import java.util.Map;
 
-public class SampleHeaderConverter implements HeaderConverter, Versioned {
+public class SampleHeaderConverter implements HeaderConverter {
     @Override
     public SchemaAndValue toConnectHeader(String topic, String headerKey, 
byte[] value) {
         return null;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
index 4d72df35ca3..ee8edfff95b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
@@ -18,13 +18,12 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Map;
 
-public class SamplePredicate implements Predicate<SourceRecord>, Versioned {
+public class SamplePredicate implements Predicate<SourceRecord> {
 
     private boolean testResult;
     boolean closed = false;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
index 443b488ef3b..001dc0de817 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
@@ -18,13 +18,12 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.transforms.Transformation;
 
 import java.util.Map;
 
-public class SampleTransformation<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public class SampleTransformation<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     boolean closed = false;
     private R transformedRecord;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
index f2606c5846e..ae1b6f429e1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.components.ConnectPlugin;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
@@ -61,8 +61,8 @@ public class MultiVersionTest {
                 Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
                 Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
                 Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
-                Assertions.assertInstanceOf(Versioned.class, p);
-                Assertions.assertEquals(buildInfo.version(), ((Versioned) 
p).version());
+                Assertions.assertInstanceOf(ConnectPlugin.class, p);
+                Assertions.assertEquals(buildInfo.version(), ((ConnectPlugin) 
p).version());
             }
         }
     }
@@ -185,8 +185,8 @@ public class MultiVersionTest {
             String version = plugins.pluginVersion(className, connectorLoader, 
PluginType.values());
             Assertions.assertEquals(versions.get(i), version);
             Object p = plugins.newPlugin(className, null, connectorLoader);
-            Assertions.assertInstanceOf(Versioned.class, p);
-            Assertions.assertEquals(versions.get(i), ((Versioned) 
p).version());
+            Assertions.assertInstanceOf(ConnectPlugin.class, p);
+            Assertions.assertEquals(versions.get(i), ((ConnectPlugin) 
p).version());
 
             String latestVersion = plugins.latestVersion(className, 
PluginType.values());
             Assertions.assertEquals(DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION, 
latestVersion);
@@ -220,10 +220,10 @@ public class MultiVersionTest {
         for (Map.Entry<VersionRange, String> entry : 
requiredVersions.entrySet()) {
             for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
VersionedPluginBuilder.VersionedTestPlugin.values()) {
                 Object p = plugins.newPlugin(pluginType.className(), 
entry.getKey());
-                Assertions.assertInstanceOf(Versioned.class, p);
-                Assertions.assertEquals(entry.getValue(), ((Versioned) 
p).version(),
+                Assertions.assertInstanceOf(ConnectPlugin.class, p);
+                Assertions.assertEquals(entry.getValue(), ((ConnectPlugin) 
p).version(),
                     String.format("Provided Version Range %s for class %s 
should return plugin version %s instead of %s",
-                        entry.getKey(), pluginType.className(), 
entry.getValue(), ((Versioned) p).version()));
+                        entry.getKey(), pluginType.className(), 
entry.getValue(), ((ConnectPlugin) p).version()));
             }
         }
     }
@@ -276,17 +276,17 @@ public class MultiVersionTest {
 
         Converter keyConverter = plugins.newConverter(config, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
         Assertions.assertEquals(keyConverter.getClass().getName(), 
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
-        Assertions.assertInstanceOf(Versioned.class, keyConverter);
-        Assertions.assertEquals("1.1.0", ((Versioned) keyConverter).version());
+        Assertions.assertInstanceOf(ConnectPlugin.class, keyConverter);
+        Assertions.assertEquals("1.1.0", keyConverter.version());
 
         Converter valueConverter = plugins.newConverter(config, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
WorkerConfig.VALUE_CONVERTER_VERSION);
         Assertions.assertEquals(valueConverter.getClass().getName(), 
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
-        Assertions.assertInstanceOf(Versioned.class, valueConverter);
-        Assertions.assertEquals("2.3.0", ((Versioned) 
valueConverter).version());
+        Assertions.assertInstanceOf(ConnectPlugin.class, valueConverter);
+        Assertions.assertEquals("2.3.0", valueConverter.version());
 
         HeaderConverter headerConverter = plugins.newHeaderConverter(config, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
WorkerConfig.HEADER_CONVERTER_VERSION);
         Assertions.assertEquals(headerConverter.getClass().getName(), 
VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
-        Assertions.assertInstanceOf(Versioned.class, headerConverter);
-        Assertions.assertEquals("4.3.0", ((Versioned) 
headerConverter).version());
+        Assertions.assertInstanceOf(ConnectPlugin.class, headerConverter);
+        Assertions.assertEquals("4.3.0", headerConverter.version());
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 24ef0d535b8..31e6308b44e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -602,7 +601,7 @@ public class PluginUtilsTest {
         assertEquals(expectedAliases, actualAliases);
     }
 
-    public static class CollidingConverter implements Converter, Versioned {
+    public static class CollidingConverter implements Converter {
         @Override
         public void configure(Map<String, ?> configs, boolean isKey) {
         }
@@ -623,7 +622,7 @@ public class PluginUtilsTest {
         }
     }
 
-    public static class CollidingHeaderConverter implements HeaderConverter, 
Versioned {
+    public static class CollidingHeaderConverter implements HeaderConverter {
 
         @Override
         public SchemaAndValue toConnectHeader(String topic, String headerKey, 
byte[] value) {
@@ -654,7 +653,7 @@ public class PluginUtilsTest {
         }
     }
 
-    public static class Colliding<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+    public static class Colliding<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
         @Override
         public String version() {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 4b144288a54..23fbce00a35 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -690,7 +689,7 @@ public class PluginsTest {
         }
     }
 
-    public static class TestConverter implements Converter, Configurable, 
Versioned {
+    public static class TestConverter implements Converter, Configurable {
         public Map<String, ?> configs;
 
         public ConfigDef config() {
@@ -717,11 +716,6 @@ public class PluginsTest {
         public SchemaAndValue toConnectData(String topic, byte[] value) {
             return null;
         }
-
-        @Override
-        public String version() {
-            return "test";
-        }
     }
 
     public static class TestHeaderConverter implements HeaderConverter {
@@ -777,14 +771,9 @@ public class PluginsTest {
         }
     }
 
-    public static class TestInternalConverter extends JsonConverter implements 
Versioned {
+    public static class TestInternalConverter extends JsonConverter {
         public Map<String, ?> configs;
 
-        @Override
-        public String version() {
-            return "test";
-        }
-
         @Override
         public void configure(Map<String, ?> configs) {
             this.configs = configs;
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
index 9888b454898..24295ddf59a 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
@@ -74,4 +75,9 @@ public class DefaultConstructorThrowsConverter implements 
Converter, HeaderConve
     public ConfigDef config() {
         return new ConfigDef();
     }
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
index bfdfcaa390b..526487d6df8 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
@@ -73,4 +74,9 @@ public class NoDefaultConstructorConverter implements 
Converter, HeaderConverter
     public ConfigDef config() {
         return new ConfigDef();
     }
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
index 8b451017e0b..a9fc73d180e 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
@@ -50,4 +50,9 @@ public class NoDefaultConstructorOverridePolicy implements 
ConnectorClientConfig
     public List<ConfigValue> validate(ConnectorClientConfigRequest 
connectorClientConfigRequest) {
         return null;
     }
+
+    @Override
+    public String version() {
+        return "1.0.0";
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
index 699d71635a0..f21d4b5e459 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.converters;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -32,7 +31,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
-public class ByteArrayConverter implements Converter, HeaderConverter, 
Versioned {
+public class ByteArrayConverter implements Converter, HeaderConverter {
 
     private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
     @Override
diff --git 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
index d2be39384ad..6879068f379 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
@@ -43,4 +43,9 @@ public final class NonMigratedConverter implements Converter {
   public SchemaAndValue toConnectData(final String topic, final byte[] value) {
     return null;
   }
+
+    @Override
+    public String version() {
+        return "1.0.0";
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
index 9e013303a56..0adc0ede536 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
@@ -54,4 +54,9 @@ public class NonMigratedHeaderConverter implements 
HeaderConverter {
   @Override
   public void configure(Map<String, ?> configs) {
   }
+
+    @Override
+    public String version() {
+        return "1.0.0";
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
index a82b82f2974..ea7e1aa7ceb 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
@@ -93,4 +93,9 @@ public final class NonMigratedMultiPlugin implements 
Converter, HeaderConverter,
   public List<ConfigValue> validate(ConnectorClientConfigRequest 
connectorClientConfigRequest) {
     return null;
   }
+
+    @Override
+    public String version() {
+        return "1.0.0";
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
index 3465d0b4633..730c7c3281b 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
@@ -48,4 +48,9 @@ public class NonMigratedPredicate implements Predicate {
     @Override
     public void close() {
     }
+
+    @Override
+    public String version() {
+        return "1.0.0";
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
index c3a732568f7..a0ccae77fda 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
@@ -47,4 +47,9 @@ public class NonMigratedTransformation implements 
Transformation {
     @Override
     public void close() {
     }
+
+    @Override
+    public String version() {
+        return "1.0.0";
+    }
 }
diff --git 
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
 
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
index f68b4eb4e58..6a4936ce599 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
@@ -31,7 +31,6 @@ import java.net.URL;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.components.Versioned;
 
 /**
  * Fake plugin class for testing classloading isolation
@@ -40,7 +39,7 @@ import org.apache.kafka.connect.components.Versioned;
  * Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, 
Schema, Object)}
  * and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
  */
-public class ReadVersionFromResource implements Converter, Versioned {
+public class ReadVersionFromResource implements Converter {
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
 
diff --git 
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
 
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
index 863ed9fad97..9ccce1920c5 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
@@ -31,7 +31,6 @@ import java.net.URL;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.components.Versioned;
 
 /**
  * Fake plugin class for testing classloading isolation.
@@ -40,7 +39,7 @@ import org.apache.kafka.connect.components.Versioned;
  * Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, 
Schema, Object)}
  * and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
  */
-public class ReadVersionFromResource implements Converter, Versioned {
+public class ReadVersionFromResource implements Converter {
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
 
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
index 766f29330ef..c209b79b6f4 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
@@ -20,7 +20,6 @@ package test.plugins;
 import java.util.Map;
 
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
@@ -29,7 +28,7 @@ import org.apache.kafka.connect.storage.Converter;
  * Converter to test multiverioning of plugins.
  * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
  */
-public class VersionedConverter implements Converter, Versioned {
+public class VersionedConverter implements Converter {
 
     public VersionedConverter() {
         super();
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
index c0ef947e669..aac1a110107 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
@@ -18,7 +18,6 @@
 package test.plugins;
 
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.HeaderConverter;
@@ -29,7 +28,7 @@ import java.util.Map;
  * Header Converter to test multiverioning of plugins.
  * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
  */
-public class VersionedHeaderConverter implements HeaderConverter, Versioned {
+public class VersionedHeaderConverter implements HeaderConverter {
 
     public VersionedHeaderConverter() {
         super();
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
index 2e92c79c351..d6bd62263bc 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
@@ -19,7 +19,6 @@ package test.plugins;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
@@ -30,7 +29,7 @@ import java.util.Map;
  * Predicate to test multiverioning of plugins.
  * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
  */
-public class VersionedPredicate<R extends ConnectRecord<R>> implements 
Predicate<R>, Versioned {
+public class VersionedPredicate<R extends ConnectRecord<R>> implements 
Predicate<R> {
 
     @Override
     public String version() {
diff --git 
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
 
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
index 0422834d027..ab4a0e34000 100644
--- 
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
+++ 
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
@@ -19,7 +19,6 @@ package test.plugins;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.transforms.Transformation;
 
@@ -29,7 +28,7 @@ import java.util.Map;
  * Transformation to test multiverioning of plugins.
  * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
  */
-public class VersionedTransformation<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public class VersionedTransformation<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
 
     @Override
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 7c13ef4d785..02e89aa818f 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Date;
@@ -55,7 +54,7 @@ import java.util.Set;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
-public abstract class Cast<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class Cast<R extends ConnectRecord<R>> implements 
Transformation<R> {
     private static final Logger log = LoggerFactory.getLogger(Cast.class);
 
     // TODO: Currently we only support top-level field casting. Ideally we 
could use a dotted notation in the spec to
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
index cd87c33a509..13f005f9115 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Header;
@@ -31,7 +30,7 @@ import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
 
-public class DropHeaders<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public class DropHeaders<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Removes one or more headers from each record.";
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
index 89f6d28c5ee..d3287ab7499 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -32,7 +31,7 @@ import java.util.Map;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
-public abstract class ExtractField<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class ExtractField<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Extract the specified field from a Struct when schema present, or 
a Map in the case of schemaless data. "
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
index 80fdcbf7e9c..059ca456616 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 
 import java.util.Map;
@@ -29,7 +28,7 @@ import java.util.Map;
  * a particular {@link 
org.apache.kafka.connect.transforms.predicates.Predicate}.
  * @param <R> The type of record.
  */
-public class Filter<R extends ConnectRecord<R>> implements Transformation<R>, 
Versioned {
+public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
 
     public static final String OVERVIEW_DOC = "Drops all records, filtering 
them from subsequent transformations in the chain. " +
             "This is intended to be used conditionally to filter out records 
matching (or not matching) " +
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index 985902a9bd4..c0e54e44fb8 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Field;
@@ -39,7 +38,7 @@ import java.util.Map;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
-public abstract class Flatten<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class Flatten<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Flatten a nested data structure, generating names for each field 
by concatenating the field names at each "
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
index 8b9030066ae..d2dbb176154 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -42,7 +41,7 @@ import java.util.Map;
 import static java.lang.String.format;
 import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
 
-public abstract class HeaderFrom<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String FIELDS_FIELD = "fields";
     public static final String HEADERS_FIELD = "headers";
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
index 9924571825f..7b4dfc29130 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -31,7 +30,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
 import java.util.HashMap;
 import java.util.Map;
 
-public abstract class HoistField<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class HoistField<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Wrap data using the specified field name in a Struct when schema 
present, or a Map in the case of schemaless data."
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index 695dc2ab55d..428fcb5d3e0 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -40,7 +39,7 @@ import static 
org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
-public abstract class InsertField<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class InsertField<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Insert field(s) using attributes from the record metadata or a 
configured static value."
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
index ce218705fb3..d22a680fcb2 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Values;
@@ -29,7 +28,7 @@ import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
 
-public class InsertHeader<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public class InsertHeader<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Add a header to each record.";
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
index 7d37d548eb4..99ece9be309 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -41,7 +40,7 @@ import java.util.function.Function;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
-public abstract class MaskField<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class MaskField<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Mask specified fields with a valid null value for the field type 
(i.e. 0, false, empty string, and so on)."
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
index ae1700efdcc..d91189ce1f8 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.transforms.util.RegexValidator;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +29,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class RegexRouter<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public class RegexRouter<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     private static final Logger log = 
LoggerFactory.getLogger(RegexRouter.class);
 
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 7e8f6700bf6..ca6bbb73c58 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -40,7 +39,7 @@ import java.util.Set;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
-public abstract class ReplaceField<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class ReplaceField<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC = "Filter or rename fields."
             + "<p/>Use the concrete transformation type designed for the 
record key (<code>" + Key.class.getName() + "</code>) "
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index 4a94dd1cddc..9d24f9a1c13 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.transforms;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Field;
@@ -34,7 +33,7 @@ import java.util.Map;
 
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
 
-public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements 
Transformation<R> {
     private static final Logger log = 
LoggerFactory.getLogger(SetSchemaMetadata.class);
 
     public static final String OVERVIEW_DOC =
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 940bb6045a9..95c97eb6bad 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -49,7 +48,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
-public abstract class TimestampConverter<R extends ConnectRecord<R>> 
implements Transformation<R>, Versioned {
+public abstract class TimestampConverter<R extends ConnectRecord<R>> 
implements Transformation<R> {
 
     public static final String OVERVIEW_DOC =
             "Convert timestamps between different formats such as Unix epoch, 
strings, and Connect Date/Timestamp types."
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index bc94964e659..6739a159464 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +29,7 @@ import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class TimestampRouter<R extends ConnectRecord<R>> implements 
Transformation<R>, AutoCloseable, Versioned {
+public class TimestampRouter<R extends ConnectRecord<R>> implements 
Transformation<R>, AutoCloseable {
 
     private static final Pattern TOPIC = Pattern.compile("${topic}", 
Pattern.LITERAL);
 
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index 19c299e6867..e8a57f35525 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -37,7 +36,7 @@ import java.util.Map;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
-public class ValueToKey<R extends ConnectRecord<R>> implements 
Transformation<R>, Versioned {
+public class ValueToKey<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
     public static final String OVERVIEW_DOC = "Replace the record key with a 
new key formed from a subset of fields in the record value.";
 
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
index 566bdfab238..c393b1308a9 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms.predicates;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +29,7 @@ import java.util.Map;
  * A predicate which is true for records with at least one header with the 
configured name.
  * @param <R> The type of connect record.
  */
-public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R>, 
Versioned {
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
 
     private static final String NAME_CONFIG = "name";
     public static final String OVERVIEW_DOC = "A predicate which is true for 
records with at least one header with the configured name.";
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
index 848a8452b28..b709a83b093 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms.predicates;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 
 import java.util.Map;
@@ -27,7 +26,7 @@ import java.util.Map;
  * A predicate which is true for records which are tombstones (i.e. have null 
value).
  * @param <R> The type of connect record.
  */
-public class RecordIsTombstone<R extends ConnectRecord<R>> implements 
Predicate<R>, Versioned {
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements 
Predicate<R> {
 
     public static final String OVERVIEW_DOC = "A predicate which is true for 
records which are tombstones (i.e. have null value).";
     public static final ConfigDef CONFIG_DEF = new ConfigDef();
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
index bcd519eba5a..cd358b2642b 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.transforms.predicates;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.transforms.util.RegexValidator;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -32,7 +31,7 @@ import java.util.regex.PatternSyntaxException;
  * A predicate which is true for records with a topic name that matches the 
configured regular expression.
  * @param <R> The type of connect record.
  */
-public class TopicNameMatches<R extends ConnectRecord<R>> implements 
Predicate<R>, Versioned {
+public class TopicNameMatches<R extends ConnectRecord<R>> implements 
Predicate<R> {
 
     private static final String PATTERN_CONFIG = "pattern";
 


Reply via email to