This is an automated email from the ASF dual-hosted git repository.
tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 2edb95f KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing
warnings (#8571)
2edb95f is described below
commit 2edb95f5ac948173fee7cf342fb34067c80cbb4c
Author: Tom Bentley <[email protected]>
AuthorDate: Wed Jul 7 17:24:31 2021 +0100
KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings (#8571)
Reviewers: Konstantine Karantasis <[email protected]>
---
build.gradle | 3 +-
.../kafka/connect/connector/ConnectRecord.java | 2 +-
.../apache/kafka/connect/data/ConnectSchema.java | 12 ++++----
.../connect/rest/ConnectRestExtensionContext.java | 2 +-
.../apache/kafka/connect/json/JsonConverter.java | 2 +-
.../kafka/connect/runtime/TransformationChain.java | 2 +-
.../runtime/isolation/DelegatingClassLoader.java | 33 ++++++++++++++++------
.../connect/runtime/isolation/PluginDesc.java | 2 +-
.../runtime/isolation/PluginScanResult.java | 16 +++++------
.../kafka/connect/runtime/isolation/Plugins.java | 4 +--
.../runtime/rest/ConnectRestConfigurable.java | 2 +-
.../rest/ConnectRestExtensionContextImpl.java | 6 ++--
.../runtime/rest/resources/LoggingResource.java | 4 +--
.../connect/storage/KafkaStatusBackingStore.java | 6 ++--
.../kafka/connect/runtime/AbstractHerderTest.java | 23 +++++++++++----
.../kafka/connect/runtime/ConnectorConfigTest.java | 12 ++++----
.../connect/runtime/isolation/PluginDescTest.java | 4 +++
.../resources/ConnectorPluginsResourceTest.java | 26 ++++++++---------
.../rest/resources/ConnectorsResourceTest.java | 12 ++++----
.../storage/KafkaConfigBackingStoreTest.java | 6 ++--
.../kafka/connect/transforms/ReplaceFieldTest.java | 7 +++--
.../transforms/predicates/HasHeaderKeyTest.java | 2 +-
22 files changed, 110 insertions(+), 78 deletions(-)
diff --git a/build.gradle b/build.gradle
index 208d6d5..b589525 100644
--- a/build.gradle
+++ b/build.gradle
@@ -222,7 +222,8 @@ subprojects {
options.encoding = 'UTF-8'
options.compilerArgs << "-Xlint:all"
// temporary exclusions until all the warnings are fixed
- options.compilerArgs << "-Xlint:-rawtypes"
+ if (!project.path.startsWith(":connect"))
+ options.compilerArgs << "-Xlint:-rawtypes"
options.compilerArgs << "-Xlint:-serial"
options.compilerArgs << "-Xlint:-try"
options.compilerArgs << "-Werror"
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index b181209..1cc756b 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -155,7 +155,7 @@ public abstract class ConnectRecord<R extends
ConnectRecord<R>> {
if (o == null || getClass() != o.getClass())
return false;
- ConnectRecord that = (ConnectRecord) o;
+ ConnectRecord<?> that = (ConnectRecord<?>) o;
return Objects.equals(kafkaPartition, that.kafkaPartition)
&& Objects.equals(topic, that.topic)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index d977487..5e99a0a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -32,11 +32,11 @@ public class ConnectSchema implements Schema {
/**
* Maps Schema.Types to a list of Java classes that can be used to
represent them.
*/
- private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new
EnumMap<>(Type.class);
+ private static final Map<Type, List<Class<?>>> SCHEMA_TYPE_CLASSES = new
EnumMap<>(Type.class);
/**
* Maps known logical types to a list of Java classes that can be used to
represent them.
*/
- private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new
HashMap<>();
+ private static final Map<String, List<Class<?>>> LOGICAL_TYPE_CLASSES =
new HashMap<>();
/**
* Maps the Java classes to the corresponding Schema.Type.
@@ -60,7 +60,7 @@ public class ConnectSchema implements Schema {
SCHEMA_TYPE_CLASSES.put(Type.MAP,
Collections.singletonList(Map.class));
SCHEMA_TYPE_CLASSES.put(Type.STRUCT,
Collections.singletonList(Struct.class));
- for (Map.Entry<Type, List<Class>> schemaClasses :
SCHEMA_TYPE_CLASSES.entrySet()) {
+ for (Map.Entry<Type, List<Class<?>>> schemaClasses :
SCHEMA_TYPE_CLASSES.entrySet()) {
for (Class<?> schemaClass : schemaClasses.getValue())
JAVA_CLASS_SCHEMA_TYPES.put(schemaClass,
schemaClasses.getKey());
}
@@ -221,7 +221,7 @@ public class ConnectSchema implements Schema {
return;
}
- List<Class> expectedClasses = expectedClassesFor(schema);
+ List<Class<?>> expectedClasses = expectedClassesFor(schema);
if (expectedClasses == null)
throw new DataException("Invalid Java object for schema type " +
schema.type()
@@ -263,8 +263,8 @@ public class ConnectSchema implements Schema {
}
}
- private static List<Class> expectedClassesFor(Schema schema) {
- List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
+ private static List<Class<?>> expectedClassesFor(Schema schema) {
+ List<Class<?>> expectedClasses =
LOGICAL_TYPE_CLASSES.get(schema.name());
if (expectedClasses == null)
expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
return expectedClasses;
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
index 7608597..c951627 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
@@ -33,7 +33,7 @@ public interface ConnectRestExtensionContext {
*
* @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never
{@code null}
*/
- Configurable<? extends Configurable> configurable();
+ Configurable<? extends Configurable<?>> configurable();
/**
* Provides the cluster state and health information about the connectors
and tasks.
diff --git
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index a7bf464..10fde8f 100644
---
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -610,7 +610,7 @@ public class JsonConverter implements Converter,
HeaderConverter {
else
throw new DataException("Invalid type for bytes type:
" + value.getClass());
case ARRAY: {
- Collection collection = (Collection) value;
+ Collection<?> collection = (Collection<?>) value;
ArrayNode list = JSON_NODE_FACTORY.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null :
schema.valueSchema();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index 5027cb5..6777a96 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -66,7 +66,7 @@ public class TransformationChain<R extends ConnectRecord<R>>
implements AutoClos
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- TransformationChain that = (TransformationChain) o;
+ TransformationChain<?> that = (TransformationChain<?>) o;
return Objects.equals(transformations, that.transformations);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 43ceba3..c41ae98 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -72,15 +72,15 @@ public class DelegatingClassLoader extends URLClassLoader {
private final SortedSet<PluginDesc<Connector>> connectors;
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
- private final SortedSet<PluginDesc<Transformation>> transformations;
- private final SortedSet<PluginDesc<Predicate>> predicates;
+ private final SortedSet<PluginDesc<Transformation<?>>> transformations;
+ private final SortedSet<PluginDesc<Predicate<?>>> predicates;
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>>
connectorClientConfigPolicies;
private final List<String> pluginPaths;
private static final String MANIFEST_PREFIX = "META-INF/services/";
- private static final Class[] SERVICE_LOADER_PLUGINS = new Class[]
{ConnectRestExtension.class, ConfigProvider.class};
+ private static final Class<?>[] SERVICE_LOADER_PLUGINS = new Class<?>[]
{ConnectRestExtension.class, ConfigProvider.class};
private static final Set<String> PLUGIN_MANIFEST_FILES =
Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin ->
MANIFEST_PREFIX + serviceLoaderPlugin.getName())
.collect(Collectors.toSet());
@@ -120,11 +120,11 @@ public class DelegatingClassLoader extends URLClassLoader
{
return headerConverters;
}
- public Set<PluginDesc<Transformation>> transformations() {
+ public Set<PluginDesc<Transformation<?>>> transformations() {
return transformations;
}
- public Set<PluginDesc<Predicate>> predicates() {
+ public Set<PluginDesc<Predicate<?>>> predicates() {
return predicates;
}
@@ -334,14 +334,24 @@ public class DelegatingClassLoader extends URLClassLoader
{
getPluginDesc(reflections, Connector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
- getPluginDesc(reflections, Transformation.class, loader),
- getPluginDesc(reflections, Predicate.class, loader),
+ getTransformationPluginDesc(loader, reflections),
+ getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}
+ @SuppressWarnings({"unchecked"})
+ private Collection<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) throws
ReflectiveOperationException {
+ return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>)
getPluginDesc(reflections, Predicate.class, loader);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private Collection<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) throws
ReflectiveOperationException {
+ return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>)
getPluginDesc(reflections, Transformation.class, loader);
+ }
+
private <T> Collection<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
@@ -359,7 +369,7 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
- result.add(new PluginDesc<>(plugin, versionFor(plugin),
loader));
+ result.add(pluginDesc(plugin, versionFor(plugin), loader));
} else {
log.debug("Skipping {} as it is not concrete implementation",
plugin);
}
@@ -367,6 +377,11 @@ public class DelegatingClassLoader extends URLClassLoader {
return result;
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String
version, ClassLoader loader) {
+ return new PluginDesc(plugin, version, loader);
+ }
+
@SuppressWarnings("unchecked")
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T>
klass, ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
@@ -374,7 +389,7 @@ public class DelegatingClassLoader extends URLClassLoader {
try {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (T pluginImpl : serviceLoader) {
- result.add(new PluginDesc<>((Class<? extends T>)
pluginImpl.getClass(),
+ result.add(pluginDesc((Class<? extends T>)
pluginImpl.getClass(),
versionFor(pluginImpl), loader));
}
} finally {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index 904537a..62a7d6c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -103,7 +103,7 @@ public class PluginDesc<T> implements
Comparable<PluginDesc<T>> {
}
@Override
- public int compareTo(PluginDesc other) {
+ public int compareTo(PluginDesc<T> other) {
int nameComp = name.compareTo(other.name);
return nameComp != 0 ? nameComp :
encodedVersion.compareTo(other.encodedVersion);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index ac42dbb..e98945e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -33,20 +33,20 @@ public class PluginScanResult {
private final Collection<PluginDesc<Connector>> connectors;
private final Collection<PluginDesc<Converter>> converters;
private final Collection<PluginDesc<HeaderConverter>> headerConverters;
- private final Collection<PluginDesc<Transformation>> transformations;
- private final Collection<PluginDesc<Predicate>> predicates;
+ private final Collection<PluginDesc<Transformation<?>>> transformations;
+ private final Collection<PluginDesc<Predicate<?>>> predicates;
private final Collection<PluginDesc<ConfigProvider>> configProviders;
private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>>
connectorClientConfigPolicies;
- private final List<Collection> allPlugins;
+ private final List<Collection<?>> allPlugins;
public PluginScanResult(
Collection<PluginDesc<Connector>> connectors,
Collection<PluginDesc<Converter>> converters,
Collection<PluginDesc<HeaderConverter>> headerConverters,
- Collection<PluginDesc<Transformation>> transformations,
- Collection<PluginDesc<Predicate>> predicates,
+ Collection<PluginDesc<Transformation<?>>> transformations,
+ Collection<PluginDesc<Predicate<?>>> predicates,
Collection<PluginDesc<ConfigProvider>> configProviders,
Collection<PluginDesc<ConnectRestExtension>> restExtensions,
Collection<PluginDesc<ConnectorClientConfigOverridePolicy>>
connectorClientConfigPolicies
@@ -76,11 +76,11 @@ public class PluginScanResult {
return headerConverters;
}
- public Collection<PluginDesc<Transformation>> transformations() {
+ public Collection<PluginDesc<Transformation<?>>> transformations() {
return transformations;
}
- public Collection<PluginDesc<Predicate>> predicates() {
+ public Collection<PluginDesc<Predicate<?>>> predicates() {
return predicates;
}
@@ -98,7 +98,7 @@ public class PluginScanResult {
public boolean isEmpty() {
boolean isEmpty = true;
- for (Collection plugins : allPlugins) {
+ for (Collection<?> plugins : allPlugins) {
isEmpty = isEmpty && plugins.isEmpty();
}
return isEmpty;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 5ad8dac..2ea29c0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -156,11 +156,11 @@ public class Plugins {
return delegatingLoader.converters();
}
- public Set<PluginDesc<Transformation>> transformations() {
+ public Set<PluginDesc<Transformation<?>>> transformations() {
return delegatingLoader.transformations();
}
- public Set<PluginDesc<Predicate>> predicates() {
+ public Set<PluginDesc<Predicate<?>>> predicates() {
return delegatingLoader.predicates();
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
index 0d5cbd6..f33ce19 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
@@ -81,7 +81,7 @@ public class ConnectRestConfigurable implements
Configurable<ResourceConfig> {
}
@Override
- public ResourceConfig register(Object component, Class... contracts) {
+ public ResourceConfig register(Object component, Class<?>... contracts) {
if (allowedToRegister(component)) {
resourceConfig.register(component, contracts);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
index cdf282f..6d0a2a2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
@@ -24,11 +24,11 @@ import javax.ws.rs.core.Configurable;
public class ConnectRestExtensionContextImpl implements
ConnectRestExtensionContext {
- private Configurable<? extends Configurable> configurable;
+ private Configurable<? extends Configurable<?>> configurable;
private ConnectClusterState clusterState;
public ConnectRestExtensionContextImpl(
- Configurable<? extends Configurable> configurable,
+ Configurable<? extends Configurable<?>> configurable,
ConnectClusterState clusterState
) {
this.configurable = configurable;
@@ -36,7 +36,7 @@ public class ConnectRestExtensionContextImpl implements
ConnectRestExtensionCont
}
@Override
- public Configurable<? extends Configurable> configurable() {
+ public Configurable<? extends Configurable<?>> configurable() {
return configurable;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
index 0579660..ce9ce14 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
@@ -139,10 +139,10 @@ public class LoggingResource {
} else {
childLoggers = new ArrayList<>();
Logger ancestorLogger = lookupLogger(namedLogger);
- Enumeration en = currentLoggers();
+ Enumeration<Logger> en = currentLoggers();
boolean present = false;
while (en.hasMoreElements()) {
- Logger current = (Logger) en.nextElement();
+ Logger current = en.nextElement();
if (current.getName().startsWith(namedLogger)) {
childLoggers.add(current);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index eadbe18..44902c0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -302,7 +302,7 @@ public class KafkaStatusBackingStore implements
StatusBackingStore {
});
}
- private <V extends AbstractStatus> void send(final String key,
+ private <V extends AbstractStatus<?>> void send(final String key,
final V status,
final CacheEntry<V> entry,
final boolean safeWrite) {
@@ -492,7 +492,7 @@ public class KafkaStatusBackingStore implements
StatusBackingStore {
}
}
- private byte[] serialize(AbstractStatus status) {
+ private byte[] serialize(AbstractStatus<?> status) {
Struct struct = new Struct(STATUS_SCHEMA_V0);
struct.put(STATE_KEY_NAME, status.state().name());
if (status.trace() != null)
@@ -645,7 +645,7 @@ public class KafkaStatusBackingStore implements
StatusBackingStore {
}
}
- private static class CacheEntry<T extends AbstractStatus> {
+ private static class CacheEntry<T extends AbstractStatus<?>> {
private T value = null;
private int sequence = 0;
private boolean deleted = false;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 79c7d59..8c8d00d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -455,13 +455,14 @@ public class AbstractHerderTest {
verifyAll();
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
@Test()
public void testConfigValidationTransformsExtendResults() throws Throwable
{
AbstractHerder herder =
createConfigValidationHerder(TestSourceConnector.class,
noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
- Set<PluginDesc<Transformation>> transformations = new HashSet<>();
- transformations.add(new PluginDesc<>(SampleTransformation.class,
"1.0", classLoader));
+ Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
+ transformations.add(transformationPluginDesc());
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
replayAll();
@@ -512,12 +513,12 @@ public class AbstractHerderTest {
AbstractHerder herder =
createConfigValidationHerder(TestSourceConnector.class,
noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
- Set<PluginDesc<Transformation>> transformations = new HashSet<>();
- transformations.add(new PluginDesc<>(SampleTransformation.class,
"1.0", classLoader));
+ Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
+ transformations.add(transformationPluginDesc());
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
- Set<PluginDesc<Predicate>> predicates = new HashSet<>();
- predicates.add(new PluginDesc<>(SamplePredicate.class, "1.0",
classLoader));
+ Set<PluginDesc<Predicate<?>>> predicates = new HashSet<>();
+ predicates.add(predicatePluginDesc());
EasyMock.expect(plugins.predicates()).andReturn(predicates).times(2);
replayAll();
@@ -579,6 +580,16 @@ public class AbstractHerderTest {
verifyAll();
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private PluginDesc<Predicate<?>> predicatePluginDesc() {
+ return new PluginDesc(SamplePredicate.class, "1.0", classLoader);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private PluginDesc<Transformation<?>> transformationPluginDesc() {
+ return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
+ }
+
@Test()
public void testConfigValidationPrincipalOnlyOverride() throws Throwable {
AbstractHerder herder =
createConfigValidationHerder(TestSourceConnector.class, new
PrincipalConnectorClientConfigOverridePolicy());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index b7b7ee7..4abdbea 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -43,7 +43,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
public static final Plugins MOCK_PLUGINS = new Plugins(new HashMap<>()) {
@Override
- public Set<PluginDesc<Transformation>> transformations() {
+ public Set<PluginDesc<Transformation<?>>> transformations() {
return Collections.emptySet();
}
};
@@ -149,7 +149,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
final List<Transformation<R>> transformations =
config.transformations();
assertEquals(1, transformations.size());
- final SimpleTransformation xform = (SimpleTransformation)
transformations.get(0);
+ final SimpleTransformation<R> xform = (SimpleTransformation<R>)
transformations.get(0);
assertEquals(42, xform.magicNumber);
}
@@ -177,8 +177,8 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
final List<Transformation<R>> transformations =
config.transformations();
assertEquals(2, transformations.size());
- assertEquals(42, ((SimpleTransformation)
transformations.get(0)).magicNumber);
- assertEquals(84, ((SimpleTransformation)
transformations.get(1)).magicNumber);
+ assertEquals(42, ((SimpleTransformation<R>)
transformations.get(0)).magicNumber);
+ assertEquals(84, ((SimpleTransformation<R>)
transformations.get(1)).magicNumber);
}
@Test
@@ -427,11 +427,11 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
}
- public static class Key extends AbstractKeyValueTransformation {
+ public static class Key<R extends ConnectRecord<R>> extends
AbstractKeyValueTransformation<R> {
}
- public static class Value extends AbstractKeyValueTransformation {
+ public static class Value<R extends ConnectRecord<R>> extends
AbstractKeyValueTransformation<R> {
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
index 950aa90..72a2493 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
@@ -48,6 +48,7 @@ public class PluginDescTest {
pluginLoader = new PluginClassLoader(location, new URL[0],
systemLoader);
}
+ @SuppressWarnings("rawtypes")
@Test
public void testRegularPluginDesc() {
PluginDesc<Connector> connectorDesc = new PluginDesc<>(
@@ -75,6 +76,7 @@ public class PluginDescTest {
assertPluginDesc(transformDesc, Transformation.class, noVersion,
pluginLoader.location());
}
+ @SuppressWarnings("rawtypes")
@Test
public void testPluginDescWithSystemClassLoader() {
String location = "classpath";
@@ -129,6 +131,7 @@ public class PluginDescTest {
assertPluginDesc(converterDesc, Converter.class, nullVersion,
location);
}
+ @SuppressWarnings("rawtypes")
@Test
public void testPluginDescEquality() {
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
@@ -176,6 +179,7 @@ public class PluginDescTest {
assertNotEquals(transformDescPluginPath, transformDescClasspath);
}
+ @SuppressWarnings("rawtypes")
@Test
public void testPluginDescComparison() {
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 148b782..c1d06f9 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -67,7 +67,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import javax.ws.rs.BadRequestException;
import java.net.URL;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,6 +76,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
@@ -125,7 +125,7 @@ public class ConnectorPluginsResourceTest {
partialConfigs.add(configInfo);
configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true,
null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM",
"test.int.config", Collections.emptyList());
- configValueInfo = new ConfigValueInfo("test.int.config", "1",
Arrays.asList("1", "2", "3"), Collections.emptyList(), true);
+ configValueInfo = new ConfigValueInfo("test.int.config", "1",
asList("1", "2", "3"), Collections.emptyList(), true);
configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
configs.add(configInfo);
partialConfigs.add(configInfo);
@@ -137,7 +137,7 @@ public class ConnectorPluginsResourceTest {
partialConfigs.add(configInfo);
configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true,
null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG",
"test.list.config", Collections.emptyList());
- configValueInfo = new ConfigValueInfo("test.list.config", "a,b",
Arrays.asList("a", "b", "c"), Collections.emptyList(), true);
+ configValueInfo = new ConfigValueInfo("test.list.config", "a,b",
asList("a", "b", "c"), Collections.emptyList(), true);
configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
configs.add(configInfo);
partialConfigs.add(configInfo);
@@ -145,13 +145,13 @@ public class ConnectorPluginsResourceTest {
CONFIG_INFOS = new
ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT,
Collections.singletonList("Test"), configs);
PARTIAL_CONFIG_INFOS = new
ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(),
PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
- Class<?>[] abstractConnectorClasses = {
+ List<Class<? extends Connector>> abstractConnectorClasses = asList(
Connector.class,
SourceConnector.class,
SinkConnector.class
- };
+ );
- Class<?>[] connectorClasses = {
+ List<Class<? extends Connector>> connectorClasses = asList(
VerifiableSourceConnector.class,
VerifiableSinkConnector.class,
MockSourceConnector.class,
@@ -159,17 +159,17 @@ public class ConnectorPluginsResourceTest {
MockConnector.class,
SchemaSourceConnector.class,
ConnectorPluginsResourceTestConnector.class
- };
+ );
try {
- for (Class<?> klass : abstractConnectorClasses) {
+ for (Class<? extends Connector> klass : abstractConnectorClasses) {
@SuppressWarnings("unchecked")
- MockConnectorPluginDesc pluginDesc = new
MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0");
+ MockConnectorPluginDesc pluginDesc = new
MockConnectorPluginDesc(klass, "0.0.0");
CONNECTOR_PLUGINS.add(pluginDesc);
}
- for (Class<?> klass : connectorClasses) {
+ for (Class<? extends Connector> klass : connectorClasses) {
@SuppressWarnings("unchecked")
- MockConnectorPluginDesc pluginDesc = new
MockConnectorPluginDesc((Class<? extends Connector>) klass);
+ MockConnectorPluginDesc pluginDesc = new
MockConnectorPluginDesc(klass);
CONNECTOR_PLUGINS.add(pluginDesc);
}
} catch (Exception e) {
@@ -490,7 +490,7 @@ public class ConnectorPluginsResourceTest {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
- return Arrays.asList(1, 2, 3);
+ return asList(1, 2, 3);
}
@Override
@@ -502,7 +502,7 @@ public class ConnectorPluginsResourceTest {
private static class ListRecommender implements Recommender {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
- return Arrays.asList("a", "b", "c");
+ return asList("a", "b", "c");
}
@Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 572c60a..1e94fa16 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -313,7 +313,7 @@ public class ConnectorsResourceTest {
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
-
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body),
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(201, new HashMap<>(),
new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
@@ -798,7 +798,7 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=true&includeTasks=" +
restartRequest.includeTasks() + "&onlyFailed=" + restartRequest.onlyFailed()),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(),
null));
PowerMock.replayAll();
@@ -869,7 +869,7 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=true"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(),
null));
PowerMock.replayAll();
@@ -887,7 +887,7 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner
test", ownerUrl));
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=false"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(),
null));
PowerMock.replayAll();
@@ -920,7 +920,7 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(),
null));
PowerMock.replayAll();
@@ -940,7 +940,7 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner
test", ownerUrl));
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
+ CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(),
EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(),
null));
PowerMock.replayAll();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 662f0f8..18d92bf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -978,7 +978,7 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testRecordToRestartRequest() throws Exception {
- ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
Struct struct = RESTART_REQUEST_STRUCTS.get(0);
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
@@ -990,7 +990,7 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testRecordToRestartRequestOnlyFailedInconsistent() throws
Exception {
- ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
Struct struct = ONLY_FAILED_MISSING_STRUCT;
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
@@ -1002,7 +1002,7 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testRecordToRestartRequestIncludeTasksInconsistent() throws
Exception {
- ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
Struct struct = INLUDE_TASKS_MISSING_STRUCT;
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index e335b40..f8641a7 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -74,6 +74,7 @@ public class ReplaceFieldTest {
assertEquals(schema, transformedRecord.valueSchema());
}
+ @SuppressWarnings("unchecked")
@Test
public void schemaless() {
final Map<String, String> props = new HashMap<>();
@@ -91,7 +92,7 @@ public class ReplaceFieldTest {
final SinkRecord record = new SinkRecord("test", 0, null, null, null,
value, 0);
final SinkRecord transformedRecord = xform.apply(record);
- final Map updatedValue = (Map) transformedRecord.value();
+ final Map<String, Object> updatedValue = (Map<String, Object>)
transformedRecord.value();
assertEquals(3, updatedValue.size());
assertEquals(42, updatedValue.get("xyz"));
assertEquals(true, updatedValue.get("bar"));
@@ -144,7 +145,7 @@ public class ReplaceFieldTest {
assertNull(transformedRecord.valueSchema());
}
-
+ @SuppressWarnings("unchecked")
@Test
public void testExcludeBackwardsCompatibility() {
final Map<String, String> props = new HashMap<>();
@@ -162,7 +163,7 @@ public class ReplaceFieldTest {
final SinkRecord record = new SinkRecord("test", 0, null, null, null,
value, 0);
final SinkRecord transformedRecord = xform.apply(record);
- final Map updatedValue = (Map) transformedRecord.value();
+ final Map<String, Object> updatedValue = (Map<String, Object>)
transformedRecord.value();
assertEquals(3, updatedValue.size());
assertEquals(42, updatedValue.get("xyz"));
assertEquals(true, updatedValue.get("bar"));
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
index 39635f3..d21c98f 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
@@ -79,7 +79,7 @@ public class HasHeaderKeyTest {
}
private SimpleConfig config(Map<String, String> props) {
- return new SimpleConfig(new HasHeaderKey().config(), props);
+ return new SimpleConfig(new HasHeaderKey<>().config(), props);
}
private SourceRecord recordWithHeaders(String... headers) {