Repository: kafka Updated Branches: refs/heads/trunk ca2979f84 -> f812a8fd9
KAFKA-4977: Fix findbugs issues in connect/runtime Author: Colin P. Mccabe <[email protected]> Reviewers: Konstantine Karantasis <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #2763 from cmccabe/KAFKA-4977 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f812a8fd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f812a8fd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f812a8fd Branch: refs/heads/trunk Commit: f812a8fd93c37cb27db07505dd5f8f29463906a1 Parents: ca2979f Author: Colin P. Mccabe <[email protected]> Authored: Mon Apr 3 17:57:12 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Apr 3 17:57:12 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/connect/runtime/Worker.java | 6 ++-- .../runtime/distributed/DistributedHerder.java | 15 +++++++++ .../rest/entities/ConnectorStateInfo.java | 16 +++++++++ .../kafka/connect/tools/SchemaSourceTask.java | 6 ++-- .../kafka/connect/util/ConnectorTaskId.java | 2 +- gradle/findbugs-exclude.xml | 34 ++++++++++++++++++++ 6 files changed, 71 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 1801e1b..400ae08 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -102,9 +102,9 @@ public class Worker { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the // worker, but this may compromise the delivery guarantees of Kafka Connect. - producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString()); - producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString()); - producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString()); + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); + producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // User-specified overrides http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index cf30aca..e908d0b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; @@ -1116,6 +1117,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable { final int cmp = Long.compare(at, o.at); return cmp == 0 ? Long.compare(seq, o.seq) : cmp; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof HerderRequest)) + return false; + HerderRequest other = (HerderRequest) o; + return compareTo(other) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(at, seq); + } } private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java index defe2bb..c19e20b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import java.util.Objects; public class ConnectorStateInfo { @@ -103,6 +104,21 @@ public class ConnectorStateInfo { public int compareTo(TaskState that) { return Integer.compare(this.id, that.id); } + + @Override + public boolean equals(Object o) { + if (o == this) + return true; + if (!(o instanceof TaskState)) + return false; + TaskState other = (TaskState) o; + return compareTo(other) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java index 2955fb5..6a51b52 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java @@ -159,9 +159,7 @@ public class SchemaSourceTask extends SourceTask { count++; return result; } else { - synchronized (this) { - this.wait(); - } + throttler.throttle(); return new ArrayList<>(); } } @@ -170,4 +168,4 @@ public class SchemaSourceTask extends SourceTask { public void stop() { throttler.wakeup(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java index b62f87c..03a51f2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java @@ -79,6 +79,6 @@ public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId int connectorCmp = connector.compareTo(o.connector); if (connectorCmp != 0) return connectorCmp; - return ((Integer) task).compareTo(o.task); + return Integer.compare(task, o.task); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/gradle/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml index 932eb3e..833d5c6 100644 --- a/gradle/findbugs-exclude.xml +++ b/gradle/findbugs-exclude.xml @@ -61,4 +61,38 @@ benchmarking. --> <Package name="org.apache.kafka.jmh.cache.generated"/> </Match> + + <Match> + <!-- Suppress warnings about comparing a config string to + ConfigDef.NO_DEFAULT_VALUE using object equality. This is intentional. --> + <Class name="org.apache.kafka.connect.runtime.AbstractHerder"/> + <Method name="convertConfigKey"/> + <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ"/> + </Match> + + <Match> + <!-- Suppress warnings about ignoring the return value of await. + This is done intentionally because we use other clues to determine + if the wait was cut short. --> + <Class name="org.apache.kafka.connect.runtime.WorkerSourceTask"/> + <Method name="execute"/> + <Bug pattern="RV_RETURN_VALUE_IGNORED"/> + </Match> + + <Match> + <!-- Suppress some warnings about intentional switch statement fallthrough. --> + <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/> + <Or> + <Method name="doStart"/> + <Method name="pause"/> + </Or> + <Bug pattern="SF_SWITCH_FALLTHROUGH"/> + </Match> + + <Match> + <!-- Suppress some inconsistent synchronization warnings. TODO: fix these. See + KAFKA-4994. --> + <Class name="org.apache.kafka.connect.storage.OffsetStorageWriter"/> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + </Match> </FindBugsFilter>
