Repository: kafka Updated Branches: refs/heads/trunk 97e61d4ae -> d5fb7364a
KAFKA-4993; Fix findbugs warnings in kafka-clients Author: Colin P. Mccabe <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]>, Ismael Juma <[email protected]> Closes #2779 from cmccabe/KAFKA-4993 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d5fb7364 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d5fb7364 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d5fb7364 Branch: refs/heads/trunk Commit: d5fb7364aebf293c621b804a4585eb9ef1001864 Parents: 97e61d4 Author: Colin P. Mccabe <[email protected]> Authored: Wed Apr 5 22:17:32 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed Apr 5 22:17:32 2017 +0100 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 20 +++--- .../producer/internals/ProducerBatch.java | 7 ++- .../kafka/common/config/AbstractConfig.java | 6 +- .../kafka/common/metrics/stats/Histogram.java | 3 +- .../common/security/kerberos/KerberosLogin.java | 2 +- .../org/apache/kafka/common/utils/Bytes.java | 2 + gradle/findbugs-exclude.xml | 66 +++++++++++++++++--- 7 files changed, 82 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 1ebce76..ffafddc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -323,15 +323,19 @@ public abstract class AbstractCoordinator implements Closeable { } private void closeHeartbeatThread() { - if (heartbeatThread != null) { + HeartbeatThread thread = null; + synchronized (this) { + if (heartbeatThread == null) + return; heartbeatThread.close(); - - try { - heartbeatThread.join(); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for consumer heartbeat thread to close"); - throw new InterruptException(e); - } + thread = heartbeatThread; + heartbeatThread = null; + } + try { + thread.join(); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for consumer heartbeat thread to close"); + throw new InterruptException(e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 9621794..bacf0a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,7 +50,7 @@ public final class ProducerBatch { private final List<Thunk> thunks = new ArrayList<>(); private final MemoryRecordsBuilder recordsBuilder; - private volatile int attempts; + private final AtomicInteger attempts = new AtomicInteger(0); int recordCount; int maxRecordSize; private long lastAttemptMs; @@ -181,11 +182,11 @@ public final class ProducerBatch { } int attempts() { - return attempts; + return attempts.get(); } void reenqueued(long now) { - attempts++; + attempts.getAndIncrement(); lastAttemptMs = Math.max(lastAppendTime, now); lastAppendTime = Math.max(lastAppendTime, now); retry = true; http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 386ba31..aa8cf0d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -55,9 +55,9 @@ public class AbstractConfig { @SuppressWarnings("unchecked") public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) { /* check that all the keys are really strings */ - for (Object key : originals.keySet()) - if (!(key instanceof String)) - throw new ConfigException(key.toString(), originals.get(key), "Key must be a string."); + for (Map.Entry<?, ?> entry : originals.entrySet()) + if (!(entry.getKey() instanceof String)) + throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); this.originals = (Map<String, ?>) originals; this.values = definition.parse(this.originals); this.used = Collections.synchronizedSet(new HashSet<String>()); http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java index 7d4123e..3b1426e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java @@ -126,7 +126,8 @@ public class Histogram { public LinearBinScheme(int numBins, double max) { this.bins = numBins; this.max = max; - this.scale = max / (numBins * (numBins - 1) / 2); + int denom = numBins * (numBins - 1) / 2; + this.scale = max / denom; } public int bins() { http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java index 1a579ac..fe30a01 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java @@ -351,8 +351,8 @@ public class KerberosLogin extends AbstractLogin { if (!hasSufficientTimeElapsed()) { return; } - log.info("Initiating logout for {}", principal); synchronized (KerberosLogin.class) { + log.info("Initiating logout for {}", principal); // register most recent relogin attempt lastLogin = currentElapsedTime(); //clear up the kerberos state. But the tokens are not cleared! As per http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java index 4099155..cc794c5 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java @@ -75,6 +75,8 @@ public class Bytes implements Comparable<Bytes> { public boolean equals(Object other) { if (this == other) return true; + if (other == null) + return false; // we intentionally use the function to compute hashcode here if (this.hashCode() != other.hashCode()) http://git-wip-us.apache.org/repos/asf/kafka/blob/d5fb7364/gradle/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml index 833d5c6..cdb6894 100644 --- a/gradle/findbugs-exclude.xml +++ b/gradle/findbugs-exclude.xml @@ -63,14 +63,6 @@ </Match> <Match> - <!-- Suppress warnings about comparing a config string to - ConfigDef.NO_DEFAULT_VALUE using object equality. This is intentional. --> - <Class name="org.apache.kafka.connect.runtime.AbstractHerder"/> - <Method name="convertConfigKey"/> - <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ"/> - </Match> - - <Match> <!-- Suppress warnings about ignoring the return value of await. This is done intentionally because we use other clues to determine if the wait was cut short. --> @@ -95,4 +87,62 @@ <Class name="org.apache.kafka.connect.storage.OffsetStorageWriter"/> <Bug pattern="IS2_INCONSISTENT_SYNC"/> </Match> + + <Match> + <!-- Suppress a warning about intentional switch statement fallthrough. --> + <Class name="org.apache.kafka.common.security.authenticator.SaslClientAuthenticator"/> + <Method name="authenticate"/> + <Bug pattern="SF_SWITCH_FALLTHROUGH"/> + </Match> + + <Match> + <!-- Suppress a spurious warning about a missing default case. --> + <Or> + <Class name="org.apache.kafka.common.utils.Crc32"/> + <Class name="org.apache.kafka.common.utils.PureJavaCrc32C"/> + </Or> + <Method name="update"/> + <Bug pattern="SF_SWITCH_NO_DEFAULT"/> + </Match> + + <Match> + <!-- Suppress a warning about intentional missing default cases and fallthroughs. --> + <Class name="org.apache.kafka.common.utils.Utils"/> + <Method name="murmur2"/> + <Or> + <Bug pattern="SF_SWITCH_NO_DEFAULT"/> + <Bug pattern="SF_SWITCH_FALLTHROUGH"/> + </Or> + </Match> + + <Match> + <!-- Suppress a spurious warning about locks not being released on all paths. + This happens because there is an 'if' statement that checks if we have the lock before + releasing it. --> + <Class name="org.apache.kafka.clients.producer.internals.BufferPool"/> + <Method name="allocate"/> + <Bug pattern="UL_UNRELEASED_LOCK"/> + </Match> + + <Match> + <!-- Suppress warnings about synchronizing on the UnsentRequests + ConcurrentHashMap. This is done deliberately. --> + <Package name="org.apache.kafka.clients.consumer.internals"/> + <Source name="ConsumerNetworkClient.java"/> + <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> + </Match> + + <Match> + <!-- Suppress inconsistent synchronization warnings about KerberosLogin#login. + See KAFKA-4991 for details. --> + <Class name="org.apache.kafka.common.security.kerberos.KerberosLogin"/> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + </Match> + + <Match> + <!-- Suppress inconsistent synchronization warnings about + AbstractCoordinator#coordinator. See KAFKA-4992 for details.--> + <Class name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"/> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + </Match> </FindBugsFilter>
