This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3df5464 MINOR: Fix a number of warnings in mirror/mirror-client (#8074) 3df5464 is described below commit 3df5464fca06d36b806e50c9d6db047d9d612651 Author: Mickael Maison <mimai...@users.noreply.github.com> AuthorDate: Thu Mar 26 11:59:21 2020 +0000 MINOR: Fix a number of warnings in mirror/mirror-client (#8074) Reviewers: Ismael Juma <ism...@juma.me.uk>, Ryanne Dolan <ryannedo...@gmail.com>, Andrew Choi <a24c...@edu.uwaterloo.ca> --- .../java/org/apache/kafka/connect/mirror/MirrorClientConfig.java | 2 +- .../java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java | 5 +---- .../org/apache/kafka/connect/mirror/MirrorCheckpointTask.java | 2 -- .../org/apache/kafka/connect/mirror/MirrorConnectorConfig.java | 8 ++++---- .../main/java/org/apache/kafka/connect/mirror/MirrorMaker.java | 4 ++-- .../java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java | 3 +-- .../main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java | 2 -- .../src/main/java/org/apache/kafka/connect/mirror/Scheduler.java | 2 +- 8 files changed, 10 insertions(+), 18 deletions(-) diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java index 0c163d8..9292198 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java @@ -47,7 +47,7 @@ import java.util.HashMap; public class MirrorClientConfig extends AbstractConfig { public static final String REPLICATION_POLICY_CLASS = "replication.policy.class"; private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention."; - public static final Class REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class; + public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class; public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator"; private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention."; public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java index f934319..49da62d 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java @@ -25,9 +25,7 @@ import java.util.Set; import java.util.concurrent.TimeoutException; import java.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - + /** Convenience methods for multi-cluster environments. Wraps MirrorClient (@see MirrorClient). * <p> * Properties passed to these methods are used to construct internal Admin and Consumer clients. @@ -42,7 +40,6 @@ import org.slf4j.LoggerFactory; * </p> */ public final class RemoteClusterUtils { - private static final Logger log = LoggerFactory.getLogger(RemoteClusterUtils.class); // utility class private RemoteClusterUtils() {} diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index ac2c7ad..ab55cda 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -47,7 +47,6 @@ public class MirrorCheckpointTask extends SourceTask { private String checkpointsTopic; private Duration interval; private Duration pollTimeout; - private Duration adminTimeout; private TopicFilter topicFilter; private Set<String> consumerGroups; private ReplicationPolicy replicationPolicy; @@ -78,7 +77,6 @@ public class MirrorCheckpointTask extends SourceTask { replicationPolicy = config.replicationPolicy(); interval = config.emitCheckpointsInterval(); pollTimeout = config.consumerPollTimeout(); - adminTimeout = config.adminTimeout(); offsetSyncStore = new OffsetSyncStore(config); sourceAdminClient = AdminClient.create(config.sourceAdminConfig()); metrics = config.metrics(); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java index 527c1eb..72dd435 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java @@ -76,7 +76,7 @@ public class MirrorConnectorConfig extends AbstractConfig { public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target"; private static final String TARGET_CLUSTER_ALIAS_DOC = "Alias of target cluster. Used in metrics reporting."; public static final String REPLICATION_POLICY_CLASS = MirrorClientConfig.REPLICATION_POLICY_CLASS; - public static final Class REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT; + public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT; private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention."; public static final String REPLICATION_POLICY_SEPARATOR = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR; private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention."; @@ -171,14 +171,14 @@ public class MirrorConnectorConfig extends AbstractConfig { public static final String TOPIC_FILTER_CLASS = "topic.filter.class"; private static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate."; - public static final Class TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class; + public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class; public static final String GROUP_FILTER_CLASS = "group.filter.class"; private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate."; - public static final Class GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class; + public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class; public static final String CONFIG_PROPERTY_FILTER_CLASS = "config.property.filter.class"; private static final String CONFIG_PROPERTY_FILTER_CLASS_DOC = "ConfigPropertyFilter to use. Selects topic config " + " properties to replicate."; - public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; + public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; public static final String OFFSET_LAG_MAX = "offset.lag.max"; private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index 497e691..fa73f8d 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -92,7 +92,7 @@ public class MirrorMaker { private static final ConnectorClientConfigOverridePolicy CLIENT_CONFIG_OVERRIDE_POLICY = new AllConnectorClientConfigOverridePolicy(); - private static final List<Class> CONNECTOR_CLASSES = Arrays.asList( + private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList( MirrorSourceConnector.class, MirrorHeartbeatConnector.class, MirrorCheckpointConnector.class); @@ -200,7 +200,7 @@ public class MirrorMaker { } } - private void configureConnector(SourceAndTarget sourceAndTarget, Class connectorClass) { + private void configureConnector(SourceAndTarget sourceAndTarget, Class<?> connectorClass) { checkHerder(sourceAndTarget); Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass); herders.get(sourceAndTarget) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index df5d38f..059ab78 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -69,7 +69,6 @@ public class MirrorMakerConfig extends AbstractConfig { private static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter"; private static final String BYTE_ARRAY_CONVERTER_CLASS = "org.apache.kafka.connect.converters.ByteArrayConverter"; - private static final String REPLICATION_FACTOR = "replication.factor"; static final String SOURCE_CLUSTER_PREFIX = "source.cluster."; static final String TARGET_CLUSTER_PREFIX = "target.cluster."; @@ -175,7 +174,7 @@ public class MirrorMakerConfig extends AbstractConfig { } // loads properties of the form cluster.x.y.z and source->target.x.y.z - Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class connectorClass) { + Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class<?> connectorClass) { Map<String, String> props = new HashMap<>(); props.putAll(originalsStrings()); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java index ea9d2f7..04a92b9 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java @@ -100,12 +100,10 @@ class MirrorMetrics implements AutoCloseable { private final Map<String, GroupMetrics> groupMetrics = new HashMap<>(); private final String source; private final String target; - private final Set<String> groups; MirrorMetrics(MirrorTaskConfig taskConfig) { this.target = taskConfig.targetClusterAlias(); this.source = taskConfig.sourceClusterAlias(); - this.groups = taskConfig.taskConsumerGroups(); this.metrics = new Metrics(); // for side-effect diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index 203c01d..20f2ca7 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -39,7 +39,7 @@ class Scheduler implements AutoCloseable { this.timeout = timeout; } - Scheduler(Class clazz, Duration timeout) { + Scheduler(Class<?> clazz, Duration timeout) { this("Scheduler for " + clazz.getSimpleName(), timeout); }