This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 2edb95f  KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing 
warnings (#8571)
2edb95f is described below

commit 2edb95f5ac948173fee7cf342fb34067c80cbb4c
Author: Tom Bentley <[email protected]>
AuthorDate: Wed Jul 7 17:24:31 2021 +0100

    KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings (#8571)
    
    Reviewers: Konstantine Karantasis <[email protected]>
---
 build.gradle                                       |  3 +-
 .../kafka/connect/connector/ConnectRecord.java     |  2 +-
 .../apache/kafka/connect/data/ConnectSchema.java   | 12 ++++----
 .../connect/rest/ConnectRestExtensionContext.java  |  2 +-
 .../apache/kafka/connect/json/JsonConverter.java   |  2 +-
 .../kafka/connect/runtime/TransformationChain.java |  2 +-
 .../runtime/isolation/DelegatingClassLoader.java   | 33 ++++++++++++++++------
 .../connect/runtime/isolation/PluginDesc.java      |  2 +-
 .../runtime/isolation/PluginScanResult.java        | 16 +++++------
 .../kafka/connect/runtime/isolation/Plugins.java   |  4 +--
 .../runtime/rest/ConnectRestConfigurable.java      |  2 +-
 .../rest/ConnectRestExtensionContextImpl.java      |  6 ++--
 .../runtime/rest/resources/LoggingResource.java    |  4 +--
 .../connect/storage/KafkaStatusBackingStore.java   |  6 ++--
 .../kafka/connect/runtime/AbstractHerderTest.java  | 23 +++++++++++----
 .../kafka/connect/runtime/ConnectorConfigTest.java | 12 ++++----
 .../connect/runtime/isolation/PluginDescTest.java  |  4 +++
 .../resources/ConnectorPluginsResourceTest.java    | 26 ++++++++---------
 .../rest/resources/ConnectorsResourceTest.java     | 12 ++++----
 .../storage/KafkaConfigBackingStoreTest.java       |  6 ++--
 .../kafka/connect/transforms/ReplaceFieldTest.java |  7 +++--
 .../transforms/predicates/HasHeaderKeyTest.java    |  2 +-
 22 files changed, 110 insertions(+), 78 deletions(-)

diff --git a/build.gradle b/build.gradle
index 208d6d5..b589525 100644
--- a/build.gradle
+++ b/build.gradle
@@ -222,7 +222,8 @@ subprojects {
     options.encoding = 'UTF-8'
     options.compilerArgs << "-Xlint:all"
     // temporary exclusions until all the warnings are fixed
-    options.compilerArgs << "-Xlint:-rawtypes"
+    if (!project.path.startsWith(":connect"))
+      options.compilerArgs << "-Xlint:-rawtypes"
     options.compilerArgs << "-Xlint:-serial"
     options.compilerArgs << "-Xlint:-try"
     options.compilerArgs << "-Werror"
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index b181209..1cc756b 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -155,7 +155,7 @@ public abstract class ConnectRecord<R extends 
ConnectRecord<R>> {
         if (o == null || getClass() != o.getClass())
             return false;
 
-        ConnectRecord that = (ConnectRecord) o;
+        ConnectRecord<?> that = (ConnectRecord<?>) o;
 
         return Objects.equals(kafkaPartition, that.kafkaPartition)
                && Objects.equals(topic, that.topic)
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index d977487..5e99a0a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -32,11 +32,11 @@ public class ConnectSchema implements Schema {
     /**
      * Maps Schema.Types to a list of Java classes that can be used to 
represent them.
      */
-    private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new 
EnumMap<>(Type.class);
+    private static final Map<Type, List<Class<?>>> SCHEMA_TYPE_CLASSES = new 
EnumMap<>(Type.class);
     /**
      * Maps known logical types to a list of Java classes that can be used to 
represent them.
      */
-    private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new 
HashMap<>();
+    private static final Map<String, List<Class<?>>> LOGICAL_TYPE_CLASSES = 
new HashMap<>();
 
     /**
      * Maps the Java classes to the corresponding Schema.Type.
@@ -60,7 +60,7 @@ public class ConnectSchema implements Schema {
         SCHEMA_TYPE_CLASSES.put(Type.MAP, 
Collections.singletonList(Map.class));
         SCHEMA_TYPE_CLASSES.put(Type.STRUCT, 
Collections.singletonList(Struct.class));
 
-        for (Map.Entry<Type, List<Class>> schemaClasses : 
SCHEMA_TYPE_CLASSES.entrySet()) {
+        for (Map.Entry<Type, List<Class<?>>> schemaClasses : 
SCHEMA_TYPE_CLASSES.entrySet()) {
             for (Class<?> schemaClass : schemaClasses.getValue())
                 JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, 
schemaClasses.getKey());
         }
@@ -221,7 +221,7 @@ public class ConnectSchema implements Schema {
             return;
         }
 
-        List<Class> expectedClasses = expectedClassesFor(schema);
+        List<Class<?>> expectedClasses = expectedClassesFor(schema);
 
         if (expectedClasses == null)
             throw new DataException("Invalid Java object for schema type " + 
schema.type()
@@ -263,8 +263,8 @@ public class ConnectSchema implements Schema {
         }
     }
 
-    private static List<Class> expectedClassesFor(Schema schema) {
-        List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
+    private static List<Class<?>> expectedClassesFor(Schema schema) {
+        List<Class<?>> expectedClasses = 
LOGICAL_TYPE_CLASSES.get(schema.name());
         if (expectedClasses == null)
             expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
         return expectedClasses;
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
index 7608597..c951627 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
@@ -33,7 +33,7 @@ public interface ConnectRestExtensionContext {
      *
      * @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never 
{@code null}
      */
-    Configurable<? extends Configurable> configurable();
+    Configurable<? extends Configurable<?>> configurable();
 
     /**
      * Provides the cluster state and health information about the connectors 
and tasks.
diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index a7bf464..10fde8f 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -610,7 +610,7 @@ public class JsonConverter implements Converter, 
HeaderConverter {
                     else
                         throw new DataException("Invalid type for bytes type: 
" + value.getClass());
                 case ARRAY: {
-                    Collection collection = (Collection) value;
+                    Collection<?> collection = (Collection<?>) value;
                     ArrayNode list = JSON_NODE_FACTORY.arrayNode();
                     for (Object elem : collection) {
                         Schema valueSchema = schema == null ? null : 
schema.valueSchema();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index 5027cb5..6777a96 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -66,7 +66,7 @@ public class TransformationChain<R extends ConnectRecord<R>> 
implements AutoClos
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        TransformationChain that = (TransformationChain) o;
+        TransformationChain<?> that = (TransformationChain<?>) o;
         return Objects.equals(transformations, that.transformations);
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 43ceba3..c41ae98 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -72,15 +72,15 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final SortedSet<PluginDesc<Connector>> connectors;
     private final SortedSet<PluginDesc<Converter>> converters;
     private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
-    private final SortedSet<PluginDesc<Transformation>> transformations;
-    private final SortedSet<PluginDesc<Predicate>> predicates;
+    private final SortedSet<PluginDesc<Transformation<?>>> transformations;
+    private final SortedSet<PluginDesc<Predicate<?>>> predicates;
     private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
     private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
     private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
     private final List<String> pluginPaths;
 
     private static final String MANIFEST_PREFIX = "META-INF/services/";
-    private static final Class[] SERVICE_LOADER_PLUGINS = new Class[] 
{ConnectRestExtension.class, ConfigProvider.class};
+    private static final Class<?>[] SERVICE_LOADER_PLUGINS = new Class<?>[] 
{ConnectRestExtension.class, ConfigProvider.class};
     private static final Set<String> PLUGIN_MANIFEST_FILES =
         Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> 
MANIFEST_PREFIX + serviceLoaderPlugin.getName())
             .collect(Collectors.toSet());
@@ -120,11 +120,11 @@ public class DelegatingClassLoader extends URLClassLoader 
{
         return headerConverters;
     }
 
-    public Set<PluginDesc<Transformation>> transformations() {
+    public Set<PluginDesc<Transformation<?>>> transformations() {
         return transformations;
     }
 
-    public Set<PluginDesc<Predicate>> predicates() {
+    public Set<PluginDesc<Predicate<?>>> predicates() {
         return predicates;
     }
 
@@ -334,14 +334,24 @@ public class DelegatingClassLoader extends URLClassLoader 
{
                 getPluginDesc(reflections, Connector.class, loader),
                 getPluginDesc(reflections, Converter.class, loader),
                 getPluginDesc(reflections, HeaderConverter.class, loader),
-                getPluginDesc(reflections, Transformation.class, loader),
-                getPluginDesc(reflections, Predicate.class, loader),
+                getTransformationPluginDesc(loader, reflections),
+                getPredicatePluginDesc(loader, reflections),
                 getServiceLoaderPluginDesc(ConfigProvider.class, loader),
                 getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
                 
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
         );
     }
 
+    @SuppressWarnings({"unchecked"})
+    private Collection<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) throws 
ReflectiveOperationException {
+        return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>) 
getPluginDesc(reflections, Predicate.class, loader);
+    }
+
+    @SuppressWarnings({"unchecked"})
+    private Collection<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) throws 
ReflectiveOperationException {
+        return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>) 
getPluginDesc(reflections, Transformation.class, loader);
+    }
+
     private <T> Collection<PluginDesc<T>> getPluginDesc(
             Reflections reflections,
             Class<T> klass,
@@ -359,7 +369,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         Collection<PluginDesc<T>> result = new ArrayList<>();
         for (Class<? extends T> plugin : plugins) {
             if (PluginUtils.isConcrete(plugin)) {
-                result.add(new PluginDesc<>(plugin, versionFor(plugin), 
loader));
+                result.add(pluginDesc(plugin, versionFor(plugin), loader));
             } else {
                 log.debug("Skipping {} as it is not concrete implementation", 
plugin);
             }
@@ -367,6 +377,11 @@ public class DelegatingClassLoader extends URLClassLoader {
         return result;
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
     @SuppressWarnings("unchecked")
     private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
         ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
@@ -374,7 +389,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         try {
             ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
             for (T pluginImpl : serviceLoader) {
-                result.add(new PluginDesc<>((Class<? extends T>) 
pluginImpl.getClass(),
+                result.add(pluginDesc((Class<? extends T>) 
pluginImpl.getClass(),
                     versionFor(pluginImpl), loader));
             }
         } finally {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index 904537a..62a7d6c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -103,7 +103,7 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
     }
 
     @Override
-    public int compareTo(PluginDesc other) {
+    public int compareTo(PluginDesc<T> other) {
         int nameComp = name.compareTo(other.name);
         return nameComp != 0 ? nameComp : 
encodedVersion.compareTo(other.encodedVersion);
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index ac42dbb..e98945e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -33,20 +33,20 @@ public class PluginScanResult {
     private final Collection<PluginDesc<Connector>> connectors;
     private final Collection<PluginDesc<Converter>> converters;
     private final Collection<PluginDesc<HeaderConverter>> headerConverters;
-    private final Collection<PluginDesc<Transformation>> transformations;
-    private final Collection<PluginDesc<Predicate>> predicates;
+    private final Collection<PluginDesc<Transformation<?>>> transformations;
+    private final Collection<PluginDesc<Predicate<?>>> predicates;
     private final Collection<PluginDesc<ConfigProvider>> configProviders;
     private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
     private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
 
-    private final List<Collection> allPlugins;
+    private final List<Collection<?>> allPlugins;
 
     public PluginScanResult(
             Collection<PluginDesc<Connector>> connectors,
             Collection<PluginDesc<Converter>> converters,
             Collection<PluginDesc<HeaderConverter>> headerConverters,
-            Collection<PluginDesc<Transformation>> transformations,
-            Collection<PluginDesc<Predicate>> predicates,
+            Collection<PluginDesc<Transformation<?>>> transformations,
+            Collection<PluginDesc<Predicate<?>>> predicates,
             Collection<PluginDesc<ConfigProvider>> configProviders,
             Collection<PluginDesc<ConnectRestExtension>> restExtensions,
             Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies
@@ -76,11 +76,11 @@ public class PluginScanResult {
         return headerConverters;
     }
 
-    public Collection<PluginDesc<Transformation>> transformations() {
+    public Collection<PluginDesc<Transformation<?>>> transformations() {
         return transformations;
     }
 
-    public Collection<PluginDesc<Predicate>> predicates() {
+    public Collection<PluginDesc<Predicate<?>>> predicates() {
         return predicates;
     }
 
@@ -98,7 +98,7 @@ public class PluginScanResult {
 
     public boolean isEmpty() {
         boolean isEmpty = true;
-        for (Collection plugins : allPlugins) {
+        for (Collection<?> plugins : allPlugins) {
             isEmpty = isEmpty && plugins.isEmpty();
         }
         return isEmpty;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 5ad8dac..2ea29c0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -156,11 +156,11 @@ public class Plugins {
         return delegatingLoader.converters();
     }
 
-    public Set<PluginDesc<Transformation>> transformations() {
+    public Set<PluginDesc<Transformation<?>>> transformations() {
         return delegatingLoader.transformations();
     }
 
-    public Set<PluginDesc<Predicate>> predicates() {
+    public Set<PluginDesc<Predicate<?>>> predicates() {
         return delegatingLoader.predicates();
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
index 0d5cbd6..f33ce19 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
@@ -81,7 +81,7 @@ public class ConnectRestConfigurable implements 
Configurable<ResourceConfig> {
     }
 
     @Override
-    public ResourceConfig register(Object component, Class... contracts) {
+    public ResourceConfig register(Object component, Class<?>... contracts) {
         if (allowedToRegister(component)) {
             resourceConfig.register(component, contracts);
         }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
index cdf282f..6d0a2a2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
@@ -24,11 +24,11 @@ import javax.ws.rs.core.Configurable;
 
 public class ConnectRestExtensionContextImpl implements 
ConnectRestExtensionContext {
 
-    private Configurable<? extends Configurable> configurable;
+    private Configurable<? extends Configurable<?>> configurable;
     private ConnectClusterState clusterState;
 
     public ConnectRestExtensionContextImpl(
-        Configurable<? extends Configurable> configurable,
+        Configurable<? extends Configurable<?>> configurable,
         ConnectClusterState clusterState
     ) {
         this.configurable = configurable;
@@ -36,7 +36,7 @@ public class ConnectRestExtensionContextImpl implements 
ConnectRestExtensionCont
     }
 
     @Override
-    public Configurable<? extends Configurable> configurable() {
+    public Configurable<? extends Configurable<?>> configurable() {
         return configurable;
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
index 0579660..ce9ce14 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
@@ -139,10 +139,10 @@ public class LoggingResource {
         } else {
             childLoggers = new ArrayList<>();
             Logger ancestorLogger = lookupLogger(namedLogger);
-            Enumeration en = currentLoggers();
+            Enumeration<Logger> en = currentLoggers();
             boolean present = false;
             while (en.hasMoreElements()) {
-                Logger current = (Logger) en.nextElement();
+                Logger current = en.nextElement();
                 if (current.getName().startsWith(namedLogger)) {
                     childLoggers.add(current);
                 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index eadbe18..44902c0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -302,7 +302,7 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
         });
     }
 
-    private <V extends AbstractStatus> void send(final String key,
+    private <V extends AbstractStatus<?>> void send(final String key,
                                                  final V status,
                                                  final CacheEntry<V> entry,
                                                  final boolean safeWrite) {
@@ -492,7 +492,7 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
         }
     }
 
-    private byte[] serialize(AbstractStatus status) {
+    private byte[] serialize(AbstractStatus<?> status) {
         Struct struct = new Struct(STATUS_SCHEMA_V0);
         struct.put(STATE_KEY_NAME, status.state().name());
         if (status.trace() != null)
@@ -645,7 +645,7 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
         }
     }
 
-    private static class CacheEntry<T extends AbstractStatus> {
+    private static class CacheEntry<T extends AbstractStatus<?>> {
         private T value = null;
         private int sequence = 0;
         private boolean deleted = false;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 79c7d59..8c8d00d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -455,13 +455,14 @@ public class AbstractHerderTest {
         verifyAll();
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     @Test()
     public void testConfigValidationTransformsExtendResults() throws Throwable 
{
         AbstractHerder herder = 
createConfigValidationHerder(TestSourceConnector.class, 
noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
-        Set<PluginDesc<Transformation>> transformations = new HashSet<>();
-        transformations.add(new PluginDesc<>(SampleTransformation.class, 
"1.0", classLoader));
+        Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
+        transformations.add(transformationPluginDesc());
         
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
 
         replayAll();
@@ -512,12 +513,12 @@ public class AbstractHerderTest {
         AbstractHerder herder = 
createConfigValidationHerder(TestSourceConnector.class, 
noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
-        Set<PluginDesc<Transformation>> transformations = new HashSet<>();
-        transformations.add(new PluginDesc<>(SampleTransformation.class, 
"1.0", classLoader));
+        Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
+        transformations.add(transformationPluginDesc());
         
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
 
-        Set<PluginDesc<Predicate>> predicates = new HashSet<>();
-        predicates.add(new PluginDesc<>(SamplePredicate.class, "1.0", 
classLoader));
+        Set<PluginDesc<Predicate<?>>> predicates = new HashSet<>();
+        predicates.add(predicatePluginDesc());
         EasyMock.expect(plugins.predicates()).andReturn(predicates).times(2);
 
         replayAll();
@@ -579,6 +580,16 @@ public class AbstractHerderTest {
         verifyAll();
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private PluginDesc<Predicate<?>> predicatePluginDesc() {
+        return new PluginDesc(SamplePredicate.class, "1.0", classLoader);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private PluginDesc<Transformation<?>> transformationPluginDesc() {
+        return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
+    }
+
     @Test()
     public void testConfigValidationPrincipalOnlyOverride() throws Throwable {
         AbstractHerder herder = 
createConfigValidationHerder(TestSourceConnector.class, new 
PrincipalConnectorClientConfigOverridePolicy());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index b7b7ee7..4abdbea 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -43,7 +43,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
 
     public static final Plugins MOCK_PLUGINS = new Plugins(new HashMap<>()) {
         @Override
-        public Set<PluginDesc<Transformation>> transformations() {
+        public Set<PluginDesc<Transformation<?>>> transformations() {
             return Collections.emptySet();
         }
     };
@@ -149,7 +149,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
         final List<Transformation<R>> transformations = 
config.transformations();
         assertEquals(1, transformations.size());
-        final SimpleTransformation xform = (SimpleTransformation) 
transformations.get(0);
+        final SimpleTransformation<R> xform = (SimpleTransformation<R>) 
transformations.get(0);
         assertEquals(42, xform.magicNumber);
     }
 
@@ -177,8 +177,8 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
         final List<Transformation<R>> transformations = 
config.transformations();
         assertEquals(2, transformations.size());
-        assertEquals(42, ((SimpleTransformation) 
transformations.get(0)).magicNumber);
-        assertEquals(84, ((SimpleTransformation) 
transformations.get(1)).magicNumber);
+        assertEquals(42, ((SimpleTransformation<R>) 
transformations.get(0)).magicNumber);
+        assertEquals(84, ((SimpleTransformation<R>) 
transformations.get(1)).magicNumber);
     }
 
     @Test
@@ -427,11 +427,11 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         }
 
 
-        public static class Key extends AbstractKeyValueTransformation {
+        public static class Key<R extends ConnectRecord<R>> extends 
AbstractKeyValueTransformation<R> {
 
 
         }
-        public static class Value extends AbstractKeyValueTransformation {
+        public static class Value<R extends ConnectRecord<R>> extends 
AbstractKeyValueTransformation<R> {
 
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
index 950aa90..72a2493 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
@@ -48,6 +48,7 @@ public class PluginDescTest {
         pluginLoader = new PluginClassLoader(location, new URL[0], 
systemLoader);
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testRegularPluginDesc() {
         PluginDesc<Connector> connectorDesc = new PluginDesc<>(
@@ -75,6 +76,7 @@ public class PluginDescTest {
         assertPluginDesc(transformDesc, Transformation.class, noVersion, 
pluginLoader.location());
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testPluginDescWithSystemClassLoader() {
         String location = "classpath";
@@ -129,6 +131,7 @@ public class PluginDescTest {
         assertPluginDesc(converterDesc, Converter.class, nullVersion, 
location);
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testPluginDescEquality() {
         PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
@@ -176,6 +179,7 @@ public class PluginDescTest {
         assertNotEquals(transformDescPluginPath, transformDescClasspath);
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testPluginDescComparison() {
         PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 148b782..c1d06f9 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -67,7 +67,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import javax.ws.rs.BadRequestException;
 import java.net.URL;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -77,6 +76,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -125,7 +125,7 @@ public class ConnectorPluginsResourceTest {
         partialConfigs.add(configInfo);
 
         configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, 
null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", 
"test.int.config", Collections.emptyList());
-        configValueInfo = new ConfigValueInfo("test.int.config", "1", 
Arrays.asList("1", "2", "3"), Collections.emptyList(), true);
+        configValueInfo = new ConfigValueInfo("test.int.config", "1", 
asList("1", "2", "3"), Collections.emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
         partialConfigs.add(configInfo);
@@ -137,7 +137,7 @@ public class ConnectorPluginsResourceTest {
         partialConfigs.add(configInfo);
 
         configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, 
null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG", 
"test.list.config", Collections.emptyList());
-        configValueInfo = new ConfigValueInfo("test.list.config", "a,b", 
Arrays.asList("a", "b", "c"), Collections.emptyList(), true);
+        configValueInfo = new ConfigValueInfo("test.list.config", "a,b", 
asList("a", "b", "c"), Collections.emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
         partialConfigs.add(configInfo);
@@ -145,13 +145,13 @@ public class ConnectorPluginsResourceTest {
         CONFIG_INFOS = new 
ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, 
Collections.singletonList("Test"), configs);
         PARTIAL_CONFIG_INFOS = new 
ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 
PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
 
-        Class<?>[] abstractConnectorClasses = {
+        List<Class<? extends Connector>> abstractConnectorClasses = asList(
             Connector.class,
             SourceConnector.class,
             SinkConnector.class
-        };
+        );
 
-        Class<?>[] connectorClasses = {
+        List<Class<? extends Connector>> connectorClasses = asList(
             VerifiableSourceConnector.class,
             VerifiableSinkConnector.class,
             MockSourceConnector.class,
@@ -159,17 +159,17 @@ public class ConnectorPluginsResourceTest {
             MockConnector.class,
             SchemaSourceConnector.class,
             ConnectorPluginsResourceTestConnector.class
-        };
+        );
 
         try {
-            for (Class<?> klass : abstractConnectorClasses) {
+            for (Class<? extends Connector> klass : abstractConnectorClasses) {
                 @SuppressWarnings("unchecked")
-                MockConnectorPluginDesc pluginDesc = new 
MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0");
+                MockConnectorPluginDesc pluginDesc = new 
MockConnectorPluginDesc(klass, "0.0.0");
                 CONNECTOR_PLUGINS.add(pluginDesc);
             }
-            for (Class<?> klass : connectorClasses) {
+            for (Class<? extends Connector> klass : connectorClasses) {
                 @SuppressWarnings("unchecked")
-                MockConnectorPluginDesc pluginDesc = new 
MockConnectorPluginDesc((Class<? extends Connector>) klass);
+                MockConnectorPluginDesc pluginDesc = new 
MockConnectorPluginDesc(klass);
                 CONNECTOR_PLUGINS.add(pluginDesc);
             }
         } catch (Exception e) {
@@ -490,7 +490,7 @@ public class ConnectorPluginsResourceTest {
 
         @Override
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
-            return Arrays.asList(1, 2, 3);
+            return asList(1, 2, 3);
         }
 
         @Override
@@ -502,7 +502,7 @@ public class ConnectorPluginsResourceTest {
     private static class ListRecommender implements Recommender {
         @Override
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
-            return Arrays.asList("a", "b", "c");
+            return asList("a", "b", "c");
         }
 
         @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 572c60a..1e94fa16 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -313,7 +313,7 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false";),
 EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+        
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false";),
 EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), 
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), 
new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
                     ConnectorType.SOURCE)));
 
@@ -798,7 +798,7 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/";
 + CONNECTOR_NAME + "/restart?forward=true&includeTasks=" + 
restartRequest.includeTasks() + "&onlyFailed=" + restartRequest.onlyFailed()),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), 
null));
 
         PowerMock.replayAll();
@@ -869,7 +869,7 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/";
 + CONNECTOR_NAME + "/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), 
null));
 
         PowerMock.replayAll();
@@ -887,7 +887,7 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner 
test", ownerUrl));
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/";
 + CONNECTOR_NAME + "/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), 
null));
 
         PowerMock.replayAll();
@@ -920,7 +920,7 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/";
 + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), 
null));
 
         PowerMock.replayAll();
@@ -940,7 +940,7 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner 
test", ownerUrl));
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/";
 + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), 
null));
 
         PowerMock.replayAll();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 662f0f8..18d92bf 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -978,7 +978,7 @@ public class KafkaConfigBackingStoreTest {
 
     @Test
     public void testRecordToRestartRequest() throws Exception {
-        ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
                 CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty());
         Struct struct = RESTART_REQUEST_STRUCTS.get(0);
         SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), 
structToMap(struct));
@@ -990,7 +990,7 @@ public class KafkaConfigBackingStoreTest {
 
     @Test
     public void testRecordToRestartRequestOnlyFailedInconsistent() throws 
Exception {
-        ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
                 CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty());
         Struct struct = ONLY_FAILED_MISSING_STRUCT;
         SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), 
structToMap(struct));
@@ -1002,7 +1002,7 @@ public class KafkaConfigBackingStoreTest {
 
     @Test
     public void testRecordToRestartRequestIncludeTasksInconsistent() throws 
Exception {
-        ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
                 CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty());
         Struct struct = INLUDE_TASKS_MISSING_STRUCT;
         SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), 
structToMap(struct));
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index e335b40..f8641a7 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -74,6 +74,7 @@ public class ReplaceFieldTest {
         assertEquals(schema, transformedRecord.valueSchema());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void schemaless() {
         final Map<String, String> props = new HashMap<>();
@@ -91,7 +92,7 @@ public class ReplaceFieldTest {
         final SinkRecord record = new SinkRecord("test", 0, null, null, null, 
value, 0);
         final SinkRecord transformedRecord = xform.apply(record);
 
-        final Map updatedValue = (Map) transformedRecord.value();
+        final Map<String, Object> updatedValue = (Map<String, Object>) 
transformedRecord.value();
         assertEquals(3, updatedValue.size());
         assertEquals(42, updatedValue.get("xyz"));
         assertEquals(true, updatedValue.get("bar"));
@@ -144,7 +145,7 @@ public class ReplaceFieldTest {
         assertNull(transformedRecord.valueSchema());
     }
 
-
+    @SuppressWarnings("unchecked")
     @Test
     public void testExcludeBackwardsCompatibility() {
         final Map<String, String> props = new HashMap<>();
@@ -162,7 +163,7 @@ public class ReplaceFieldTest {
         final SinkRecord record = new SinkRecord("test", 0, null, null, null, 
value, 0);
         final SinkRecord transformedRecord = xform.apply(record);
 
-        final Map updatedValue = (Map) transformedRecord.value();
+        final Map<String, Object> updatedValue = (Map<String, Object>) 
transformedRecord.value();
         assertEquals(3, updatedValue.size());
         assertEquals(42, updatedValue.get("xyz"));
         assertEquals(true, updatedValue.get("bar"));
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
index 39635f3..d21c98f 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
@@ -79,7 +79,7 @@ public class HasHeaderKeyTest {
     }
 
     private SimpleConfig config(Map<String, String> props) {
-        return new SimpleConfig(new HasHeaderKey().config(), props);
+        return new SimpleConfig(new HasHeaderKey<>().config(), props);
     }
 
     private SourceRecord recordWithHeaders(String... headers) {

Reply via email to