This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98febb989ab KAFKA-15485: Fix "this-escape" compiler warnings
introduced by JDK 21 (1/N) (#14427)
98febb989ab is described below
commit 98febb989abd1bdb624420f21122c477f2614a08
Author: Ismael Juma <[email protected]>
AuthorDate: Sun Sep 24 05:59:29 2023 -0700
KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N)
(#14427)
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible
change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to
deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <[email protected]>, Satish Duggana
<[email protected]>, Chris Egerton <[email protected]>
---
.../clients/consumer/internals/AbstractCoordinator.java | 2 +-
.../consumer/internals/DefaultBackgroundThread.java | 4 ++--
.../consumer/internals/MembershipManagerImpl.java | 2 +-
.../consumer/internals/OffsetsRequestManager.java | 3 ++-
.../org/apache/kafka/clients/producer/KafkaProducer.java | 2 +-
.../org/apache/kafka/common/config/AbstractConfig.java | 2 +-
.../java/org/apache/kafka/common/config/ConfigDef.java | 5 +++--
.../java/org/apache/kafka/common/metrics/Metrics.java | 1 +
.../org/apache/kafka/common/protocol/types/Schema.java | 1 +
.../kafka/common/record/LazyDownConversionRecords.java | 2 +-
.../apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +-
.../apache/kafka/common/requests/DeleteAclsResponse.java | 2 +-
.../kafka/common/requests/DescribeAclsResponse.java | 2 +-
.../security/authenticator/SaslClientAuthenticator.java | 1 +
.../internals/OAuthBearerSaslClientProvider.java | 1 +
.../internals/OAuthBearerSaslServerProvider.java | 1 +
.../internals/unsecured/OAuthBearerUnsecuredJws.java | 16 ++++++++--------
.../plain/internals/PlainSaslServerProvider.java | 2 +-
.../scram/internals/ScramSaslClientProvider.java | 2 +-
.../scram/internals/ScramSaslServerProvider.java | 2 +-
.../kafka/common/utils/ImplicitLinkedHashCollection.java | 1 +
.../java/org/apache/kafka/common/utils/KafkaThread.java | 2 ++
.../org/apache/kafka/common/utils/PureJavaCrc32C.java | 2 +-
.../org/apache/kafka/clients/admin/MockAdminClient.java | 4 +++-
.../org/apache/kafka/common/network/NioEchoServer.java | 1 +
.../org/apache/kafka/common/network/PlaintextSender.java | 1 +
.../java/org/apache/kafka/common/network/SslSender.java | 1 +
.../common/security/ssl/DefaultSslEngineFactoryTest.java | 1 +
.../kafka/common/security/ssl/mock/TestProvider.java | 1 +
.../org/apache/kafka/common/utils/MockScheduler.java | 1 +
.../apache/kafka/connect/json/JsonConverterConfig.java | 1 +
.../kafka/connect/mirror/MirrorConnectorConfig.java | 1 +
.../apache/kafka/connect/mirror/MirrorMakerConfig.java | 1 +
.../kafka/connect/runtime/SourceConnectorConfig.java | 1 +
.../java/org/apache/kafka/connect/runtime/Worker.java | 1 +
.../org/apache/kafka/connect/runtime/WorkerConfig.java | 1 +
.../apache/kafka/connect/runtime/WorkerConnector.java | 6 +++---
.../org/apache/kafka/connect/runtime/WorkerInfo.java | 4 ++--
.../connect/runtime/distributed/DistributedConfig.java | 1 +
.../apache/kafka/connect/runtime/isolation/Plugins.java | 1 +
.../apache/kafka/connect/runtime/rest/RestServer.java | 6 +++---
.../kafka/connect/storage/KafkaConfigBackingStore.java | 1 +
.../src/main/java/kafka/log/remote/RemoteLogManager.java | 7 ++++---
.../org/apache/kafka/message/MessageDataGenerator.java | 2 +-
.../apache/kafka/image/loader/MetadataBatchLoader.java | 2 +-
.../org/apache/kafka/raft/MockExpirationService.java | 1 +
.../apache/kafka/server/fault/FaultHandlerException.java | 2 ++
.../org/apache/kafka/server/util/ShutdownableThread.java | 1 +
.../org/apache/kafka/server/util/timer/TimerTask.java | 2 +-
.../apache/kafka/server/util/timer/TimerTaskEntry.java | 1 +
.../java/org/apache/kafka/timeline/TimelineInteger.java | 1 +
.../java/org/apache/kafka/timeline/TimelineLong.java | 1 +
.../java/org/apache/kafka/timeline/TimelineObject.java | 1 +
.../test/java/org/apache/kafka/server/util/MockTime.java | 1 +
.../storage/FileBasedRemoteLogMetadataCache.java | 3 ++-
.../storage/serialization/RemoteLogMetadataSerde.java | 6 +++---
.../storage/internals/epoch/LeaderEpochFileCache.java | 1 +
.../kafka/storage/internals/log/AbstractIndex.java | 1 +
.../apache/kafka/storage/internals/log/LogConfig.java | 2 +-
.../apache/kafka/storage/internals/log/OffsetIndex.java | 1 +
.../apache/kafka/storage/internals/log/TimeIndex.java | 1 +
.../main/java/org/apache/kafka/streams/KafkaStreams.java | 1 +
.../java/org/apache/kafka/streams/StreamsBuilder.java | 1 +
.../java/org/apache/kafka/streams/StreamsConfig.java | 1 +
.../java/org/apache/kafka/streams/TopologyConfig.java | 1 +
.../streams/kstream/internals/MaterializedInternal.java | 1 +
.../processor/internals/GlobalStateManagerImpl.java | 2 +-
.../processor/internals/ProcessorContextImpl.java | 1 +
.../kafka/streams/processor/internals/StreamTask.java | 2 +-
.../kafka/streams/processor/internals/StreamThread.java | 1 +
.../processor/internals/tasks/DefaultTaskManager.java | 1 +
.../kafka/streams/integration/utils/KafkaEmbedded.java | 2 +-
.../processor/internals/StoreChangelogReaderTest.java | 1 +
.../internals/MeteredTimestampedKeyValueStoreTest.java | 1 +
.../internals/TimestampedWindowStoreBuilderTest.java | 3 ++-
.../apache/kafka/test/InternalMockProcessorContext.java | 1 +
.../java/org/apache/kafka/test/MockRestoreConsumer.java | 1 +
.../kafka/trogdor/workload/ConfigurableProducerSpec.java | 1 +
.../kafka/trogdor/workload/ConnectionStressSpec.java | 1 +
.../apache/kafka/trogdor/workload/ConsumeBenchSpec.java | 1 +
.../apache/kafka/trogdor/workload/ProduceBenchSpec.java | 1 +
.../kafka/trogdor/workload/RoundTripWorkloadSpec.java | 1 +
.../kafka/trogdor/workload/SustainedConnectionSpec.java | 1 +
83 files changed, 110 insertions(+), 49 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 4265c4dc672..bb3c6b93030 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1291,7 +1291,7 @@ public abstract class AbstractCoordinator implements
Closeable {
}
}
- protected Meter createMeter(Metrics metrics, String groupName, String
baseName, String descriptiveName) {
+ protected final Meter createMeter(Metrics metrics, String groupName,
String baseName, String descriptiveName) {
return new Meter(new WindowedCount(),
metrics.metricName(baseName + "-rate", groupName,
String.format("The number of %s per second",
descriptiveName)),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 8150f56debb..90e27559f90 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -266,11 +266,11 @@ public class DefaultBackgroundThread extends KafkaThread {
return this.running;
}
- public void wakeup() {
+ public final void wakeup() {
networkClientDelegate.wakeup();
}
- public void close() {
+ public final void close() {
this.running = false;
this.wakeup();
Utils.closeQuietly(networkClientDelegate, "network client utils");
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index b23738dda77..ad425f41d72 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -77,7 +77,7 @@ public class MembershipManagerImpl implements
MembershipManager {
* @param assignorSelection New assignor selection
* @throws IllegalArgumentException If the provided assignor selection is
null
*/
- public void setAssignorSelection(AssignorSelection assignorSelection) {
+ public final void setAssignorSelection(AssignorSelection
assignorSelection) {
if (assignorSelection == null) {
throw new IllegalArgumentException("Assignor selection cannot be
null");
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 3a4420a556b..0e333618255 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -84,6 +84,7 @@ public class OffsetsRequestManager implements RequestManager,
ClusterResourceLis
private final ApiVersions apiVersions;
private final NetworkClientDelegate networkClientDelegate;
+ @SuppressWarnings("this-escape")
public OffsetsRequestManager(final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final IsolationLevel isolationLevel,
@@ -620,4 +621,4 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
int requestsToSend() {
return requestsToSend.size();
}
-}
\ No newline at end of file
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0b119b113a0..463bd5ca0ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -333,7 +333,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
// visible for testing
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "this-escape"})
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 2280813db1e..84bae97a03a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -101,7 +101,7 @@ public class AbstractConfig {
* the constructor to resolve any variables in
{@code originals}; may be null or empty
* @param doLog whether the configurations should be logged
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "this-escape"})
public AbstractConfig(ConfigDef definition, Map<?, ?> originals,
Map<String, ?> configProviderProps, boolean doLog) {
/* check that all the keys are really strings */
for (Map.Entry<?, ?> entry : originals.entrySet())
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 5954cbf29c9..0523b4a4ca9 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -1172,10 +1172,11 @@ public class ConfigDef {
boolean internalConfig) {
this.name = name;
this.type = type;
- this.defaultValue = NO_DEFAULT_VALUE.equals(defaultValue) ?
NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+ boolean hasDefault = !NO_DEFAULT_VALUE.equals(defaultValue);
+ this.defaultValue = hasDefault ? parseType(name, defaultValue,
type) : NO_DEFAULT_VALUE;
this.validator = validator;
this.importance = importance;
- if (this.validator != null && hasDefault())
+ if (this.validator != null && hasDefault)
this.validator.ensureValid(name, this.defaultValue);
this.documentation = documentation;
this.dependents = dependents;
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 22c1aff4886..6447cdb5c7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -154,6 +154,7 @@ public class Metrics implements Closeable {
* @param enableExpiration true if the metrics instance can garbage
collect inactive sensors, false otherwise
* @param metricsContext The metricsContext to initialize metrics reporter
with
*/
+ @SuppressWarnings("this-escape")
public Metrics(MetricConfig defaultConfig, List<MetricsReporter>
reporters, Time time, boolean enableExpiration,
MetricsContext metricsContext) {
this.config = defaultConfig;
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index aa6ffbef771..7e82e10f2da 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -53,6 +53,7 @@ public class Schema extends Type {
*
* @throws SchemaException If the given list have duplicate fields
*/
+ @SuppressWarnings("this-escape")
public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
this.fields = new BoundField[fs.length];
this.fieldsByName = new HashMap<>();
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index 56ef8e1609a..4745d01441b 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -116,7 +116,7 @@ public class LazyDownConversionRecords implements
BaseRecords {
")";
}
- public java.util.Iterator<ConvertedRecords<?>> iterator(long
maximumReadSize) {
+ public final java.util.Iterator<ConvertedRecords<?>> iterator(long
maximumReadSize) {
// We typically expect only one iterator instance to be created, so
null out the first converted batch after
// first use to make it available for GC.
ConvertedRecords firstBatch = firstConvertedBatch;
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 479b306a1d6..054d72ee822 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -224,7 +224,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
return isTransactional;
}
- public boolean hasDeleteHorizonMs() {
+ public final boolean hasDeleteHorizonMs() {
return magic >= RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs >= 0L;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index d0b596ed91a..27e7c581c33 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -65,7 +65,7 @@ public class DeleteAclsResponse extends AbstractResponse {
data.setThrottleTimeMs(throttleTimeMs);
}
- public List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults()
{
+ public final List<DeleteAclsResponseData.DeleteAclsFilterResult>
filterResults() {
return data.filterResults();
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index c602dab9951..3199e82f975 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -85,7 +85,7 @@ public class DescribeAclsResponse extends AbstractResponse {
return errorCounts(Errors.forCode(data.errorCode()));
}
- public List<DescribeAclsResource> acls() {
+ public final List<DescribeAclsResource> acls() {
return data.resources();
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index e502f80d5c0..a84c4f72901 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -167,6 +167,7 @@ public class SaslClientAuthenticator implements
Authenticator {
// Version of SaslHandshake request/responses
private short saslHandshakeVersion;
+ @SuppressWarnings("this-escape")
public SaslClientAuthenticator(Map<String, ?> configs,
AuthenticateCallbackHandler callbackHandler,
String node,
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
index 08777ef593e..519a7720c49 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslCli
public class OAuthBearerSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("this-escape")
protected OAuthBearerSaslClientProvider() {
super("SASL/OAUTHBEARER Client Provider", 1.0, "SASL/OAUTHBEARER
Client Provider for Kafka");
put("SaslClientFactory." +
OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
index 2e179ce94c5..a902abc40b8 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslSer
public class OAuthBearerSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("this-escape")
protected OAuthBearerSaslServerProvider() {
super("SASL/OAUTHBEARER Server Provider", 1.0, "SASL/OAUTHBEARER
Server Provider for Kafka");
put("SaslServerFactory." +
OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index fa175b3d34c..e804bfc95a8 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -121,7 +121,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
*
* @return the 3 or 5 dot-separated sections of the JWT compact
serialization
*/
- public List<String> splits() {
+ public final List<String> splits() {
return splits;
}
@@ -130,7 +130,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
*
* @return the JOSE header
*/
- public Map<String, Object> header() {
+ public final Map<String, Object> header() {
return header;
}
@@ -159,7 +159,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
*
* @return the (always non-null but possibly empty) claims
*/
- public Map<String, Object> claims() {
+ public final Map<String, Object> claims() {
return claims;
}
@@ -191,7 +191,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
* Number.class, or List.class
* @return true if the claim exists and is the given type, otherwise false
*/
- public boolean isClaimType(String claimName, Class<?> type) {
+ public final boolean isClaimType(String claimName, Class<?> type) {
Object value = rawClaim(claimName);
Objects.requireNonNull(type);
if (value == null)
@@ -215,7 +215,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
* @throws OAuthBearerIllegalTokenException
* if the claim exists but is not the given type
*/
- public <T> T claim(String claimName, Class<T> type) throws
OAuthBearerIllegalTokenException {
+ public final <T> T claim(String claimName, Class<T> type) throws
OAuthBearerIllegalTokenException {
Object value = rawClaim(claimName);
try {
return Objects.requireNonNull(type).cast(value);
@@ -233,7 +233,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
* the mandatory JWT claim name
* @return the raw claim value, if it exists, otherwise null
*/
- public Object rawClaim(String claimName) {
+ public final Object rawClaim(String claimName) {
return claims().get(Objects.requireNonNull(claimName));
}
@@ -248,7 +248,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
- public Number expirationTime() throws OAuthBearerIllegalTokenException {
+ public final Number expirationTime() throws
OAuthBearerIllegalTokenException {
return claim("exp", Number.class);
}
@@ -343,7 +343,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
}
private Set<String> calculateScope() {
- String scopeClaimName = scopeClaimName();
+ String scopeClaimName = this.scopeClaimName;
if (isClaimType(scopeClaimName, String.class)) {
String scopeClaimValue = claim(scopeClaimName, String.class);
if (Utils.isBlank(scopeClaimValue))
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
index 33c3dc5623d..128071f6c0b 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
@@ -25,7 +25,7 @@ public class PlainSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "this-escape"})
protected PlainSaslServerProvider() {
super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN
Server Provider for Kafka");
put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM,
PlainSaslServerFactory.class.getName());
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
index 8c5b85a1a92..08093821810 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
@@ -25,7 +25,7 @@ public class ScramSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "this-escape"})
protected ScramSaslClientProvider() {
super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider
for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
index 6a868600722..1c479d80b8b 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
@@ -25,7 +25,7 @@ public class ScramSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "this-escape"})
protected ScramSaslServerProvider() {
super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider
for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
index 193ef6c2a18..100d524d34a 100644
---
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
+++
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
@@ -553,6 +553,7 @@ public class ImplicitLinkedHashCollection<E extends
ImplicitLinkedHashCollection
* @param iter We will add all the elements accessible
through this iterator
* to the set.
*/
+ @SuppressWarnings("this-escape")
public ImplicitLinkedHashCollection(Iterator<E> iter) {
clear(0);
while (iter.hasNext()) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
index 13430e446a6..2f5e15f4634 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
@@ -34,11 +34,13 @@ public class KafkaThread extends Thread {
return new KafkaThread(name, runnable, false);
}
+ @SuppressWarnings("this-escape")
public KafkaThread(final String name, boolean daemon) {
super(name);
configureThread(name, daemon);
}
+ @SuppressWarnings("this-escape")
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java
b/clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java
index 8abc93dc3b4..e78b83ee91c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java
@@ -49,7 +49,7 @@ public class PureJavaCrc32C implements Checksum {
}
@Override
- public void reset() {
+ public final void reset() {
crc = 0xffffffff;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 3d43d75d1d2..e2d46c324e2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -118,6 +118,7 @@ public class MockAdminClient extends AdminClient {
private Map<String, Short> minSupportedFeatureLevels =
Collections.emptyMap();
private Map<String, Short> maxSupportedFeatureLevels =
Collections.emptyMap();
+ @SuppressWarnings("this-escape")
public Builder() {
numBrokers(1);
}
@@ -217,6 +218,7 @@ public class MockAdminClient extends AdminClient {
Collections.emptyMap());
}
+ @SuppressWarnings("this-escape")
private MockAdminClient(
List<Node> brokers,
Node controller,
@@ -250,7 +252,7 @@ public class MockAdminClient extends AdminClient {
this.maxSupportedFeatureLevels = new
HashMap<>(maxSupportedFeatureLevels);
}
- synchronized public void controller(Node controller) {
+ public synchronized void controller(Node controller) {
if (!brokers.contains(controller))
throw new IllegalArgumentException("The controller node must be in
the list of brokers");
this.controller = controller;
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 5cf1bbca989..4d15d217590 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -101,6 +101,7 @@ public class NioEchoServer extends Thread {
new DelegationTokenCache(ScramMechanism.mechanismNames()));
}
+ @SuppressWarnings("this-escape")
public NioEchoServer(ListenerName listenerName, SecurityProtocol
securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache
credentialCache,
int failedAuthenticationDelayMs, Time time, DelegationTokenCache
tokenCache) throws Exception {
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
index 88333e02a26..8fa28e9446e 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -25,6 +25,7 @@ import java.net.Socket;
*/
public class PlaintextSender extends Thread {
+ @SuppressWarnings("this-escape")
public PlaintextSender(final InetSocketAddress serverAddress, final byte[]
payload) {
super(() -> {
try (Socket connection = new Socket(serverAddress.getAddress(),
serverAddress.getPort());
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
index 928fe0d26d3..dce170d0465 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -33,6 +33,7 @@ public class SslSender extends Thread {
private final byte[] payload;
private final CountDownLatch handshaked = new CountDownLatch(1);
+ @SuppressWarnings("this-escape")
public SslSender(String tlsProtocol, InetSocketAddress serverAddress,
byte[] payload) {
this.tlsProtocol = tlsProtocol;
this.serverAddress = serverAddress;
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
index dfc3291ab62..589098ae10a 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
@@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+@SuppressWarnings("this-escape")
public class DefaultSslEngineFactoryTest {
/*
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
index fb44d3c994f..96641943ece 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
@@ -27,6 +27,7 @@ public class TestProvider extends Provider {
this("TestProvider", 0.1, "provider for test cases");
}
+ @SuppressWarnings("this-escape")
protected TestProvider(String name, double version, String info) {
super(name, version, info);
super.put(KEY_MANAGER_FACTORY, TestKeyManagerFactory.class.getName());
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index d6edd914e18..98023f8a4e0 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -42,6 +42,7 @@ public class MockScheduler implements Scheduler,
MockTime.Listener {
*/
private final TreeMap<Long, List<KafkaFutureImpl<Long>>> waiters = new
TreeMap<>();
+ @SuppressWarnings("this-escape")
public MockScheduler(MockTime time) {
this.time = time;
time.addListener(this);
diff --git
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
index 462c061dde2..add8bec5b33 100644
---
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
+++
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
@@ -87,6 +87,7 @@ public class JsonConverterConfig extends ConverterConfig {
private final DecimalFormat decimalFormat;
private final boolean replaceNullWithDefault;
+ @SuppressWarnings("this-escape")
public JsonConverterConfig(Map<String, ?> props) {
super(CONFIG, props);
this.schemasEnabled = getBoolean(SCHEMAS_ENABLE_CONFIG);
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 2a12c0ec1b6..731ef2015c4 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -116,6 +116,7 @@ public abstract class MirrorConnectorConfig extends
AbstractConfig {
private final ReplicationPolicy replicationPolicy;
+ @SuppressWarnings("this-escape")
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String>
props) {
super(configDef, props, true);
replicationPolicy = getConfiguredInstance(REPLICATION_POLICY_CLASS,
ReplicationPolicy.class);
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 46cfda9ac77..fd672f56a6c 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -90,6 +90,7 @@ public class MirrorMakerConfig extends AbstractConfig {
private final Map<String, String> rawProperties;
+ @SuppressWarnings("this-escape")
public MirrorMakerConfig(Map<String, String> props) {
super(config(), props, true);
plugins = new Plugins(originalsStrings());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index a730f1cba84..6e7f77bb672 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -247,6 +247,7 @@ public class SourceConnectorConfig extends ConnectorConfig {
return newDef;
}
+ @SuppressWarnings("this-escape")
public SourceConnectorConfig(Plugins plugins, Map<String, String> props,
boolean createTopics) {
super(plugins, configDef(), props);
if (createTopics && props.entrySet().stream().anyMatch(e ->
e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 2f2aa762038..3a586ea6186 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -171,6 +171,7 @@ public class Worker {
this(workerId, time, plugins, config, globalOffsetBackingStore,
Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy,
Admin::create);
}
+ @SuppressWarnings("this-escape")
Worker(
String workerId,
Time time,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c1fd69317bc..fe28918a29a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -436,6 +436,7 @@ public class WorkerConfig extends AbstractConfig {
}
}
+ @SuppressWarnings("this-escape")
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterRemovalWarnings(props);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 680cb7a6f29..f2a289f92c7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -381,15 +381,15 @@ public class WorkerConnector implements Runnable {
}
}
- public boolean isSinkConnector() {
+ public final boolean isSinkConnector() {
return ConnectUtils.isSinkConnector(connector);
}
- public boolean isSourceConnector() {
+ public final boolean isSourceConnector() {
return ConnectUtils.isSourceConnector(connector);
}
- protected String connectorType() {
+ protected final String connectorType() {
if (isSinkConnector())
return "sink";
if (isSourceConnector())
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
index 7d132267e51..52e457a3f4c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
@@ -78,7 +78,7 @@ public class WorkerInfo {
/**
* Collect general runtime information.
*/
- protected void addRuntimeInfo() {
+ protected final void addRuntimeInfo() {
List<String> jvmArgs = RUNTIME.getInputArguments();
values.put("jvm.args", Utils.join(jvmArgs, ", "));
String[] jvmSpec = {
@@ -94,7 +94,7 @@ public class WorkerInfo {
/**
* Collect system information.
*/
- protected void addSystemInfo() {
+ protected final void addSystemInfo() {
String[] osInfo = {
OS.getName(),
OS.getArch(),
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 131c26741f7..23bf7ab1f66 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -571,6 +571,7 @@ public class DistributedConfig extends WorkerConfig {
}
// Visible for testing
+ @SuppressWarnings("this-escape")
DistributedConfig(Crypto crypto, Map<String, String> props) {
super(config(crypto), props);
this.crypto = crypto;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 8dd9afe93ec..109391dd740 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -65,6 +65,7 @@ public class Plugins {
}
// VisibleForTesting
+ @SuppressWarnings("this-escape")
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory
factory) {
String pluginPath = WorkerConfig.pluginPath(props);
PluginDiscoveryMode discoveryMode =
WorkerConfig.pluginDiscovery(props);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 60138301600..9a5bc6f5a58 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -102,7 +102,7 @@ public abstract class RestServer {
/**
* Adds Jetty connector for each configured listener
*/
- public void createConnectors(List<String> listeners, List<String>
adminListeners) {
+ public final void createConnectors(List<String> listeners, List<String>
adminListeners) {
List<Connector> connectors = new ArrayList<>();
for (String listener : listeners) {
@@ -125,14 +125,14 @@ public abstract class RestServer {
/**
* Creates regular (non-admin) Jetty connector according to configuration
*/
- public Connector createConnector(String listener) {
+ public final Connector createConnector(String listener) {
return createConnector(listener, false);
}
/**
* Creates Jetty connector according to configuration
*/
- public Connector createConnector(String listener, boolean isAdmin) {
+ public final Connector createConnector(String listener, boolean isAdmin) {
Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);
if (!listenerMatcher.matches())
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index cf07e6c870c..ad8608b76cd 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -331,6 +331,7 @@ public class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore impleme
this(converter, config, configTransformer, adminSupplier,
clientIdBase, Time.SYSTEM);
}
+ @SuppressWarnings("this-escape")
KafkaConfigBackingStore(Converter converter, DistributedConfig config,
WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier,
String clientIdBase, Time time) {
this.lock = new Object();
this.started = false;
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5d1d7dc5ec7..484777dcf75 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -83,7 +83,6 @@ import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -227,8 +226,9 @@ public class RemoteLogManager implements Closeable {
}
}
+ @SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
- return AccessController.doPrivileged(new
PrivilegedAction<RemoteStorageManager>() {
+ return java.security.AccessController.doPrivileged(new
PrivilegedAction<RemoteStorageManager>() {
private final String classPath =
rlmConfig.remoteStorageManagerClassPath();
public RemoteStorageManager run() {
@@ -249,8 +249,9 @@ public class RemoteLogManager implements Closeable {
remoteLogStorageManager.configure(rsmProps);
}
+ @SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
- return AccessController.doPrivileged(new
PrivilegedAction<RemoteLogMetadataManager>() {
+ return java.security.AccessController.doPrivileged(new
PrivilegedAction<RemoteLogMetadataManager>() {
private final String classPath =
rlmConfig.remoteLogMetadataManagerClassPath();
public RemoteLogMetadataManager run() {
diff --git
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index f3b3878b70f..9267b483586 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -409,7 +409,7 @@ public final class MessageDataGenerator implements
MessageClassGenerator {
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.READABLE_CLASS);
buffer.printf("@Override%n");
- buffer.printf("public void read(Readable _readable, short _version)
{%n");
+ buffer.printf("public final void read(Readable _readable, short
_version) {%n");
buffer.incrementIndent();
VersionConditional.forVersions(parentVersions, struct.versions()).
allowMembershipCheckAlwaysFalse(false).
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index b1d22364cc0..70648bef11a 100644
---
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -97,7 +97,7 @@ public class MetadataBatchLoader {
*
* @param image Metadata image to reset this batch loader's state to.
*/
- public void resetToImage(MetadataImage image) {
+ public final void resetToImage(MetadataImage image) {
this.image = image;
this.hasSeenRecord = true;
this.delta = new MetadataDelta.Builder().setImage(image).build();
diff --git
a/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
b/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
index f71e1db2ad0..120611c5c03 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
@@ -28,6 +28,7 @@ public class MockExpirationService implements
ExpirationService, MockTime.Listen
private final MockTime time;
private final PriorityQueue<ExpirationFuture<?>> queue = new
PriorityQueue<>();
+ @SuppressWarnings("this-escape")
public MockExpirationService(MockTime time) {
this.time = time;
time.addListener(this);
diff --git
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
index ec3b7dc4b0c..2a71dce2e05 100644
---
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
+++
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
@@ -22,6 +22,8 @@ package org.apache.kafka.server.fault;
* An exception thrown by a fault handler.
*/
public class FaultHandlerException extends RuntimeException {
+
+ @SuppressWarnings("this-escape")
public FaultHandlerException(String failureMessage, Throwable cause) {
super(failureMessage, cause);
// If a cause exception was provided, set our the stack trace its
stack trace. This is
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
b/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
index d4598cc3073..06c751e0bb2 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
@@ -46,6 +46,7 @@ public abstract class ShutdownableThread extends Thread {
this(name, isInterruptible, "[" + name + "]: ");
}
+ @SuppressWarnings("this-escape")
public ShutdownableThread(String name, boolean isInterruptible, String
logPrefix) {
super(name);
this.isInterruptible = isInterruptible;
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
index 9925248a17a..ec6e8b3d783 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
@@ -32,7 +32,7 @@ public abstract class TimerTask implements Runnable {
}
}
- void setTimerTaskEntry(TimerTaskEntry entry) {
+ final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task
entry,
// we will remove such an entry first.
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
index 58a68207889..02131ecb27d 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
@@ -23,6 +23,7 @@ public class TimerTaskEntry {
TimerTaskEntry next;
TimerTaskEntry prev;
+ @SuppressWarnings("this-escape")
public TimerTaskEntry(
TimerTask timerTask,
long expirationMs
diff --git
a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
index d158890bc4f..7e98ca55e78 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
@@ -48,6 +48,7 @@ public class TimelineInteger implements Revertable {
private final SnapshotRegistry snapshotRegistry;
private int value;
+ @SuppressWarnings("this-escape")
public TimelineInteger(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.value = INIT;
diff --git
a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
index 9b401db5b7d..1379b084004 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
@@ -48,6 +48,7 @@ public class TimelineLong implements Revertable {
private final SnapshotRegistry snapshotRegistry;
private long value;
+ @SuppressWarnings("this-escape")
public TimelineLong(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.value = INIT;
diff --git
a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
index baa1019104a..0b4a43a249a 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
@@ -52,6 +52,7 @@ public class TimelineObject<T> implements Revertable {
private final T initialValue;
private T value;
+ @SuppressWarnings("this-escape")
public TimelineObject(SnapshotRegistry snapshotRegistry, T initialValue) {
Objects.requireNonNull(initialValue);
this.snapshotRegistry = snapshotRegistry;
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
b/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
index 6d18d0891f3..c8449c15da8 100644
--- a/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
+++ b/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
@@ -30,6 +30,7 @@ public class MockTime extends
org.apache.kafka.common.utils.MockTime {
this(System.currentTimeMillis(), System.nanoTime());
}
+ @SuppressWarnings("this-escape")
public MockTime(long currentTimeMs, long currentHiResTimeNs) {
super(0L, currentTimeMs, currentHiResTimeNs);
scheduler = new MockScheduler(this);
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
index 0b0b2817061..cc992667ce8 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -38,6 +38,7 @@ public class FileBasedRemoteLogMetadataCache extends
RemoteLogMetadataCache {
private final RemoteLogMetadataSnapshotFile snapshotFile;
private final TopicIdPartition topicIdPartition;
+ @SuppressWarnings("this-escape")
public FileBasedRemoteLogMetadataCache(TopicIdPartition topicIdPartition,
Path partitionDir) {
if (!partitionDir.toFile().exists() ||
!partitionDir.toFile().isDirectory()) {
@@ -54,7 +55,7 @@ public class FileBasedRemoteLogMetadataCache extends
RemoteLogMetadataCache {
}
}
- protected void
loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
+ protected final void
loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
log.info("Loading snapshot for partition {} is: {}", topicIdPartition,
snapshot);
for (RemoteLogSegmentMetadataSnapshot metadataSnapshot :
snapshot.remoteLogSegmentMetadataSnapshots()) {
switch (metadataSnapshot.state()) {
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
index cda378dab79..dd04022b738 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
@@ -65,7 +65,7 @@ public class RemoteLogMetadataSerde {
return MetadataRecordType.fromId(apiKey).newMetadataRecord();
}
- protected Map<Short, RemoteLogMetadataTransform>
createRemoteLogMetadataTransforms() {
+ protected final Map<Short, RemoteLogMetadataTransform>
createRemoteLogMetadataTransforms() {
Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new
RemoteLogSegmentMetadataTransform());
map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new
RemoteLogSegmentMetadataUpdateTransform());
@@ -74,7 +74,7 @@ public class RemoteLogMetadataSerde {
return map;
}
- protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
+ protected final Map<String, Short>
createRemoteLogStorageClassToApiKeyMap() {
Map<String, Short> map = new HashMap<>();
map.put(RemoteLogSegmentMetadata.class.getName(),
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
map.put(RemoteLogSegmentMetadataUpdate.class.getName(),
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
@@ -123,4 +123,4 @@ public class RemoteLogMetadataSerde {
remoteLogMetadataSerde.deserialize(consumerRecord.value()).toString());
}
}
-}
\ No newline at end of file
+}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 4cb77449579..bd07a67556a 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -56,6 +56,7 @@ public class LeaderEpochFileCache {
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
*/
+ @SuppressWarnings("this-escape")
public LeaderEpochFileCache(TopicPartition topicPartition,
LeaderEpochCheckpoint checkpoint) {
this.checkpoint = checkpoint;
this.topicPartition = topicPartition;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index b70d1186a90..b53acce663d 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -72,6 +72,7 @@ public abstract class AbstractIndex implements Closeable {
* @param baseOffset the base offset of the segment that this index is
corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
+ @SuppressWarnings("this-escape")
public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean
writable) throws IOException {
Objects.requireNonNull(file);
this.file = file;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index d73a37485cb..bfe7fc3aa34 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -343,7 +343,7 @@ public class LogConfig extends AbstractConfig {
this(props, Collections.emptySet());
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "this-escape"})
public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
super(CONFIG, props, false);
this.props = Collections.unmodifiableMap(props);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index 82f57bb0c23..5db8f411365 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -65,6 +65,7 @@ public class OffsetIndex extends AbstractIndex {
this(file, baseOffset, maxIndexSize, true);
}
+ @SuppressWarnings("this-escape")
public OffsetIndex(File file, long baseOffset, int maxIndexSize, boolean
writable) throws IOException {
super(file, baseOffset, maxIndexSize, writable);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
index e1ade8c5314..52a51f2ebfb 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
@@ -60,6 +60,7 @@ public class TimeIndex extends AbstractIndex {
this(file, baseOffset, maxIndexSize, true);
}
+ @SuppressWarnings("this-escape")
public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean
writable) throws IOException {
super(file, baseOffset, maxIndexSize, writable);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 73031d9dcb0..c7b6c05d840 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -865,6 +865,7 @@ public class KafkaStreams implements AutoCloseable {
this(topologyMetadata, applicationConfigs, clientSupplier,
Time.SYSTEM);
}
+ @SuppressWarnings("this-escape")
private KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 27ef8496a35..1c2cde831fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -85,6 +85,7 @@ public class StreamsBuilder {
*
* @param topologyConfigs the streams configs that apply at the
topology level. Please refer to {@link TopologyConfig} for more detail
*/
+ @SuppressWarnings("this-escape")
public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = getNewTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ddd9f40b533..b8defc5ee0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1346,6 +1346,7 @@ public class StreamsConfig extends AbstractConfig {
this(props, true);
}
+ @SuppressWarnings("this-escape")
protected StreamsConfig(final Map<?, ?> props,
final boolean doLog) {
super(CONFIG, props, doLog);
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index 61c1b0bf7a1..83516197347 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -125,6 +125,7 @@ public class TopologyConfig extends AbstractConfig {
this(null, globalAppConfigs, new Properties());
}
+ @SuppressWarnings("this-escape")
public TopologyConfig(final String topologyName, final StreamsConfig
globalAppConfigs, final Properties topologyOverrides) {
super(CONFIG, topologyOverrides, false);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index e81934716b2..740ce25e5df 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -33,6 +33,7 @@ public class MaterializedInternal<K, V, S extends StateStore>
extends Materializ
this(materialized, null, null);
}
+ @SuppressWarnings("this-escape")
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 57166526413..4c3cc4b83e7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -435,7 +435,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
return Collections.unmodifiableMap(checkpointFileCache);
}
- public String changelogFor(final String storeName) {
+ public final String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index ffa5dcaf73b..b484d26f0fe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -59,6 +59,7 @@ public class ProcessorContextImpl extends
AbstractProcessorContext<Object, Objec
final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new
HashMap<>();
+ @SuppressWarnings("this-escape")
public ProcessorContextImpl(final TaskId id,
final StreamsConfig config,
final ProcessorStateManager stateMgr,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f12b7e89561..dffd0ecef29 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -110,7 +110,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
private boolean hasPendingTxCommit = false;
private Optional<Long> timeCurrentIdlingStarted;
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({"rawtypes", "this-escape"})
public StreamTask(final TaskId id,
final Set<TopicPartition> inputPartitions,
final ProcessorTopology topology,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8d5fe3d5a0a..6dd4d136cf4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -465,6 +465,7 @@ public class StreamThread extends Thread {
}
}
+ @SuppressWarnings("this-escape")
public StreamThread(final Time time,
final StreamsConfig config,
final Admin adminClient,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index ee9d792c3e6..4f2d22d4711 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -72,6 +72,7 @@ public class DefaultTaskManager implements TaskManager {
}
}
+ @SuppressWarnings("this-escape")
public DefaultTaskManager(final Time time,
final String clientId,
final TasksRegistry tasks,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 8936938bc52..7f945ffe06d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -63,7 +63,7 @@ public class KafkaEmbedded {
* broker should listen to. Note that you cannot change the
`log.dirs` setting
* currently.
*/
- @SuppressWarnings("WeakerAccess")
+ @SuppressWarnings({"WeakerAccess", "this-escape"})
public KafkaEmbedded(final Properties config, final MockTime time) throws
IOException {
tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory();
logDir =
org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 2848f0dcd5e..c764d6ede2a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -91,6 +91,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
+@SuppressWarnings("this-escape")
public class StoreChangelogReaderTest extends EasyMockSupport {
@Rule
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index fc11a12144a..8c1209f7ab1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -74,6 +74,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("this-escape")
public class MeteredTimestampedKeyValueStoreTest {
@Rule
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index 504a426ed5e..8fb4de1824d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -51,6 +51,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
@RunWith(Parameterized.class)
+@SuppressWarnings("this-escape")
public class TimestampedWindowStoreBuilderTest {
private static final String TIMESTAMP_STORE_NAME = "Timestamped Store";
private static final String TIMEORDERED_STORE_NAME = "TimeOrdered Store";
@@ -257,4 +258,4 @@ public class TimestampedWindowStoreBuilderTest {
assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't
be null"));
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 5192a1f6782..237d1c665b8 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -213,6 +213,7 @@ public class InternalMockProcessorContext<KOut, VOut>
this(stateDir, keySerde, valueSerde, metrics, config,
collectorSupplier, cache, time, new TaskId(0, 0));
}
+ @SuppressWarnings("this-escape")
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valueSerde,
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 93e126f6df6..77678bc383e 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -41,6 +41,7 @@ public class MockRestoreConsumer<K, V> extends
MockConsumer<byte[], byte[]> {
private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new
ArrayList<>();
+ @SuppressWarnings("this-escape")
public MockRestoreConsumer(final Serializer<K> keySerializer, final
Serializer<V> valueSerializer) {
super(OffsetResetStrategy.EARLIEST);
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
index 5235fc3254d..29d4a451f57 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
@@ -119,6 +119,7 @@ public class ConfigurableProducerSpec extends TaskSpec {
private final TopicsSpec activeTopic;
private final int activePartition;
+ @SuppressWarnings("this-escape")
@JsonCreator
public ConfigurableProducerSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long
durationMs,
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
index 6141d30b60b..8738091b499 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -46,6 +46,7 @@ public class ConnectionStressSpec extends TaskSpec {
FETCH_METADATA
}
+ @SuppressWarnings("this-escape")
@JsonCreator
public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 6909d2d2f69..294b251dedd 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -104,6 +104,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final int threadsPerWorker;
private final Optional<RecordProcessor> recordProcessor;
+ @SuppressWarnings("this-escape")
@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 9f6a907c37f..540ae763e66 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -74,6 +74,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final boolean useConfiguredPartitioner;
private final boolean skipFlush;
+ @SuppressWarnings("this-escape")
@JsonCreator
public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index fd30e8eb78b..296c47f23d3 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -42,6 +42,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
+ @SuppressWarnings("this-escape")
@JsonCreator
public RoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
index 1783a800f8f..94ce71eea12 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
@@ -80,6 +80,7 @@ public class SustainedConnectionSpec extends TaskSpec {
private final int numThreads;
private final int refreshRateMs;
+ @SuppressWarnings("this-escape")
@JsonCreator
public SustainedConnectionSpec(
@JsonProperty("startMs") long startMs,