This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af537587464 KAFKA-17814 Use `final` declaration to replace the 
suppression `this-escape` (#17613)
af537587464 is described below

commit af53758746485daa06c4d4366036c04f7f9f0e63
Author: Linsiyuan9 <[email protected]>
AuthorDate: Sun Nov 3 15:00:02 2024 +0800

    KAFKA-17814 Use `final` declaration to replace the suppression 
`this-escape` (#17613)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/internals/OffsetsRequestManager.java  | 3 +--
 .../internals/metrics/ConsumerRebalanceMetricsManager.java       | 3 +--
 .../consumer/internals/metrics/ShareRebalanceMetricsManager.java | 3 +--
 .../src/main/java/org/apache/kafka/common/metrics/Metrics.java   | 3 +--
 .../main/java/org/apache/kafka/common/protocol/types/Schema.java | 3 +--
 .../oauthbearer/internals/OAuthBearerSaslClientProvider.java     | 5 ++---
 .../oauthbearer/internals/OAuthBearerSaslServerProvider.java     | 5 ++---
 .../common/security/plain/internals/PlainSaslServerProvider.java | 5 ++---
 .../common/security/scram/internals/ScramSaslClientProvider.java | 5 ++---
 .../common/security/scram/internals/ScramSaslServerProvider.java | 5 ++---
 .../java/org/apache/kafka/clients/admin/MockAdminClient.java     | 6 ++----
 .../test/java/org/apache/kafka/common/network/NioEchoServer.java | 3 +--
 .../java/org/apache/kafka/common/network/PlaintextSender.java    | 3 +--
 .../src/test/java/org/apache/kafka/common/network/SslSender.java | 3 +--
 .../kafka/common/security/ssl/DefaultSslEngineFactoryTest.java   | 3 +--
 .../org/apache/kafka/common/security/ssl/mock/TestProvider.java  | 5 ++---
 .../test/java/org/apache/kafka/common/utils/MockScheduler.java   | 3 +--
 .../java/org/apache/kafka/connect/json/JsonConverterConfig.java  | 3 +--
 .../java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java  | 3 +--
 .../org/apache/kafka/connect/runtime/SourceConnectorConfig.java  | 9 ++++-----
 .../src/main/java/org/apache/kafka/connect/runtime/Worker.java   | 7 +++----
 .../kafka/connect/runtime/distributed/DistributedConfig.java     | 3 +--
 .../kafka/connect/runtime/standalone/StandaloneHerder.java       | 3 +--
 .../apache/kafka/connect/storage/KafkaConfigBackingStore.java    | 3 +--
 .../java/org/apache/kafka/coordinator/group/GroupConfig.java     | 3 +--
 .../test/java/org/apache/kafka/raft/MockExpirationService.java   | 3 +--
 .../org/apache/kafka/server/fault/FaultHandlerException.java     | 3 +--
 .../java/org/apache/kafka/server/util/timer/TimerTaskEntry.java  | 3 +--
 .../src/main/java/org/apache/kafka/timeline/TimelineInteger.java | 3 +--
 .../src/main/java/org/apache/kafka/timeline/TimelineLong.java    | 3 +--
 .../src/main/java/org/apache/kafka/timeline/TimelineObject.java  | 3 +--
 .../src/test/java/org/apache/kafka/server/util/MockTime.java     | 3 +--
 .../kafka/storage/internals/epoch/LeaderEpochFileCache.java      | 3 +--
 .../java/org/apache/kafka/storage/internals/log/OffsetIndex.java | 3 +--
 .../kafka/storage/internals/log/RemoteStorageThreadPool.java     | 3 +--
 .../src/main/java/org/apache/kafka/streams/TopologyConfig.java   | 3 +--
 .../kafka/streams/kstream/internals/MaterializedInternal.java    | 3 +--
 .../kafka/streams/processor/internals/ProcessorContextImpl.java  | 3 +--
 .../streams/processor/internals/tasks/DefaultTaskManager.java    | 3 +--
 .../streams/processor/internals/StoreChangelogReaderTest.java    | 1 -
 .../state/internals/MeteredTimestampedKeyValueStoreTest.java     | 1 -
 .../state/internals/TimestampedWindowStoreBuilderTest.java       | 1 -
 .../src/test/java/org/apache/kafka/test/MockRestoreConsumer.java | 3 +--
 .../apache/kafka/trogdor/workload/ConfigurableProducerSpec.java  | 3 +--
 .../org/apache/kafka/trogdor/workload/ConnectionStressSpec.java  | 3 +--
 .../java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java | 3 +--
 .../java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java | 3 +--
 .../org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java | 3 +--
 .../apache/kafka/trogdor/workload/SustainedConnectionSpec.java   | 3 +--
 49 files changed, 58 insertions(+), 108 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 5942bedfb0c..6d296149b70 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -77,7 +77,7 @@ import static 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.reg
  * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
  * when the cluster metadata is updated.
  */
-public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+public final class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
 
     private final ConsumerMetadata metadata;
     private final IsolationLevel isolationLevel;
@@ -109,7 +109,6 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
      */
     private PendingFetchCommittedRequest pendingOffsetFetchEvent;
 
-    @SuppressWarnings("this-escape")
     public OffsetsRequestManager(final SubscriptionState subscriptionState,
                                  final ConsumerMetadata metadata,
                                  final IsolationLevel isolationLevel,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
index 027a93c574d..c312edd54b6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
 
-public class ConsumerRebalanceMetricsManager extends RebalanceMetricsManager {
+public final class ConsumerRebalanceMetricsManager extends 
RebalanceMetricsManager {
     private final Sensor successfulRebalanceSensor;
     private final Sensor failedRebalanceSensor;
 
@@ -47,7 +47,6 @@ public class ConsumerRebalanceMetricsManager extends 
RebalanceMetricsManager {
     private long lastRebalanceEndMs = -1L;
     private long lastRebalanceStartMs = -1L;
 
-    @SuppressWarnings("this-escape")
     public ConsumerRebalanceMetricsManager(Metrics metrics) {
         super(CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
index f1be3b25031..0760eaa6d5c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
@@ -28,14 +28,13 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
 
-public class ShareRebalanceMetricsManager extends RebalanceMetricsManager {
+public final class ShareRebalanceMetricsManager extends 
RebalanceMetricsManager {
     private final Sensor rebalanceSensor;
     public final MetricName rebalanceTotal;
     public final MetricName rebalanceRatePerHour;
     private long lastRebalanceEndMs = -1L;
     private long lastRebalanceStartMs = -1L;
 
-    @SuppressWarnings("this-escape")
     public ShareRebalanceMetricsManager(Metrics metrics) {
         super(CONSUMER_SHARE_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 02a48f8d115..13d8db4b0cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -67,7 +67,7 @@ import static java.util.Collections.emptyList;
  * sensor.record(messageSize);
  * </pre>
  */
-public class Metrics implements Closeable {
+public final class Metrics implements Closeable {
 
     private final MetricConfig config;
     private final ConcurrentMap<MetricName, KafkaMetric> metrics;
@@ -155,7 +155,6 @@ public class Metrics implements Closeable {
      * @param enableExpiration true if the metrics instance can garbage 
collect inactive sensors, false otherwise
      * @param metricsContext The metricsContext to initialize metrics reporter 
with
      */
-    @SuppressWarnings("this-escape")
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> 
reporters, Time time, boolean enableExpiration,
                    MetricsContext metricsContext) {
         this.config = defaultConfig;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 9c3dd945bcf..325a77fe43f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -24,7 +24,7 @@ import java.util.Objects;
 /**
  * The schema for a compound record definition
  */
-public class Schema extends Type {
+public final class Schema extends Type {
     private static final Object[] NO_VALUES = new Object[0];
 
     private final BoundField[] fields;
@@ -53,7 +53,6 @@ public class Schema extends Type {
      *
      * @throws SchemaException If the given list have duplicate fields
      */
-    @SuppressWarnings("this-escape")
     public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
         this.fields = new BoundField[fs.length];
         this.fieldsByName = new HashMap<>();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
index b6443dc7809..bfd6becba2a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
@@ -22,11 +22,10 @@ import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslCli
 import java.security.Provider;
 import java.security.Security;
 
-public class OAuthBearerSaslClientProvider extends Provider {
+public final class OAuthBearerSaslClientProvider extends Provider {
     private static final long serialVersionUID = 1L;
 
-    @SuppressWarnings("this-escape")
-    protected OAuthBearerSaslClientProvider() {
+    private OAuthBearerSaslClientProvider() {
         super("SASL/OAUTHBEARER Client Provider", "1.0", "SASL/OAUTHBEARER 
Client Provider for Kafka");
         put("SaslClientFactory." + 
OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
                 OAuthBearerSaslClientFactory.class.getName());
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
index b666521468f..a35e8cc46bf 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
@@ -22,11 +22,10 @@ import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslSer
 import java.security.Provider;
 import java.security.Security;
 
-public class OAuthBearerSaslServerProvider extends Provider {
+public final class OAuthBearerSaslServerProvider extends Provider {
     private static final long serialVersionUID = 1L;
 
-    @SuppressWarnings("this-escape")
-    protected OAuthBearerSaslServerProvider() {
+    private OAuthBearerSaslServerProvider() {
         super("SASL/OAUTHBEARER Server Provider", "1.0", "SASL/OAUTHBEARER 
Server Provider for Kafka");
         put("SaslServerFactory." + 
OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
                 OAuthBearerSaslServerFactory.class.getName());
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
index 333f1c6bba7..86895c00934 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java
@@ -21,12 +21,11 @@ import 
org.apache.kafka.common.security.plain.internals.PlainSaslServer.PlainSas
 import java.security.Provider;
 import java.security.Security;
 
-public class PlainSaslServerProvider extends Provider {
+public final class PlainSaslServerProvider extends Provider {
 
     private static final long serialVersionUID = 1L;
 
-    @SuppressWarnings("this-escape")
-    protected PlainSaslServerProvider() {
+    private PlainSaslServerProvider() {
         super("Simple SASL/PLAIN Server Provider", "1.0", "Simple SASL/PLAIN 
Server Provider for Kafka");
         put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, 
PlainSaslServerFactory.class.getName());
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
index e6bcfdaddd5..42b68fca577 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java
@@ -21,12 +21,11 @@ import 
org.apache.kafka.common.security.scram.internals.ScramSaslClient.ScramSas
 import java.security.Provider;
 import java.security.Security;
 
-public class ScramSaslClientProvider extends Provider {
+public final class ScramSaslClientProvider extends Provider {
 
     private static final long serialVersionUID = 1L;
 
-    @SuppressWarnings("this-escape")
-    protected ScramSaslClientProvider() {
+    private ScramSaslClientProvider() {
         super("SASL/SCRAM Client Provider", "1.0", "SASL/SCRAM Client Provider 
for Kafka");
         for (ScramMechanism mechanism : ScramMechanism.values())
             put("SaslClientFactory." + mechanism.mechanismName(), 
ScramSaslClientFactory.class.getName());
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
index bf6175f2e07..a2409e03b04 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java
@@ -21,12 +21,11 @@ import 
org.apache.kafka.common.security.scram.internals.ScramSaslServer.ScramSas
 import java.security.Provider;
 import java.security.Security;
 
-public class ScramSaslServerProvider extends Provider {
+public final class ScramSaslServerProvider extends Provider {
 
     private static final long serialVersionUID = 1L;
 
-    @SuppressWarnings("this-escape")
-    protected ScramSaslServerProvider() {
+    private ScramSaslServerProvider() {
         super("SASL/SCRAM Server Provider", "1.0", "SASL/SCRAM Server Provider 
for Kafka");
         for (ScramMechanism mechanism : ScramMechanism.values())
             put("SaslServerFactory." + mechanism.mechanismName(), 
ScramSaslServerFactory.class.getName());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 8b6e1267c2f..2eef603c4a0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -129,7 +129,6 @@ public class MockAdminClient extends AdminClient {
         private Map<String, Short> maxSupportedFeatureLevels = 
Collections.emptyMap();
         private Map<String, String> defaultGroupConfigs = 
Collections.emptyMap();
 
-        @SuppressWarnings("this-escape")
         public Builder() {
             numBrokers(1);
         }
@@ -145,7 +144,7 @@ public class MockAdminClient extends AdminClient {
             return this;
         }
 
-        public Builder numBrokers(int numBrokers) {
+        public final Builder numBrokers(int numBrokers) {
             if (brokers.size() >= numBrokers) {
                 brokers = brokers.subList(0, numBrokers);
                 brokerLogDirs = brokerLogDirs.subList(0, numBrokers);
@@ -236,7 +235,6 @@ public class MockAdminClient extends AdminClient {
             Collections.emptyMap());
     }
 
-    @SuppressWarnings("this-escape")
     private MockAdminClient(
         List<Node> brokers,
         Node controller,
@@ -274,7 +272,7 @@ public class MockAdminClient extends AdminClient {
         this.maxSupportedFeatureLevels = new 
HashMap<>(maxSupportedFeatureLevels);
     }
 
-    public synchronized void controller(Node controller) {
+    public final synchronized void controller(Node controller) {
         if (!brokers.contains(controller))
             throw new IllegalArgumentException("The controller node must be in 
the list of brokers");
         this.controller = controller;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 3667af0f725..9bdc7c38c44 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -61,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * with the configured security protocol.
  *
  */
-public class NioEchoServer extends Thread {
+public final class NioEchoServer extends Thread {
     private static final Logger LOG = 
LoggerFactory.getLogger(NioEchoServer.class);
 
     public enum MetricType {
@@ -103,7 +103,6 @@ public class NioEchoServer extends Thread {
                 new DelegationTokenCache(ScramMechanism.mechanismNames()));
     }
 
-    @SuppressWarnings("this-escape")
     public NioEchoServer(ListenerName listenerName, SecurityProtocol 
securityProtocol, AbstractConfig config,
             String serverHost, ChannelBuilder channelBuilder, CredentialCache 
credentialCache,
             int failedAuthenticationDelayMs, Time time, DelegationTokenCache 
tokenCache) throws Exception {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java 
b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
index 8fa28e9446e..9a77a042db1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -23,9 +23,8 @@ import java.net.Socket;
 /**
  * test helper class that will connect to a given server address, write out 
the given payload and disconnect
  */
-public class PlaintextSender extends Thread {
+public final class PlaintextSender extends Thread {
 
-    @SuppressWarnings("this-escape")
     public PlaintextSender(final InetSocketAddress serverAddress, final byte[] 
payload) {
         super(() -> {
             try (Socket connection = new Socket(serverAddress.getAddress(), 
serverAddress.getPort());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
index 353a503c94a..68b981f813c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -27,14 +27,13 @@ import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
-public class SslSender extends Thread {
+public final class SslSender extends Thread {
 
     private final String tlsProtocol;
     private final InetSocketAddress serverAddress;
     private final byte[] payload;
     private final CountDownLatch handshaked = new CountDownLatch(1);
 
-    @SuppressWarnings("this-escape")
     public SslSender(String tlsProtocol, InetSocketAddress serverAddress, 
byte[] payload) {
         this.tlsProtocol = tlsProtocol;
         this.serverAddress = serverAddress;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
index 75c1d6c0e7a..232d4d7327b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
@@ -34,7 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-@SuppressWarnings("this-escape")
 public class DefaultSslEngineFactoryTest {
 
     /*
@@ -196,7 +195,7 @@ public class DefaultSslEngineFactoryTest {
 
     private static final Password KEY_PASSWORD = new Password("key-password");
 
-    private DefaultSslEngineFactory factory = sslEngineFactory();
+    private DefaultSslEngineFactory factory;
     Map<String, Object> configs = new HashMap<>();
 
     @BeforeEach
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
index f3ad9ba50ba..354543bcdd5 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.security.ssl.mock;
 
 import java.security.Provider;
 
-public class TestProvider extends Provider {
+public final class TestProvider extends Provider {
 
     private static final String KEY_MANAGER_FACTORY = 
String.format("KeyManagerFactory.%s", TestKeyManagerFactory.ALGORITHM);
     private static final String TRUST_MANAGER_FACTORY = 
String.format("TrustManagerFactory.%s", TestTrustManagerFactory.ALGORITHM);
@@ -27,8 +27,7 @@ public class TestProvider extends Provider {
         this("TestProvider", "0.1", "provider for test cases");
     }
 
-    @SuppressWarnings("this-escape")
-    protected TestProvider(String name, String version, String info) {
+    private TestProvider(String name, String version, String info) {
         super(name, version, info);
         super.put(KEY_MANAGER_FACTORY, TestKeyManagerFactory.class.getName());
         super.put(TRUST_MANAGER_FACTORY, 
TestTrustManagerFactory.class.getName());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java 
b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index 0021caff173..882318ec003 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 
-public class MockScheduler implements Scheduler, MockTime.Listener {
+public final class MockScheduler implements Scheduler, MockTime.Listener {
     private static final Logger log = 
LoggerFactory.getLogger(MockScheduler.class);
 
     /**
@@ -42,7 +42,6 @@ public class MockScheduler implements Scheduler, 
MockTime.Listener {
      */
     private final TreeMap<Long, List<KafkaFutureImpl<Long>>> waiters = new 
TreeMap<>();
 
-    @SuppressWarnings("this-escape")
     public MockScheduler(MockTime time) {
         this.time = time;
         time.addListener(this);
diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
index 6f11717d500..f02d54ac263 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
@@ -28,7 +28,7 @@ import java.util.Map;
 /**
  * Configuration options for {@link JsonConverter} instances.
  */
-public class JsonConverterConfig extends ConverterConfig {
+public final class JsonConverterConfig extends ConverterConfig {
 
     public static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
     public static final boolean SCHEMAS_ENABLE_DEFAULT = true;
@@ -87,7 +87,6 @@ public class JsonConverterConfig extends ConverterConfig {
     private final DecimalFormat decimalFormat;
     private final boolean replaceNullWithDefault;
 
-    @SuppressWarnings("this-escape")
     public JsonConverterConfig(Map<String, ?> props) {
         super(CONFIG, props);
         this.schemasEnabled = getBoolean(SCHEMAS_ENABLE_CONFIG);
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 014d9767288..aba62cf8464 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -59,7 +59,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidStrin
  *      --->%---
  * </pre>
  */
-public class MirrorMakerConfig extends AbstractConfig {
+public final class MirrorMakerConfig extends AbstractConfig {
 
     public static final String CLUSTERS_CONFIG = "clusters";
     private static final String CLUSTERS_DOC = "List of cluster aliases.";
@@ -89,7 +89,6 @@ public class MirrorMakerConfig extends AbstractConfig {
 
     private final Map<String, String> rawProperties;
 
-    @SuppressWarnings("this-escape")
     public MirrorMakerConfig(Map<String, String> props) {
         super(config(), props, true);
         plugins = new Plugins(originalsStrings());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index a707b5a97a0..bc797563b10 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -49,11 +49,11 @@ import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.DEF
 import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
 import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
 
-public class SourceConnectorConfig extends ConnectorConfig {
+public final class SourceConnectorConfig extends ConnectorConfig {
 
     private static final Logger log = 
LoggerFactory.getLogger(SourceConnectorConfig.class);
 
-    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+    static final String TOPIC_CREATION_GROUP = "Topic Creation";
 
     public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
 
@@ -62,7 +62,7 @@ public class SourceConnectorConfig extends ConnectorConfig {
             + "created by source connectors";
     private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic 
Creation Groups";
 
-    protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once 
Support";
+    static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once Support";
 
     public enum ExactlyOnceSupportLevel {
         REQUESTED,
@@ -103,7 +103,7 @@ public class SourceConnectorConfig extends ConnectorConfig {
             + TRANSACTION_BOUNDARY_CONFIG + " is specified.";
     private static final String TRANSACTION_BOUNDARY_INTERVAL_DISPLAY = 
"Transaction boundary interval";
 
-    protected static final String OFFSETS_TOPIC_GROUP = "offsets.topic";
+    static final String OFFSETS_TOPIC_GROUP = "offsets.topic";
 
     public static final String OFFSETS_TOPIC_CONFIG = "offsets.storage.topic";
     private static final String OFFSETS_TOPIC_DOC = "The name of a separate 
offsets topic to use for this connector. "
@@ -248,7 +248,6 @@ public class SourceConnectorConfig extends ConnectorConfig {
         return newDef;
     }
 
-    @SuppressWarnings("this-escape")
     public SourceConnectorConfig(Plugins plugins, Map<String, String> props, 
boolean createTopics) {
         super(plugins, configDef(), props);
         if (createTopics && props.entrySet().stream().anyMatch(e -> 
e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0a44028a30d..591e9816a7a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -134,14 +134,14 @@ import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON
  * Since each task has a dedicated thread, this is mainly just a container for 
them.
  * </p>
  */
-public class Worker {
+public final class Worker {
 
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);
     public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
 
     private static final Logger log = LoggerFactory.getLogger(Worker.class);
 
-    protected Herder herder;
+    Herder herder;
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
@@ -173,7 +173,6 @@ public class Worker {
         this(workerId, time, plugins, config, globalOffsetBackingStore, 
Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy, 
Admin::create);
     }
 
-    @SuppressWarnings("this-escape")
     Worker(
             String workerId,
             Time time,
@@ -222,7 +221,7 @@ public class Worker {
         return workerConfigTransformer;
     }
 
-    protected Herder herder() {
+    Herder herder() {
         return herder;
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 0c7d70f45cc..9150000223b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -57,7 +57,7 @@ import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
 /**
  * Provides configuration for Kafka Connect workers running in distributed 
mode.
  */
-public class DistributedConfig extends WorkerConfig {
+public final class DistributedConfig extends WorkerConfig {
 
     private static final Logger log = 
LoggerFactory.getLogger(DistributedConfig.class);
 
@@ -588,7 +588,6 @@ public class DistributedConfig extends WorkerConfig {
     }
 
     // Visible for testing
-    @SuppressWarnings("this-escape")
     DistributedConfig(Crypto crypto, Map<String, String> props) {
         super(config(crypto), props);
         this.crypto = crypto;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 56d6cafe25f..19b539c3405 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -69,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * Single process, in-memory "herder". Useful for a standalone Kafka Connect 
process.
  */
-public class StandaloneHerder extends AbstractHerder {
+public final class StandaloneHerder extends AbstractHerder {
     private static final Logger log = 
LoggerFactory.getLogger(StandaloneHerder.class);
 
     private final AtomicLong requestSeqNum = new AtomicLong();
@@ -92,7 +92,6 @@ public class StandaloneHerder extends AbstractHerder {
     }
 
     // visible for testing
-    @SuppressWarnings("this-escape")
     StandaloneHerder(Worker worker,
                      String workerId,
                      String kafkaClusterId,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 16ccf22f22e..63b33a792cf 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -192,7 +192,7 @@ import static 
org.apache.kafka.connect.util.ConnectUtils.className;
  * rebalance must be deferred.
  * </p>
  */
-public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore 
implements ConfigBackingStore {
+public final class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore 
implements ConfigBackingStore {
     private static final Logger log = 
LoggerFactory.getLogger(KafkaConfigBackingStore.class);
 
     public static final String TARGET_STATE_PREFIX = "target-state-";
@@ -339,7 +339,6 @@ public class KafkaConfigBackingStore extends 
KafkaTopicBasedBackingStore impleme
         this(converter, config, configTransformer, adminSupplier, 
clientIdBase, Time.SYSTEM);
     }
 
-    @SuppressWarnings("this-escape")
     KafkaConfigBackingStore(Converter converter, DistributedConfig config, 
WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, 
String clientIdBase, Time time) {
         this.lock = new Object();
         this.started = false;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index e3872628438..b65e297bb04 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -36,7 +36,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Type.INT;
  * Group configuration related parameters and supporting methods like 
validation, etc. are
  * defined in this class.
  */
-public class GroupConfig extends AbstractConfig {
+public final class GroupConfig extends AbstractConfig {
 
     public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
 
@@ -90,7 +90,6 @@ public class GroupConfig extends AbstractConfig {
             MEDIUM,
             ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC);
 
-    @SuppressWarnings("this-escape")
     public GroupConfig(Map<?, ?> props) {
         super(CONFIG, props, false);
         this.consumerSessionTimeoutMs = 
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java 
b/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
index 120611c5c03..f54f9c073dc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java
@@ -23,12 +23,11 @@ import java.util.PriorityQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class MockExpirationService implements ExpirationService, 
MockTime.Listener {
+public final class MockExpirationService implements ExpirationService, 
MockTime.Listener {
     private final AtomicLong idGenerator = new AtomicLong(0);
     private final MockTime time;
     private final PriorityQueue<ExpirationFuture<?>> queue = new 
PriorityQueue<>();
 
-    @SuppressWarnings("this-escape")
     public MockExpirationService(MockTime time) {
         this.time = time;
         time.addListener(this);
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
index 2a71dce2e05..a84325e6d01 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
@@ -21,9 +21,8 @@ package org.apache.kafka.server.fault;
 /**
  * An exception thrown by a fault handler.
  */
-public class FaultHandlerException extends RuntimeException {
+public final class FaultHandlerException extends RuntimeException {
 
-    @SuppressWarnings("this-escape")
     public FaultHandlerException(String failureMessage, Throwable cause) {
         super(failureMessage, cause);
         // If a cause exception was provided, set our the stack trace its 
stack trace. This is
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
index 02131ecb27d..7a84a2755f7 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java
@@ -16,14 +16,13 @@
  */
 package org.apache.kafka.server.util.timer;
 
-public class TimerTaskEntry {
+public final class TimerTaskEntry {
     public final TimerTask timerTask;
     public final long expirationMs;
     volatile TimerTaskList list;
     TimerTaskEntry next;
     TimerTaskEntry prev;
 
-    @SuppressWarnings("this-escape")
     public TimerTaskEntry(
         TimerTask timerTask,
         long expirationMs
diff --git 
a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java 
b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
index c4d7298a22d..d26e5dbcd62 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
  * <br>
  * This class requires external synchronization.
  */
-public class TimelineInteger implements Revertable {
+public final class TimelineInteger implements Revertable {
     public static final int INIT = 0;
 
     static class IntegerContainer implements Delta {
@@ -48,7 +48,6 @@ public class TimelineInteger implements Revertable {
     private final SnapshotRegistry snapshotRegistry;
     private int value;
 
-    @SuppressWarnings("this-escape")
     public TimelineInteger(SnapshotRegistry snapshotRegistry) {
         this.snapshotRegistry = snapshotRegistry;
         this.value = INIT;
diff --git 
a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java 
b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
index b882d6455f1..a51d832482b 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
  * <br>
  * This class requires external synchronization.
  */
-public class TimelineLong implements Revertable {
+public final class TimelineLong implements Revertable {
     public static final long INIT = 0;
 
     static class LongContainer implements Delta {
@@ -48,7 +48,6 @@ public class TimelineLong implements Revertable {
     private final SnapshotRegistry snapshotRegistry;
     private long value;
 
-    @SuppressWarnings("this-escape")
     public TimelineLong(SnapshotRegistry snapshotRegistry) {
         this.snapshotRegistry = snapshotRegistry;
         this.value = INIT;
diff --git 
a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java 
b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
index f41343c9b3b..d5cf03d2364 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
@@ -26,7 +26,7 @@ import java.util.Objects;
  * <br>
  * This class requires external synchronization.
  */
-public class TimelineObject<T> implements Revertable {
+public final class TimelineObject<T> implements Revertable {
     static class ObjectContainer<T> implements Delta {
         private T value;
 
@@ -52,7 +52,6 @@ public class TimelineObject<T> implements Revertable {
     private final T initialValue;
     private T value;
 
-    @SuppressWarnings("this-escape")
     public TimelineObject(SnapshotRegistry snapshotRegistry, T initialValue) {
         Objects.requireNonNull(initialValue);
         this.snapshotRegistry = snapshotRegistry;
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java 
b/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
index c8449c15da8..b4d37dc72cc 100644
--- a/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
+++ b/server-common/src/test/java/org/apache/kafka/server/util/MockTime.java
@@ -23,14 +23,13 @@ package org.apache.kafka.server.util;
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-public class MockTime extends org.apache.kafka.common.utils.MockTime {
+public final class MockTime extends org.apache.kafka.common.utils.MockTime {
     public final MockScheduler scheduler;
 
     public MockTime() {
         this(System.currentTimeMillis(), System.nanoTime());
     }
 
-    @SuppressWarnings("this-escape")
     public MockTime(long currentTimeMs, long currentHiResTimeNs) {
         super(0L, currentTimeMs, currentHiResTimeNs);
         scheduler = new MockScheduler(this);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 8961957306d..120c5925fee 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -51,7 +51,7 @@ import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UND
  * Hence, it is instantiater's responsibility to ensure restoring the cache to 
the correct state after instantiating
  * this class from checkpoint (which might contain stale epoch entries right 
after instantiation).
  */
-public class LeaderEpochFileCache {
+public final class LeaderEpochFileCache {
     private final TopicPartition topicPartition;
     private final LeaderEpochCheckpointFile checkpoint;
     private final Scheduler scheduler;
@@ -66,7 +66,6 @@ public class LeaderEpochFileCache {
      * @param checkpoint     the checkpoint file
      * @param scheduler      the scheduler to use for async I/O operations
      */
-    @SuppressWarnings("this-escape")
     public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpointFile checkpoint, Scheduler scheduler) {
         this.checkpoint = checkpoint;
         this.topicPartition = topicPartition;
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index 09bde2a88cb..fe872a5cd7f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -51,7 +51,7 @@ import java.util.Optional;
  * All external APIs translate from relative offsets to full offsets, so users 
of this class do not interact with the internal
  * storage format.
  */
-public class OffsetIndex extends AbstractIndex {
+public final class OffsetIndex extends AbstractIndex {
     private static final Logger log = 
LoggerFactory.getLogger(OffsetIndex.class);
     private static final int ENTRY_SIZE = 8;
 
@@ -66,7 +66,6 @@ public class OffsetIndex extends AbstractIndex {
         this(file, baseOffset, maxIndexSize, true);
     }
 
-    @SuppressWarnings("this-escape")
     public OffsetIndex(File file, long baseOffset, int maxIndexSize, boolean 
writable) throws IOException {
         super(file, baseOffset, maxIndexSize, writable);
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 1163e819d1a..849ac556155 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -33,11 +33,10 @@ import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
 
-public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteStorageThreadPool.class);
     private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
 
-    @SuppressWarnings("this-escape")
     public RemoteStorageThreadPool(String threadNamePrefix,
                                    int numThreads,
                                    int maxPendingTasks) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index 5b893e3c9de..2e62cdcccee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -71,7 +71,7 @@ import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSi
  * topology builders via the {@link 
org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) 
StreamsBuilder(TopologyConfig)} method.
  */
 @SuppressWarnings("deprecation")
-public class TopologyConfig extends AbstractConfig {
+public final class TopologyConfig extends AbstractConfig {
     private static final ConfigDef CONFIG;
     static {
         CONFIG = new ConfigDef()
@@ -151,7 +151,6 @@ public class TopologyConfig extends AbstractConfig {
         this(null, globalAppConfigs, new Properties());
     }
 
-    @SuppressWarnings("this-escape")
     public TopologyConfig(final String topologyName, final StreamsConfig 
globalAppConfigs, final Properties topologyOverrides) {
         super(CONFIG, topologyOverrides, false);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 07a9e918ff9..cf6ce76f8d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -28,7 +28,7 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 
-public class MaterializedInternal<K, V, S extends StateStore> extends 
Materialized<K, V, S> {
+public final class MaterializedInternal<K, V, S extends StateStore> extends 
Materialized<K, V, S> {
 
     private final boolean queryable;
 
@@ -36,7 +36,6 @@ public class MaterializedInternal<K, V, S extends StateStore> 
extends Materializ
         this(materialized, null, null);
     }
 
-    @SuppressWarnings("this-escape")
     public MaterializedInternal(final Materialized<K, V, S> materialized,
                                 final InternalNameProvider nameProvider,
                                 final String generatedStorePrefix) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dc946ba3b6c..e526f89d78e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -49,7 +49,7 @@ import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDur
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
 
-public class ProcessorContextImpl extends AbstractProcessorContext<Object, 
Object> implements RecordCollector.Supplier {
+public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
     // the below are null for standby tasks
     private StreamTask streamTask;
     private RecordCollector collector;
@@ -59,7 +59,6 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext<Object, Objec
 
     final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new 
HashMap<>();
 
-    @SuppressWarnings("this-escape")
     public ProcessorContextImpl(final TaskId id,
                                 final StreamsConfig config,
                                 final ProcessorStateManager stateMgr,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index ec2b6920ce5..51ec80d05ac 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -51,7 +51,7 @@ import java.util.stream.Collectors;
  * 3. Neither 1 or 2, i.e. it stays idle. This is possible if we do not have 
enough executors or because those tasks
  *    are not processable (e.g. because no records fetched) yet.
  */
-public class DefaultTaskManager implements TaskManager {
+public final class DefaultTaskManager implements TaskManager {
 
     private final Time time;
     private final Logger log;
@@ -73,7 +73,6 @@ public class DefaultTaskManager implements TaskManager {
         }
     }
 
-    @SuppressWarnings("this-escape")
     public DefaultTaskManager(final Time time,
                               final String clientId,
                               final TasksRegistry tasks,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e5a9c8f8abc..0e85e392e31 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -94,7 +94,6 @@ import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
-@SuppressWarnings("this-escape")
 public class StoreChangelogReaderTest {
 
     @Mock
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 35c1a5e2cc8..54ee48fd4bb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -72,7 +72,6 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("this-escape")
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class MeteredTimestampedKeyValueStoreTest {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index 2f7efe44aad..d785499dff0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("this-escape")
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class TimestampedWindowStoreBuilderTest {
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java 
b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 0bc1457a029..345a5824a20 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -30,7 +30,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
-public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
+public final class MockRestoreConsumer<K, V> extends MockConsumer<byte[], 
byte[]> {
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
 
@@ -41,7 +41,6 @@ public class MockRestoreConsumer<K, V> extends 
MockConsumer<byte[], byte[]> {
 
     private final ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new 
ArrayList<>();
 
-    @SuppressWarnings("this-escape")
     public MockRestoreConsumer(final Serializer<K> keySerializer, final 
Serializer<V> valueSerializer) {
         super(OffsetResetStrategy.EARLIEST);
 
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
index 64332e13f5b..8f6556842f5 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
@@ -108,7 +108,7 @@ import java.util.Optional;
  *   * The average throughput was 500 messages/second, with a window of 100ms 
and a deviation of 50 messages/second.
  */
 
-public class ConfigurableProducerSpec extends TaskSpec {
+public final class ConfigurableProducerSpec extends TaskSpec {
     private final String producerNode;
     private final String bootstrapServers;
     private final Optional<FlushGenerator> flushGenerator;
@@ -121,7 +121,6 @@ public class ConfigurableProducerSpec extends TaskSpec {
     private final TopicsSpec activeTopic;
     private final int activePartition;
 
-    @SuppressWarnings("this-escape")
     @JsonCreator
     public ConfigurableProducerSpec(@JsonProperty("startMs") long startMs,
                                     @JsonProperty("durationMs") long 
durationMs,
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
index aa6cf7e5dae..e5dfe629ec5 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -34,7 +34,7 @@ import java.util.TreeSet;
  * The specification for a task which connects and disconnects many times a
  * second to stress the broker.
  */
-public class ConnectionStressSpec extends TaskSpec {
+public final class ConnectionStressSpec extends TaskSpec {
     private final List<String> clientNodes;
     private final String bootstrapServers;
     private final Map<String, String> commonClientConf;
@@ -47,7 +47,6 @@ public class ConnectionStressSpec extends TaskSpec {
         FETCH_METADATA
     }
 
-    @SuppressWarnings("this-escape")
     @JsonCreator
     public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
             @JsonProperty("durationMs") long durationMs,
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 3813b418fba..24869e3e966 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -89,7 +89,7 @@ import java.util.Set;
  *    }
  * }
  */
-public class ConsumeBenchSpec extends TaskSpec {
+public final class ConsumeBenchSpec extends TaskSpec {
 
     private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = 
"^[^:]+(:[\\d]+|[^:]*)$";
     private final String consumerNode;
@@ -104,7 +104,6 @@ public class ConsumeBenchSpec extends TaskSpec {
     private final int threadsPerWorker;
     private final Optional<RecordProcessor> recordProcessor;
 
-    @SuppressWarnings("this-escape")
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
                             @JsonProperty("durationMs") long durationMs,
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 653fa9cddb6..621333d74f9 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -59,7 +59,7 @@ import java.util.Optional;
  *   }
  * }
  */
-public class ProduceBenchSpec extends TaskSpec {
+public final class ProduceBenchSpec extends TaskSpec {
     private final String producerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -75,7 +75,6 @@ public class ProduceBenchSpec extends TaskSpec {
     private final boolean useConfiguredPartitioner;
     private final boolean skipFlush;
 
-    @SuppressWarnings("this-escape")
     @JsonCreator
     public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
                          @JsonProperty("durationMs") long durationMs,
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 1000fb32ead..223b0c0e20c 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -31,7 +31,7 @@ import java.util.Map;
  * The specification for a workload that sends messages to a broker and then
  * reads them back.
  */
-public class RoundTripWorkloadSpec extends TaskSpec {
+public final class RoundTripWorkloadSpec extends TaskSpec {
     private final String clientNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -43,7 +43,6 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     private final Map<String, String> consumerConf;
     private final Map<String, String> adminClientConf;
 
-    @SuppressWarnings("this-escape")
     @JsonCreator
     public RoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
              @JsonProperty("durationMs") long durationMs,
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
index 516de304f2f..ccd94784f35 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java
@@ -65,7 +65,7 @@ import java.util.Map;
  *   }
  *  }
  */
-public class SustainedConnectionSpec extends TaskSpec {
+public final class SustainedConnectionSpec extends TaskSpec {
     private final String clientNode;
     private final String bootstrapServers;
     private final Map<String, String> producerConf;
@@ -81,7 +81,6 @@ public class SustainedConnectionSpec extends TaskSpec {
     private final int numThreads;
     private final int refreshRateMs;
 
-    @SuppressWarnings("this-escape")
     @JsonCreator
     public SustainedConnectionSpec(
             @JsonProperty("startMs") long startMs,

Reply via email to