This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f35c72c5dee KAFKA-20079: Improve Connect configurable components
discoverability (#21330)
f35c72c5dee is described below
commit f35c72c5deefd9c18a66ba2fb2ea2cf37e9a76c5
Author: Fiore Mario Vitale <[email protected]>
AuthorDate: Fri Mar 13 10:18:40 2026 +0100
KAFKA-20079: Improve Connect configurable components discoverability
(#21330)
This implements
[KIP-1273](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1273%3A+Improve+Connect+configurable+components+discoverability)
Reviewers: Mickael Maison <[email protected]>
---
.../kafka/connect/components/ConnectPlugin.java | 45 ++++++++++++++++++++++
.../kafka/connect/components/package-info.java | 2 +-
.../apache/kafka/connect/connector/Connector.java | 5 ++-
.../ConnectorClientConfigOverridePolicy.java | 24 +++++++++++-
.../kafka/connect/rest/ConnectRestExtension.java | 15 +++++++-
.../apache/kafka/connect/storage/Converter.java | 9 ++++-
.../kafka/connect/storage/HeaderConverter.java | 14 ++++---
.../connect/storage/SimpleHeaderConverter.java | 3 +-
.../kafka/connect/storage/StringConverter.java | 3 +-
.../kafka/connect/transforms/Transformation.java | 15 ++++++--
.../connect/transforms/predicates/Predicate.java | 14 ++++---
.../apache/kafka/connect/json/JsonConverter.java | 3 +-
...bstractConnectorClientConfigOverridePolicy.java | 3 +-
.../kafka/connect/converters/BooleanConverter.java | 3 +-
.../connect/converters/ByteArrayConverter.java | 3 +-
.../kafka/connect/converters/NumberConverter.java | 3 +-
.../kafka/connect/runtime/isolation/Plugins.java | 6 +++
.../ConnectorValidationIntegrationTest.java | 5 +++
.../integration/ErrorHandlingIntegrationTest.java | 3 +-
.../kafka/connect/runtime/ConnectMetricsTest.java | 15 ++++++++
.../kafka/connect/runtime/ConnectorConfigTest.java | 19 ++++-----
.../connect/runtime/ErrorHandlingTaskTest.java | 5 +--
.../runtime/SampleConverterWithHeaders.java | 3 +-
.../connect/runtime/SampleHeaderConverter.java | 3 +-
.../kafka/connect/runtime/SamplePredicate.java | 3 +-
.../connect/runtime/SampleTransformation.java | 3 +-
.../runtime/isolation/MultiVersionTest.java | 28 +++++++-------
.../connect/runtime/isolation/PluginUtilsTest.java | 7 ++--
.../connect/runtime/isolation/PluginsTest.java | 15 +-------
.../plugins/DefaultConstructorThrowsConverter.java | 6 +++
.../plugins/NoDefaultConstructorConverter.java | 6 +++
.../NoDefaultConstructorOverridePolicy.java | 5 +++
.../connect/converters/ByteArrayConverter.java | 3 +-
.../test/plugins/NonMigratedConverter.java | 5 +++
.../test/plugins/NonMigratedHeaderConverter.java | 5 +++
.../test/plugins/NonMigratedMultiPlugin.java | 5 +++
.../test/plugins/NonMigratedPredicate.java | 5 +++
.../test/plugins/NonMigratedTransformation.java | 5 +++
.../test/plugins/ReadVersionFromResource.java | 3 +-
.../test/plugins/ReadVersionFromResource.java | 3 +-
.../test/plugins/VersionedConverter.java | 3 +-
.../test/plugins/VersionedHeaderConverter.java | 3 +-
.../test/plugins/VersionedPredicate.java | 3 +-
.../test/plugins/VersionedTransformation.java | 3 +-
.../org/apache/kafka/connect/transforms/Cast.java | 3 +-
.../kafka/connect/transforms/DropHeaders.java | 3 +-
.../kafka/connect/transforms/ExtractField.java | 3 +-
.../apache/kafka/connect/transforms/Filter.java | 3 +-
.../apache/kafka/connect/transforms/Flatten.java | 3 +-
.../kafka/connect/transforms/HeaderFrom.java | 3 +-
.../kafka/connect/transforms/HoistField.java | 3 +-
.../kafka/connect/transforms/InsertField.java | 3 +-
.../kafka/connect/transforms/InsertHeader.java | 3 +-
.../apache/kafka/connect/transforms/MaskField.java | 3 +-
.../kafka/connect/transforms/RegexRouter.java | 3 +-
.../kafka/connect/transforms/ReplaceField.java | 3 +-
.../connect/transforms/SetSchemaMetadata.java | 3 +-
.../connect/transforms/TimestampConverter.java | 3 +-
.../kafka/connect/transforms/TimestampRouter.java | 3 +-
.../kafka/connect/transforms/ValueToKey.java | 3 +-
.../transforms/predicates/HasHeaderKey.java | 3 +-
.../transforms/predicates/RecordIsTombstone.java | 3 +-
.../transforms/predicates/TopicNameMatches.java | 3 +-
63 files changed, 256 insertions(+), 143 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/components/ConnectPlugin.java
b/connect/api/src/main/java/org/apache/kafka/connect/components/ConnectPlugin.java
new file mode 100644
index 00000000000..d538df4505b
--- /dev/null
+++
b/connect/api/src/main/java/org/apache/kafka/connect/components/ConnectPlugin.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.components;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * Interface for components that provide version and configuration
specifications.
+ * This interface establishes a common contract for all Kafka Connect
components
+ * that define a version and expose configurable properties, enabling uniform
discovery and introspection
+ * of component configurations.
+ *
+ * <p>Components implementing this interface declare their version and
configuration requirements
+ * through a {@link ConfigDef} object, which describes the configuration
properties
+ * including their names, types, default values, validators, and documentation.
+ *
+ */
+public interface ConnectPlugin extends Versioned {
+
+ /**
+ * Returns the configuration specification for this component.
+ *
+ * <p>The returned {@link ConfigDef} object describes all configuration
properties
+ * that this component accepts, including their types, default values,
validators,
+ * importance levels, and documentation strings.
+ *
+ * @return the configuration definition for this component; never null
+ */
+ ConfigDef config();
+}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
b/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
index 9f32051ae23..06268165b56 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/components/package-info.java
@@ -15,6 +15,6 @@
* limitations under the License.
*/
/**
- * Provides common interfaces used to describe pluggable components.
+ * Provides common interfaces used to describe pluggable, configurable and
versioned components.
*/
package org.apache.kafka.connect.components;
\ No newline at end of file
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 927e4170f77..a618ce072b0 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -19,7 +19,7 @@ package org.apache.kafka.connect.connector;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
-import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.errors.ConnectException;
import java.util.List;
@@ -43,7 +43,7 @@ import java.util.Map;
* Tasks.
* </p>
*/
-public abstract class Connector implements Versioned {
+public abstract class Connector implements ConnectPlugin {
protected ConnectorContext context;
@@ -149,5 +149,6 @@ public abstract class Connector implements Versioned {
* Define the configuration for the connector.
* @return The ConfigDef for this connector; may not be null.
*/
+ @Override
public abstract ConfigDef config();
}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
index 25abb06bde1..beb758273b6 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
@@ -18,7 +18,9 @@
package org.apache.kafka.connect.connector.policy;
import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.components.ConnectPlugin;
import java.util.List;
@@ -36,7 +38,7 @@ import java.util.List;
* <code>connector.client.config.override.policy</code>, and
<code>class</code> set to the
* ConnectorClientConfigOverridePolicy class name.
*/
-public interface ConnectorClientConfigOverridePolicy extends Configurable,
AutoCloseable {
+public interface ConnectorClientConfigOverridePolicy extends Configurable,
AutoCloseable, ConnectPlugin {
/**
@@ -52,4 +54,24 @@ public interface ConnectorClientConfigOverridePolicy extends
Configurable, AutoC
{@link ConfigValue#errorMessages() error} if the configuration
is not allowed by the policy; never null
*/
List<ConfigValue> validate(ConnectorClientConfigRequest
connectorClientConfigRequest);
+
+ /**
+ * Configuration specification for this policy override.
+ *
+ * @return the configuration definition for this policy override; never
null
+ */
+ @Override
+ default ConfigDef config() {
+ return new ConfigDef();
+ }
+
+ /**
+ * Get the version of this component.
+ *
+ * @return the version, formatted as a String. The version may not be
{@code null} or empty.
+ */
+ @Override
+ default String version() {
+ return "undefined";
+ }
}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
index 5110a832459..eed69f861d4 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
@@ -18,7 +18,8 @@
package org.apache.kafka.connect.rest;
import org.apache.kafka.common.Configurable;
-import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.health.ConnectClusterState;
import java.io.Closeable;
@@ -48,7 +49,7 @@ import java.util.Map;
* The following tags are automatically added to all metrics registered:
<code>config</code> set to
* <code>rest.extension.classes</code>, and <code>class</code> set to the
ConnectRestExtension class name.
*/
-public interface ConnectRestExtension extends Configurable, Versioned,
Closeable {
+public interface ConnectRestExtension extends Configurable, ConnectPlugin,
Closeable {
/**
* ConnectRestExtension implementations can register custom JAX-RS
resources via this method. The Connect framework
@@ -60,4 +61,14 @@ public interface ConnectRestExtension extends Configurable,
Versioned, Closeable
* ConnectRestExtensionContext#configurable()}
*/
void register(ConnectRestExtensionContext restPluginContext);
+
+ /**
+ * Configuration specification for this rest extension.
+ *
+ * @return the configuration definition for this rest extension; never null
+ */
+ @Override
+ default ConfigDef config() {
+ return new ConfigDef();
+ }
}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
index 1b2300e2330..2dfc2cb96d2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -37,7 +38,7 @@ import java.util.Map;
* The following tags are automatically added to all metrics registered:
<code>connector</code> set to connector name,
* <code>task</code> set to the task id and <code>converter</code> set to
either <code>key</code> or <code>value</code>.
*/
-public interface Converter extends Closeable {
+public interface Converter extends Closeable, ConnectPlugin {
/**
* Configure this class.
@@ -101,6 +102,7 @@ public interface Converter extends Closeable {
* Configuration specification for this converter.
* @return the configuration specification; may not be null
*/
+ @Override
default ConfigDef config() {
return new ConfigDef();
}
@@ -109,4 +111,9 @@ public interface Converter extends Closeable {
default void close() throws IOException {
// no op
}
+
+ @Override
+ default String version() {
+ return "undefined";
+ }
}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
index 810905c0959..af99800431c 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
@@ -17,7 +17,7 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Header;
@@ -36,7 +36,7 @@ import java.io.Closeable;
* The following tags are automatically added to all metrics registered:
<code>connector</code> set to connector name,
* <code>task</code> set to the task id and <code>converter</code> set to
<code>header</code>.
*/
-public interface HeaderConverter extends Configurable, Closeable {
+public interface HeaderConverter extends Configurable, Closeable,
ConnectPlugin {
/**
* Convert the header name and byte array value into a {@link Header}
object.
@@ -58,8 +58,12 @@ public interface HeaderConverter extends Configurable,
Closeable {
byte[] fromConnectHeader(String topic, String headerKey, Schema schema,
Object value);
/**
- * Configuration specification for this set of header converters.
- * @return the configuration specification; may not be null
+ * Get the version of this component.
+ *
+ * @return the version, formatted as a String. The version may not be
{@code null} or empty.
*/
- ConfigDef config();
+ @Override
+ default String version() {
+ return "undefined";
+ }
}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
index 3589beb5087..7a3b2343602 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
@@ -37,7 +36,7 @@ import java.util.NoSuchElementException;
* A {@link HeaderConverter} that serializes header values as strings and that
deserializes header values to the most appropriate
* numeric, boolean, array, or map representation. Schemas are not serialized,
but are inferred upon deserialization when possible.
*/
-public class SimpleHeaderConverter implements HeaderConverter, Versioned {
+public class SimpleHeaderConverter implements HeaderConverter {
private static final Logger LOG =
LoggerFactory.getLogger(SimpleHeaderConverter.class);
private static final ConfigDef CONFIG_DEF = new ConfigDef();
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
index 35322669e40..3c025aafd66 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -22,7 +22,6 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -42,7 +41,7 @@ import java.util.Map;
* <p>
* This implementation currently does nothing with the topic names or header
keys.
*/
-public class StringConverter implements Converter, HeaderConverter, Versioned {
+public class StringConverter implements Converter, HeaderConverter {
private final StringSerializer serializer = new StringSerializer();
private final StringDeserializer deserializer = new StringDeserializer();
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
index 1902061089c..ce05318d421 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
@@ -17,7 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.connector.ConnectRecord;
import java.io.Closeable;
@@ -36,7 +36,7 @@ import java.io.Closeable;
*
* @param <R> The type of record (must be an implementation of {@link
ConnectRecord})
*/
-public interface Transformation<R extends ConnectRecord<R>> extends
Configurable, Closeable {
+public interface Transformation<R extends ConnectRecord<R>> extends
Configurable, Closeable, ConnectPlugin {
/**
* Apply transformation to the {@code record} and return another record
object (which may be {@code record} itself)
@@ -54,8 +54,15 @@ public interface Transformation<R extends ConnectRecord<R>>
extends Configurable
*/
R apply(R record);
- /** Configuration specification for this transformation. */
- ConfigDef config();
+ /**
+ * Get the version of this component.
+ *
+ * @return the version, formatted as a String. The version may not be
{@code null} or empty.
+ */
+ @Override
+ default String version() {
+ return "undefined";
+ }
/** Signal that this transformation instance will no longer will be used.
*/
@Override
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
index c2942e8a630..c957364690f 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
@@ -17,7 +17,7 @@
package org.apache.kafka.connect.transforms.predicates;
import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
@@ -37,14 +37,18 @@ import org.apache.kafka.connect.connector.ConnectRecord;
*
* @param <R> The type of record.
*/
-public interface Predicate<R extends ConnectRecord<R>> extends Configurable,
AutoCloseable {
+public interface Predicate<R extends ConnectRecord<R>> extends Configurable,
AutoCloseable, ConnectPlugin {
+
/**
- * Configuration specification for this predicate.
+ * Get the version of this component.
*
- * @return the configuration definition for this predicate; never null
+ * @return the version, formatted as a String. The version may not be
{@code null} or empty.
*/
- ConfigDef config();
+ @Override
+ default String version() {
+ return "undefined";
+ }
/**
* Returns whether the given record satisfies this predicate.
diff --git
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index dac2ce56741..9f5b62997a2 100644
---
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -63,7 +62,7 @@ import java.util.Set;
* <p>
* This implementation currently does nothing with the topic names or header
keys.
*/
-public class JsonConverter implements Converter, HeaderConverter, Versioned {
+public class JsonConverter implements Converter, HeaderConverter {
private static final Map<Schema.Type, JsonToConnectTypeConverter>
TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
index 3c419663a42..fdbaba3e840 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
@@ -19,14 +19,13 @@ package org.apache.kafka.connect.connector.policy;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public abstract class AbstractConnectorClientConfigOverridePolicy implements
ConnectorClientConfigOverridePolicy, Versioned {
+public abstract class AbstractConnectorClientConfigOverridePolicy implements
ConnectorClientConfigOverridePolicy {
@Override
public String version() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
index 3c030713021..5c94256cf91 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java
@@ -22,7 +22,6 @@ import
org.apache.kafka.common.serialization.BooleanDeserializer;
import org.apache.kafka.common.serialization.BooleanSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -42,7 +41,7 @@ import java.util.Map;
* When converting from bytes to Kafka Connect format, the converter will
always return an optional
* BOOLEAN schema.
*/
-public class BooleanConverter implements Converter, HeaderConverter, Versioned
{
+public class BooleanConverter implements Converter, HeaderConverter {
private final BooleanSerializer serializer = new BooleanSerializer();
private final BooleanDeserializer deserializer = new BooleanDeserializer();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
index ec934ad56cc..83838e91a1e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.converters;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -35,7 +34,7 @@ import java.util.Map;
* <p>
* This implementation currently does nothing with the topic names or header
keys.
*/
-public class ByteArrayConverter implements Converter, HeaderConverter,
Versioned {
+public class ByteArrayConverter implements Converter, HeaderConverter {
private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
index 9ab569bc4fe..0deb37d14e3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -41,7 +40,7 @@ import java.util.Map;
* <p>
* This implementation currently does nothing with the topic names or header
keys.
*/
-abstract class NumberConverter<T extends Number> implements Converter,
HeaderConverter, Versioned {
+abstract class NumberConverter<T extends Number> implements Converter,
HeaderConverter {
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index cd0be18f71a..0a546c01e02 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
@@ -668,6 +669,11 @@ public class Plugins {
throw new ConnectException("Version not defined for '" +
klassName + "'");
}
}
+ if (plugin instanceof ConnectPlugin connectPlugin) {
+ if (Utils.isBlank(connectPlugin.version())) {
+ throw new ConnectException("Version not defined for '" +
klassName + "'");
+ }
+ }
if (plugin instanceof Configurable) {
((Configurable) plugin).configure(config.originals());
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
index eb8b59de015..703c4015609 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
@@ -497,6 +497,11 @@ public class ConnectorValidationIntegrationTest {
public abstract static class TestConverter implements Converter,
HeaderConverter {
+ @Override
+ public String version() {
+ return "";
+ }
+
// Defined by both Converter and HeaderConverter interfaces
@Override
public ConfigDef config() {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 58caffa2b2f..fd7f14d122d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -288,7 +287,7 @@ public class ErrorHandlingIntegrationTest {
assertEquals(expected, new String(actual));
}
- public static class FaultyPassthrough<R extends ConnectRecord<R>>
implements Transformation<R>, Versioned {
+ public static class FaultyPassthrough<R extends ConnectRecord<R>>
implements Transformation<R> {
static final ConfigDef CONFIG_DEF = new ConfigDef();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 8ba0316e20c..699cba17a39 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -243,6 +243,11 @@ public class ConnectMetricsTest {
return null;
}
+ @Override
+ public String version() {
+ return "test";
+ }
+
@Override
public ConfigDef config() {
return Converter.super.config();
@@ -329,6 +334,11 @@ public class ConnectMetricsTest {
return null;
}
+ @Override
+ public String version() {
+ return "test";
+ }
+
@Override
public ConfigDef config() {
return null;
@@ -373,6 +383,11 @@ public class ConnectMetricsTest {
@Override
public void configure(Map<String, ?> configs) { }
+ @Override
+ public String version() {
+ return "test";
+ }
+
@Override
public ConfigDef config() {
return null;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 65b37892143..6e71e37d82c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -61,7 +60,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
public abstract static class TestConnector extends Connector {
}
- public static class SimpleTransformation<R extends ConnectRecord<R>>
implements Transformation<R>, Versioned {
+ public static class SimpleTransformation<R extends ConnectRecord<R>>
implements Transformation<R> {
int magicNumber = 0;
@@ -399,7 +398,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
}
}
- public abstract static class AbstractTestPredicate<R extends
ConnectRecord<R>> implements Predicate<R>, Versioned {
+ public abstract static class AbstractTestPredicate<R extends
ConnectRecord<R>> implements Predicate<R> {
@Override
public String version() {
@@ -410,7 +409,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
}
- public abstract static class AbstractTransformation<R extends
ConnectRecord<R>> implements Transformation<R>, Versioned {
+ public abstract static class AbstractTransformation<R extends
ConnectRecord<R>> implements Transformation<R> {
@Override
public String version() {
@@ -419,7 +418,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
}
- public abstract static class AbstractKeyValueTransformation<R extends
ConnectRecord<R>> implements Transformation<R>, Versioned {
+ public abstract static class AbstractKeyValueTransformation<R extends
ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R record) {
return null;
@@ -446,14 +445,10 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
}
- public static class Key<R extends ConnectRecord<R>> extends
AbstractKeyValueTransformation<R> implements Versioned {
-
- @Override
- public String version() {
- return "1.0";
- }
+ public static class Key<R extends ConnectRecord<R>> extends
AbstractKeyValueTransformation<R> {
}
+
public static class Value<R extends ConnectRecord<R>> extends
AbstractKeyValueTransformation<R> {
}
@@ -485,7 +480,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
assertEquals(expectedType, configKey.type, prefix + keyName + "'
config should be a " + expectedType);
}
- public static class HasDuplicateConfigTransformation<R extends
ConnectRecord<R>> implements Transformation<R>, Versioned {
+ public static class HasDuplicateConfigTransformation<R extends
ConnectRecord<R>> implements Transformation<R> {
private static final String MUST_EXIST_KEY = "must.exist.key";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
// this configDef is duplicate. It should be removed
automatically so as to avoid duplicate config error.
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index a8e001544b3..a9e5f289732 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -488,7 +487,7 @@ public class ErrorHandlingTaskTest {
}
// Public to allow plugin discovery to complete without errors
- public static class FaultyConverter extends JsonConverter implements
Versioned {
+ public static class FaultyConverter extends JsonConverter {
private static final Logger log =
LoggerFactory.getLogger(FaultyConverter.class);
private int invocations = 0;
@@ -513,7 +512,7 @@ public class ErrorHandlingTaskTest {
}
// Public to allow plugin discovery to complete without errors
- public static class FaultyPassthrough<R extends ConnectRecord<R>>
implements Transformation<R>, Versioned {
+ public static class FaultyPassthrough<R extends ConnectRecord<R>>
implements Transformation<R> {
private static final Logger log =
LoggerFactory.getLogger(FaultyPassthrough.class);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
index 19320736dc9..a110aabbc0c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -31,7 +30,7 @@ import java.util.Map;
/**
* This is a simple Converter implementation that uses "encoding" header to
encode/decode strings via provided charset name
*/
-public class SampleConverterWithHeaders implements Converter, Versioned {
+public class SampleConverterWithHeaders implements Converter {
private static final String HEADER_ENCODING = "encoding";
@Override
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
index 3491fde3988..5a62a9cde55 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -26,7 +25,7 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import java.io.IOException;
import java.util.Map;
-public class SampleHeaderConverter implements HeaderConverter, Versioned {
+public class SampleHeaderConverter implements HeaderConverter {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey,
byte[] value) {
return null;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
index 4d72df35ca3..ee8edfff95b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
@@ -18,13 +18,12 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Map;
-public class SamplePredicate implements Predicate<SourceRecord>, Versioned {
+public class SamplePredicate implements Predicate<SourceRecord> {
private boolean testResult;
boolean closed = false;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
index 443b488ef3b..001dc0de817 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
@@ -18,13 +18,12 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;
-public class SampleTransformation<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public class SampleTransformation<R extends ConnectRecord<R>> implements
Transformation<R> {
boolean closed = false;
private R transformedRecord;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
index f2606c5846e..ae1b6f429e1 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -18,7 +18,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -61,8 +61,8 @@ public class MultiVersionTest {
Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
- Assertions.assertInstanceOf(Versioned.class, p);
- Assertions.assertEquals(buildInfo.version(), ((Versioned)
p).version());
+ Assertions.assertInstanceOf(ConnectPlugin.class, p);
+ Assertions.assertEquals(buildInfo.version(), ((ConnectPlugin)
p).version());
}
}
}
@@ -185,8 +185,8 @@ public class MultiVersionTest {
String version = plugins.pluginVersion(className, connectorLoader,
PluginType.values());
Assertions.assertEquals(versions.get(i), version);
Object p = plugins.newPlugin(className, null, connectorLoader);
- Assertions.assertInstanceOf(Versioned.class, p);
- Assertions.assertEquals(versions.get(i), ((Versioned)
p).version());
+ Assertions.assertInstanceOf(ConnectPlugin.class, p);
+ Assertions.assertEquals(versions.get(i), ((ConnectPlugin)
p).version());
String latestVersion = plugins.latestVersion(className,
PluginType.values());
Assertions.assertEquals(DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION,
latestVersion);
@@ -220,10 +220,10 @@ public class MultiVersionTest {
for (Map.Entry<VersionRange, String> entry :
requiredVersions.entrySet()) {
for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
VersionedPluginBuilder.VersionedTestPlugin.values()) {
Object p = plugins.newPlugin(pluginType.className(),
entry.getKey());
- Assertions.assertInstanceOf(Versioned.class, p);
- Assertions.assertEquals(entry.getValue(), ((Versioned)
p).version(),
+ Assertions.assertInstanceOf(ConnectPlugin.class, p);
+ Assertions.assertEquals(entry.getValue(), ((ConnectPlugin)
p).version(),
String.format("Provided Version Range %s for class %s
should return plugin version %s instead of %s",
- entry.getKey(), pluginType.className(),
entry.getValue(), ((Versioned) p).version()));
+ entry.getKey(), pluginType.className(),
entry.getValue(), ((ConnectPlugin) p).version()));
}
}
}
@@ -276,17 +276,17 @@ public class MultiVersionTest {
Converter keyConverter = plugins.newConverter(config,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
Assertions.assertEquals(keyConverter.getClass().getName(),
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
- Assertions.assertInstanceOf(Versioned.class, keyConverter);
- Assertions.assertEquals("1.1.0", ((Versioned) keyConverter).version());
+ Assertions.assertInstanceOf(ConnectPlugin.class, keyConverter);
+ Assertions.assertEquals("1.1.0", keyConverter.version());
Converter valueConverter = plugins.newConverter(config,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION);
Assertions.assertEquals(valueConverter.getClass().getName(),
VersionedPluginBuilder.VersionedTestPlugin.CONVERTER.className());
- Assertions.assertInstanceOf(Versioned.class, valueConverter);
- Assertions.assertEquals("2.3.0", ((Versioned)
valueConverter).version());
+ Assertions.assertInstanceOf(ConnectPlugin.class, valueConverter);
+ Assertions.assertEquals("2.3.0", valueConverter.version());
HeaderConverter headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_VERSION);
Assertions.assertEquals(headerConverter.getClass().getName(),
VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER.className());
- Assertions.assertInstanceOf(Versioned.class, headerConverter);
- Assertions.assertEquals("4.3.0", ((Versioned)
headerConverter).version());
+ Assertions.assertInstanceOf(ConnectPlugin.class, headerConverter);
+ Assertions.assertEquals("4.3.0", headerConverter.version());
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 24ef0d535b8..31e6308b44e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -602,7 +601,7 @@ public class PluginUtilsTest {
assertEquals(expectedAliases, actualAliases);
}
- public static class CollidingConverter implements Converter, Versioned {
+ public static class CollidingConverter implements Converter {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@@ -623,7 +622,7 @@ public class PluginUtilsTest {
}
}
- public static class CollidingHeaderConverter implements HeaderConverter,
Versioned {
+ public static class CollidingHeaderConverter implements HeaderConverter {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey,
byte[] value) {
@@ -654,7 +653,7 @@ public class PluginUtilsTest {
}
}
- public static class Colliding<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+ public static class Colliding<R extends ConnectRecord<R>> implements
Transformation<R> {
@Override
public String version() {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 4b144288a54..23fbce00a35 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -690,7 +689,7 @@ public class PluginsTest {
}
}
- public static class TestConverter implements Converter, Configurable,
Versioned {
+ public static class TestConverter implements Converter, Configurable {
public Map<String, ?> configs;
public ConfigDef config() {
@@ -717,11 +716,6 @@ public class PluginsTest {
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
-
- @Override
- public String version() {
- return "test";
- }
}
public static class TestHeaderConverter implements HeaderConverter {
@@ -777,14 +771,9 @@ public class PluginsTest {
}
}
- public static class TestInternalConverter extends JsonConverter implements
Versioned {
+ public static class TestInternalConverter extends JsonConverter {
public Map<String, ?> configs;
- @Override
- public String version() {
- return "test";
- }
-
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
index 9888b454898..24295ddf59a 100644
---
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
@@ -74,4 +75,9 @@ public class DefaultConstructorThrowsConverter implements
Converter, HeaderConve
public ConfigDef config() {
return new ConfigDef();
}
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
index bfdfcaa390b..526487d6df8 100644
---
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
@@ -73,4 +74,9 @@ public class NoDefaultConstructorConverter implements
Converter, HeaderConverter
public ConfigDef config() {
return new ConfigDef();
}
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
index 8b451017e0b..a9fc73d180e 100644
---
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorOverridePolicy.java
@@ -50,4 +50,9 @@ public class NoDefaultConstructorOverridePolicy implements
ConnectorClientConfig
public List<ConfigValue> validate(ConnectorClientConfigRequest
connectorClientConfigRequest) {
return null;
}
+
+ @Override
+ public String version() {
+ return "1.0.0";
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
index 699d71635a0..f21d4b5e459 100644
---
a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.converters;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -32,7 +31,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-public class ByteArrayConverter implements Converter, HeaderConverter,
Versioned {
+public class ByteArrayConverter implements Converter, HeaderConverter {
private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
@Override
diff --git
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
index d2be39384ad..6879068f379 100644
---
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
@@ -43,4 +43,9 @@ public final class NonMigratedConverter implements Converter {
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
return null;
}
+
+ @Override
+ public String version() {
+ return "1.0.0";
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
index 9e013303a56..0adc0ede536 100644
---
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
@@ -54,4 +54,9 @@ public class NonMigratedHeaderConverter implements
HeaderConverter {
@Override
public void configure(Map<String, ?> configs) {
}
+
+ @Override
+ public String version() {
+ return "1.0.0";
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
index a82b82f2974..ea7e1aa7ceb 100644
---
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
+++
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
@@ -93,4 +93,9 @@ public final class NonMigratedMultiPlugin implements
Converter, HeaderConverter,
public List<ConfigValue> validate(ConnectorClientConfigRequest
connectorClientConfigRequest) {
return null;
}
+
+ @Override
+ public String version() {
+ return "1.0.0";
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
index 3465d0b4633..730c7c3281b 100644
---
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
+++
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
@@ -48,4 +48,9 @@ public class NonMigratedPredicate implements Predicate {
@Override
public void close() {
}
+
+ @Override
+ public String version() {
+ return "1.0.0";
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
index c3a732568f7..a0ccae77fda 100644
---
a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
+++
b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java
@@ -47,4 +47,9 @@ public class NonMigratedTransformation implements
Transformation {
@Override
public void close() {
}
+
+ @Override
+ public String version() {
+ return "1.0.0";
+ }
}
diff --git
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
index f68b4eb4e58..6a4936ce599 100644
---
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
+++
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
@@ -31,7 +31,6 @@ import java.net.URL;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.components.Versioned;
/**
* Fake plugin class for testing classloading isolation
@@ -40,7 +39,7 @@ import org.apache.kafka.connect.components.Versioned;
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String,
Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
-public class ReadVersionFromResource implements Converter, Versioned {
+public class ReadVersionFromResource implements Converter {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
diff --git
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
index 863ed9fad97..9ccce1920c5 100644
---
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
+++
b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
@@ -31,7 +31,6 @@ import java.net.URL;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.components.Versioned;
/**
* Fake plugin class for testing classloading isolation.
@@ -40,7 +39,7 @@ import org.apache.kafka.connect.components.Versioned;
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String,
Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
-public class ReadVersionFromResource implements Converter, Versioned {
+public class ReadVersionFromResource implements Converter {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
index 766f29330ef..c209b79b6f4 100644
---
a/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-converter/test/plugins/VersionedConverter.java
@@ -20,7 +20,6 @@ package test.plugins;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
@@ -29,7 +28,7 @@ import org.apache.kafka.connect.storage.Converter;
* Converter to test multiverioning of plugins.
* Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
*/
-public class VersionedConverter implements Converter, Versioned {
+public class VersionedConverter implements Converter {
public VersionedConverter() {
super();
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
index c0ef947e669..aac1a110107 100644
---
a/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-header-converter/test/plugins/VersionedHeaderConverter.java
@@ -18,7 +18,6 @@
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -29,7 +28,7 @@ import java.util.Map;
* Header Converter to test multiverioning of plugins.
* Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
*/
-public class VersionedHeaderConverter implements HeaderConverter, Versioned {
+public class VersionedHeaderConverter implements HeaderConverter {
public VersionedHeaderConverter() {
super();
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
index 2e92c79c351..d6bd62263bc 100644
---
a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java
@@ -19,7 +19,6 @@ package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
@@ -30,7 +29,7 @@ import java.util.Map;
* Predicate to test multiverioning of plugins.
* Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
*/
-public class VersionedPredicate<R extends ConnectRecord<R>> implements
Predicate<R>, Versioned {
+public class VersionedPredicate<R extends ConnectRecord<R>> implements
Predicate<R> {
@Override
public String version() {
diff --git
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
index 0422834d027..ab4a0e34000 100644
---
a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
+++
b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java
@@ -19,7 +19,6 @@ package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
@@ -29,7 +28,7 @@ import java.util.Map;
* Transformation to test multiverioning of plugins.
* Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
*/
-public class VersionedTransformation<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public class VersionedTransformation<R extends ConnectRecord<R>> implements
Transformation<R> {
@Override
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 7c13ef4d785..02e89aa818f 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
@@ -55,7 +54,7 @@ import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class Cast<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class Cast<R extends ConnectRecord<R>> implements
Transformation<R> {
private static final Logger log = LoggerFactory.getLogger(Cast.class);
// TODO: Currently we only support top-level field casting. Ideally we
could use a dotted notation in the spec to
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
index cd87c33a509..13f005f9115 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
@@ -31,7 +30,7 @@ import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
-public class DropHeaders<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public class DropHeaders<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Removes one or more headers from each record.";
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
index 89f6d28c5ee..d3287ab7499 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -32,7 +31,7 @@ import java.util.Map;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
-public abstract class ExtractField<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class ExtractField<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Extract the specified field from a Struct when schema present, or
a Map in the case of schemaless data. "
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
index 80fdcbf7e9c..059ca456616 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import java.util.Map;
@@ -29,7 +28,7 @@ import java.util.Map;
* a particular {@link
org.apache.kafka.connect.transforms.predicates.Predicate}.
* @param <R> The type of record.
*/
-public class Filter<R extends ConnectRecord<R>> implements Transformation<R>,
Versioned {
+public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Drops all records, filtering
them from subsequent transformations in the chain. " +
"This is intended to be used conditionally to filter out records
matching (or not matching) " +
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index 985902a9bd4..c0e54e44fb8 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
@@ -39,7 +38,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
-public abstract class Flatten<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class Flatten<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Flatten a nested data structure, generating names for each field
by concatenating the field names at each "
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
index 8b9030066ae..d2dbb176154 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -42,7 +41,7 @@ import java.util.Map;
import static java.lang.String.format;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
-public abstract class HeaderFrom<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String FIELDS_FIELD = "fields";
public static final String HEADERS_FIELD = "headers";
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
index 9924571825f..7b4dfc29130 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -31,7 +30,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.HashMap;
import java.util.Map;
-public abstract class HoistField<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class HoistField<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Wrap data using the specified field name in a Struct when schema
present, or a Map in the case of schemaless data."
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index 695dc2ab55d..428fcb5d3e0 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -40,7 +39,7 @@ import static
org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class InsertField<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class InsertField<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Insert field(s) using attributes from the record metadata or a
configured static value."
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
index ce218705fb3..d22a680fcb2 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
@@ -29,7 +28,7 @@ import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
-public class InsertHeader<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public class InsertHeader<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Add a header to each record.";
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
index 7d37d548eb4..99ece9be309 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -41,7 +40,7 @@ import java.util.function.Function;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class MaskField<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class MaskField<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC =
"Mask specified fields with a valid null value for the field type
(i.e. 0, false, empty string, and so on)."
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
index ae1700efdcc..d91189ce1f8 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +29,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class RegexRouter<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public class RegexRouter<R extends ConnectRecord<R>> implements
Transformation<R> {
private static final Logger log =
LoggerFactory.getLogger(RegexRouter.class);
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 7e8f6700bf6..ca6bbb73c58 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -40,7 +39,7 @@ import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class ReplaceField<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class ReplaceField<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC = "Filter or rename fields."
+ "<p/>Use the concrete transformation type designed for the
record key (<code>" + Key.class.getName() + "</code>) "
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index 4a94dd1cddc..9d24f9a1c13 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
@@ -34,7 +33,7 @@ import java.util.Map;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
-public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements
Transformation<R> {
private static final Logger log =
LoggerFactory.getLogger(SetSchemaMetadata.class);
public static final String OVERVIEW_DOC =
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 940bb6045a9..95c97eb6bad 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -49,7 +48,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
-public abstract class TimestampConverter<R extends ConnectRecord<R>>
implements Transformation<R>, Versioned {
+public abstract class TimestampConverter<R extends ConnectRecord<R>>
implements Transformation<R> {
public static final String OVERVIEW_DOC =
"Convert timestamps between different formats such as Unix epoch,
strings, and Connect Date/Timestamp types."
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index bc94964e659..6739a159464 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +29,7 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class TimestampRouter<R extends ConnectRecord<R>> implements
Transformation<R>, AutoCloseable, Versioned {
+public class TimestampRouter<R extends ConnectRecord<R>> implements
Transformation<R>, AutoCloseable {
private static final Pattern TOPIC = Pattern.compile("${topic}",
Pattern.LITERAL);
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index 19c299e6867..e8a57f35525 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -37,7 +36,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public class ValueToKey<R extends ConnectRecord<R>> implements
Transformation<R>, Versioned {
+public class ValueToKey<R extends ConnectRecord<R>> implements
Transformation<R> {
public static final String OVERVIEW_DOC = "Replace the record key with a
new key formed from a subset of fields in the record value.";
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
index 566bdfab238..c393b1308a9 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms.predicates;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +29,7 @@ import java.util.Map;
* A predicate which is true for records with at least one header with the
configured name.
* @param <R> The type of connect record.
*/
-public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R>,
Versioned {
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
private static final String NAME_CONFIG = "name";
public static final String OVERVIEW_DOC = "A predicate which is true for
records with at least one header with the configured name.";
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
index 848a8452b28..b709a83b093 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms.predicates;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import java.util.Map;
@@ -27,7 +26,7 @@ import java.util.Map;
* A predicate which is true for records which are tombstones (i.e. have null
value).
* @param <R> The type of connect record.
*/
-public class RecordIsTombstone<R extends ConnectRecord<R>> implements
Predicate<R>, Versioned {
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements
Predicate<R> {
public static final String OVERVIEW_DOC = "A predicate which is true for
records which are tombstones (i.e. have null value).";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
index bcd519eba5a..cd358b2642b 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.transforms.predicates;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -32,7 +31,7 @@ import java.util.regex.PatternSyntaxException;
* A predicate which is true for records with a topic name that matches the
configured regular expression.
* @param <R> The type of connect record.
*/
-public class TopicNameMatches<R extends ConnectRecord<R>> implements
Predicate<R>, Versioned {
+public class TopicNameMatches<R extends ConnectRecord<R>> implements
Predicate<R> {
private static final String PATTERN_CONFIG = "pattern";