This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d4c661c0174 MINOR: cleanup warnings in Kafka Streams code base (#14549)
d4c661c0174 is described below

commit d4c661c01745e5b98027cbcc51b5a06e8b41f405
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Oct 15 19:32:32 2023 -0700

    MINOR: cleanup warnings in Kafka Streams code base (#14549)
    
    Reviewers: Guozhang Wang <[email protected]>, A. Sophie Blee-Goldman 
<[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 40 ++++++++++++++--------
 .../org/apache/kafka/streams/KeyQueryMetadata.java |  6 ++--
 .../java/org/apache/kafka/streams/KeyValue.java    |  2 +-
 .../apache/kafka/streams/StoreQueryParameters.java |  2 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   |  8 ++---
 .../org/apache/kafka/streams/StreamsConfig.java    | 37 +++++++++-----------
 .../org/apache/kafka/streams/StreamsMetadata.java  |  4 +--
 .../org/apache/kafka/streams/StreamsMetrics.java   |  8 ++---
 .../java/org/apache/kafka/streams/Topology.java    |  2 +-
 .../org/apache/kafka/streams/TopologyConfig.java   |  2 +-
 .../apache/kafka/streams/TopologyDescription.java  | 17 ++++++++-
 .../streams/errors/TaskCorruptedException.java     | 10 +++---
 .../namedtopology/NamedTopologyBuilder.java        |  2 +-
 13 files changed, 81 insertions(+), 59 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index da998d5d983..dec8527e978 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -86,7 +86,6 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
@@ -1093,7 +1092,7 @@ public class KafkaStreams implements AutoCloseable {
      * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
      * cache size specified in configuration {@link 
StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
      *
-     * @param timeout The length of time to wait for the thread to shutdown
+     * @param timeout The length of time to wait for the thread to shut down
      * @throws org.apache.kafka.common.errors.TimeoutException if the thread 
does not stop in time
      * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
      *         no stream threads are alive
@@ -1148,20 +1147,32 @@ public class KafkaStreams implements AutoCloseable {
                             try {
                                 final long remainingTimeMs = timeoutMs - 
(time.milliseconds() - startMs);
                                 
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs,
 TimeUnit.MILLISECONDS);
-                            } catch (final 
java.util.concurrent.TimeoutException e) {
-                                log.error("Could not remove static member {} 
from consumer group {} due to a timeout: {}",
-                                          groupInstanceID.get(), 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
-                                throw new TimeoutException(e.getMessage(), e);
+                            } catch (final 
java.util.concurrent.TimeoutException exception) {
+                                log.error(
+                                    String.format(
+                                        "Could not remove static member %s 
from consumer group %s due to a timeout:",
+                                        groupInstanceID.get(),
+                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+                                    ),
+                                        exception
+                                );
+                                throw new 
TimeoutException(exception.getMessage(), exception);
                             } catch (final InterruptedException e) {
                                 Thread.currentThread().interrupt();
-                            } catch (final ExecutionException e) {
-                                log.error("Could not remove static member {} 
from consumer group {} due to: {}",
-                                          groupInstanceID.get(), 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
+                            } catch (final ExecutionException exception) {
+                                log.error(
+                                    String.format(
+                                        "Could not remove static member %s 
from consumer group %s due to:",
+                                        groupInstanceID.get(),
+                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+                                    ),
+                                        exception
+                                );
                                 throw new StreamsException(
                                         "Could not remove static member " + 
groupInstanceID.get()
                                             + " from consumer group " + 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
                                             + " for the following reason: ",
-                                        e.getCause()
+                                        exception.getCause()
                                 );
                             }
                         }
@@ -1180,7 +1191,7 @@ public class KafkaStreams implements AutoCloseable {
         return Optional.empty();
     }
 
-    /**
+    /*
      * Takes a snapshot and counts the number of stream threads which are not 
in PENDING_SHUTDOWN or DEAD
      *
      * note: iteration over SynchronizedList is not thread safe so it must be 
manually synchronized. However, we may
@@ -1723,7 +1734,7 @@ public class KafkaStreams implements AutoCloseable {
     /**
      *  This method pauses processing for the KafkaStreams instance.
      *
-     *  Paused topologies will only skip over a) processing, b) punctuation, 
and c) standby tasks.
+     *  <p>Paused topologies will only skip over a) processing, b) 
punctuation, and c) standby tasks.
      *  Notably, paused topologies will still poll Kafka consumers, and commit 
offsets.
      *  This method sets transient state that is not maintained or managed 
among instances.
      *  Note that pause() can be called before start() in order to start a 
KafkaStreams instance
@@ -1787,7 +1798,6 @@ public class KafkaStreams implements AutoCloseable {
      * @deprecated since 3.0 use {@link #metadataForLocalThreads()}
      */
     @Deprecated
-    @SuppressWarnings("deprecation")
     public Set<org.apache.kafka.streams.processor.ThreadMetadata> 
localThreadsMetadata() {
         return metadataForLocalThreads().stream().map(threadMetadata -> new 
org.apache.kafka.streams.processor.ThreadMetadata(
                 threadMetadata.threadName(),
@@ -1835,7 +1845,7 @@ public class KafkaStreams implements AutoCloseable {
      * values returned are just estimates and meant to be used for making soft 
decisions on whether the data in the store
      * partition is fresh enough for querying.
      *
-     * Note: Each invocation of this method issues a call to the Kafka 
brokers. Thus its advisable to limit the frequency
+     * <p>Note: Each invocation of this method issues a call to the Kafka 
brokers. Thus, it's advisable to limit the frequency
      * of invocation to once every few seconds.
      *
      * @return map of store names to another map of partition to {@link 
LagInfo}s
@@ -1885,7 +1895,7 @@ public class KafkaStreams implements AutoCloseable {
      * Run an interactive query against a state store.
      * <p>
      * This method allows callers outside of the Streams runtime to access the 
internal state of
-     * stateful processors. See 
https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * stateful processors. See <a 
href="https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html";>IQ
 docs</a>
      * for more information.
      * <p>
      * NOTICE: This functionality is {@link Evolving} and subject to change in 
minor versions.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java 
b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
index 9ca495214d6..9f4311922be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
@@ -53,7 +53,7 @@ public class KeyQueryMetadata {
      * Get the active Kafka Streams instance for given key.
      *
      * @return active instance's {@link HostInfo}
-     * @deprecated Use {@link #activeHost()} instead.
+     * @deprecated since 2.7.0; use {@link #activeHost()} instead.
      */
     @Deprecated
     public HostInfo getActiveHost() {
@@ -64,7 +64,7 @@ public class KeyQueryMetadata {
      * Get the Kafka Streams instances that host the key as standbys.
      *
      * @return set of standby {@link HostInfo} or a empty set, if no standbys 
are configured
-     * @deprecated Use {@link #standbyHosts()} instead.
+     * @deprecated since 2.7.0; use {@link #standbyHosts()} instead.
      */
     @Deprecated
     public Set<HostInfo> getStandbyHosts() {
@@ -75,7 +75,7 @@ public class KeyQueryMetadata {
      * Get the store partition corresponding to the key.
      *
      * @return store partition number
-     * @deprecated Use {@link #partition()} instead.
+     * @deprecated since 2.7.0; use {@link #partition()} instead.
      */
     @Deprecated
     public int getPartition() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java 
b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index b534d1101ab..d9d38c39378 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -71,7 +71,7 @@ public class KeyValue<K, V> {
             return false;
         }
 
-        final KeyValue other = (KeyValue) obj;
+        final KeyValue<?, ?> other = (KeyValue<?, ?>) obj;
         return Objects.equals(key, other.key) && Objects.equals(value, 
other.value);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java 
b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
index e0dd96f7d7e..b76f310d8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
@@ -105,7 +105,7 @@ public class StoreQueryParameters<T> {
         if (!(obj instanceof StoreQueryParameters)) {
             return false;
         }
-        final StoreQueryParameters storeQueryParameters = 
(StoreQueryParameters) obj;
+        final StoreQueryParameters<?> storeQueryParameters = 
(StoreQueryParameters<?>) obj;
         return Objects.equals(storeQueryParameters.partition, partition)
                 && Objects.equals(storeQueryParameters.staleStores, 
staleStores)
                 && Objects.equals(storeQueryParameters.storeName, storeName)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 1c2cde831fd..6746bd28f7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -87,12 +87,12 @@ public class StreamsBuilder {
      */
     @SuppressWarnings("this-escape")
     public StreamsBuilder(final TopologyConfig topologyConfigs) {
-        topology = getNewTopology(topologyConfigs);
+        topology = newTopology(topologyConfigs);
         internalTopologyBuilder = topology.internalTopologyBuilder;
         internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
     }
 
-    protected Topology getNewTopology(final TopologyConfig topologyConfigs) {
+    protected Topology newTopology(final TopologyConfig topologyConfigs) {
         return new Topology(topologyConfigs);
     }
 
@@ -416,7 +416,7 @@ public class StreamsBuilder {
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     *
+     * <p>
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local 
{@link KeyValueStore} configured with
@@ -467,7 +467,7 @@ public class StreamsBuilder {
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     *
+     * <p>
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local 
{@link KeyValueStore} configured with
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2b36a0820b4..320370503e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -157,6 +157,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
     private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000;
 
+    @SuppressWarnings("unused")
     public static final int DUMMY_THREAD_INDEX = 1;
     public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
 
@@ -240,14 +241,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String CLIENT_TAG_PREFIX = "client.tag.";
 
     /** {@code topology.optimization} */
-    private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
-        + " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
-        + "or a comma separated list of specific optimizations: "
-        + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + 
"
-        + "\"SINGLE_STORE_SELF_JOIN+\").";
-
-
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = 
"topology.optimization";
+    private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+            + " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+            + "or a comma separated list of specific optimizations: "
+            + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", 
\"+MERGE_REPARTITION_TOPICS+\" + "
+            + "\"SINGLE_STORE_SELF_JOIN+\").";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration 
telling Kafka "
         + "Streams if it should optimize the topology and what optimizations 
to apply. "
         + CONFIG_ERROR_MSG
@@ -1380,8 +1379,12 @@ public class StreamsConfig extends AbstractConfig {
     private void verifyEOSTransactionTimeoutCompatibility() {
         final long commitInterval = getLong(COMMIT_INTERVAL_MS_CONFIG);
         final String transactionTimeoutConfigKey = 
producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
-        final int transactionTimeout = 
originals().containsKey(transactionTimeoutConfigKey) ? (int) parseType(
-            transactionTimeoutConfigKey, 
originals().get(transactionTimeoutConfigKey), Type.INT) : 
DEFAULT_TRANSACTION_TIMEOUT;
+        final int transactionTimeout =
+                originals().containsKey(transactionTimeoutConfigKey) ?
+                    (int) Objects.requireNonNull(
+                        parseType(transactionTimeoutConfigKey, 
originals().get(transactionTimeoutConfigKey), Type.INT),
+                        "Could not parse config `" + COMMIT_INTERVAL_MS_CONFIG 
+ "` because it's set to `null`") :
+                    DEFAULT_TRANSACTION_TIMEOUT;
 
         if (transactionTimeout < commitInterval) {
             throw new IllegalArgumentException(String.format("Transaction 
timeout %d was set lower than " +
@@ -1548,9 +1551,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // Get main consumer override configs
         final Map<String, Object> mainConsumerProps = 
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
-        for (final Map.Entry<String, Object> entry: 
mainConsumerProps.entrySet()) {
-            consumerProps.put(entry.getKey(), entry.getValue());
-        }
+        consumerProps.putAll(mainConsumerProps);
 
         // this is a hack to work around StreamsConfig constructor inside 
StreamsPartitionAssignor to avoid casting
         consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@@ -1622,9 +1623,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // Get restore consumer override configs
         final Map<String, Object> restoreConsumerProps = 
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
-        for (final Map.Entry<String, Object> entry: 
restoreConsumerProps.entrySet()) {
-            baseConsumerProps.put(entry.getKey(), entry.getValue());
-        }
+        baseConsumerProps.putAll(restoreConsumerProps);
 
         // no need to set group id for a restore consumer
         baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -1657,9 +1656,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // Get global consumer override configs
         final Map<String, Object> globalConsumerProps = 
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
-        for (final Map.Entry<String, Object> entry: 
globalConsumerProps.entrySet()) {
-            baseConsumerProps.put(entry.getKey(), entry.getValue());
-        }
+        baseConsumerProps.putAll(globalConsumerProps);
 
         // no need to set group id for a global consumer
         baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -1806,7 +1803,7 @@ public class StreamsConfig extends AbstractConfig {
      * @return an configured instance of key Serde class
      */
     @SuppressWarnings("WeakerAccess")
-    public Serde defaultKeySerde() {
+    public Serde<?> defaultKeySerde() {
         final Object keySerdeConfigSetting = 
get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
         if (keySerdeConfigSetting ==  null) {
             throw new ConfigException("Please specify a key serde or set one 
through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
@@ -1828,7 +1825,7 @@ public class StreamsConfig extends AbstractConfig {
      * @return an configured instance of value Serde class
      */
     @SuppressWarnings("WeakerAccess")
-    public Serde defaultValueSerde() {
+    public Serde<?> defaultValueSerde() {
         final Object valueSerdeConfigSetting = 
get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
         if (valueSerdeConfigSetting == null) {
             throw new ConfigException("Please specify a value serde or set one 
through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
index 11c4941b3d6..4af0f614a1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
@@ -65,7 +65,7 @@ public interface StreamsMetadata {
     /**
      * Host where the Streams client runs. 
      *
-     * This method is equivalent to {@code StreamsMetadata.hostInfo().host();}
+     * <p>This method is equivalent to {@code 
StreamsMetadata.hostInfo().host();}
      *
      * @return the host where the Streams client runs
      */
@@ -74,7 +74,7 @@ public interface StreamsMetadata {
     /**
      * Port on which the Streams client listens.
      * 
-     * This method is equivalent to {@code StreamsMetadata.hostInfo().port();}
+     * <p>This method is equivalent to {@code 
StreamsMetadata.hostInfo().port();}
      *
      * @return the port on which Streams client listens
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index cbf21698173..7e142fa4dc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -46,10 +46,10 @@ public interface StreamsMetrics {
      * of the operation, and hence the rate / count metrics will be updated 
accordingly; and the recorded latency value
      * will be used to update the average / max latency as well.
      *
-     * Note that you can add more metrics to this sensor after you created it, 
which can then be updated upon
+     * <p>Note that you can add more metrics to this sensor after you created 
it, which can then be updated upon
      * {@link Sensor#record(double)} calls.
      *
-     * The added sensor and its metrics can be removed with {@link 
#removeSensor(Sensor) removeSensor()}.
+     * <p>The added sensor and its metrics can be removed with {@link 
#removeSensor(Sensor) removeSensor()}.
      *
      * @param scopeName          name of the scope, which will be used as part 
of the metric type, e.g.: "stream-[scope]-metrics".
      * @param entityName         name of the entity, which will be used as 
part of the metric tags, e.g.: "[scope]-id" = "[entity]".
@@ -76,10 +76,10 @@ public interface StreamsMetrics {
      * Whenever a user records this sensor via {@link Sensor#record(double)} 
etc,
      * it will be counted as one invocation of the operation, and hence the 
rate / count metrics will be updated accordingly.
      *
-     * Note that you can add more metrics to this sensor after you created it, 
which can then be updated upon
+     * <p>Note that you can add more metrics to this sensor after you created 
it, which can then be updated upon
      * {@link Sensor#record(double)} calls.
      *
-     * The added sensor and its metrics can be removed with {@link 
#removeSensor(Sensor) removeSensor()}.
+     * <p>The added sensor and its metrics can be removed with {@link 
#removeSensor(Sensor) removeSensor()}.
      *
      * @param scopeName          name of the scope, which will be used as part 
of the metrics type, e.g.: "stream-[scope]-metrics".
      * @param entityName         name of the entity, which will be used as 
part of the metric tags, e.g.: "[scope]-id" = "[entity]".
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 314d2190f66..1a3089b29de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -716,7 +716,7 @@ public class Topology {
         internalTopologyBuilder.addProcessor(name, supplier, parentNames);
         final Set<StoreBuilder<?>> stores = supplier.stores();
         if (stores != null) {
-            for (final StoreBuilder storeBuilder : stores) {
+            for (final StoreBuilder<?> storeBuilder : stores) {
                 internalTopologyBuilder.addStateStore(storeBuilder, name);
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index 83516197347..afaa1ed269e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -105,7 +105,7 @@ public class TopologyConfig extends AbstractConfig {
                 Importance.LOW,
                 DEFAULT_DSL_STORE_DOC);
     }
-    private final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
+    private final static Logger log = 
LoggerFactory.getLogger(TopologyConfig.class);
 
     public final String topologyName;
     public final boolean eosEnabled;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 242b9e3e009..a2ff98dcb9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -51,12 +51,14 @@ public interface TopologyDescription {
          * Internally assigned unique ID.
          * @return the ID of the sub-topology
          */
+        @SuppressWarnings("unused")
         int id();
 
         /**
          * All nodes of this sub-topology.
          * @return set of all nodes within the sub-topology
          */
+        @SuppressWarnings("unused")
         Set<Node> nodes();
     }
 
@@ -75,14 +77,17 @@ public interface TopologyDescription {
          * The source node reading from a "global" topic.
          * @return the "global" source node
          */
+        @SuppressWarnings("unused")
         Source source();
 
         /**
          * The processor node maintaining the global store.
          * @return the "global" processor node
          */
+        @SuppressWarnings("unused")
         Processor processor();
 
+        @SuppressWarnings("unused")
         int id();
     }
 
@@ -94,6 +99,7 @@ public interface TopologyDescription {
          * The name of the node. Will never be {@code null}.
          * @return the name of the node
          */
+        @SuppressWarnings("unused")
         String name();
         /**
          * The predecessors of this node within a sub-topology.
@@ -101,6 +107,7 @@ public interface TopologyDescription {
          * Will never be {@code null}.
          * @return set of all predecessors
          */
+        @SuppressWarnings("unused")
         Set<Node> predecessors();
         /**
          * The successor of this node within a sub-topology.
@@ -108,6 +115,7 @@ public interface TopologyDescription {
          * Will never be {@code null}.
          * @return set of all successor
          */
+        @SuppressWarnings("unused")
         Set<Node> successors();
     }
 
@@ -121,12 +129,14 @@ public interface TopologyDescription {
          * The topic names this source node is reading from.
          * @return a set of topic names
          */
+        @SuppressWarnings("unused")
         Set<String> topicSet();
 
         /**
          * The pattern used to match topic names that is reading from.
          * @return the pattern used to match topic names
          */
+        @SuppressWarnings("unused")
         Pattern topicPattern();
     }
 
@@ -138,6 +148,7 @@ public interface TopologyDescription {
          * The names of all connected stores.
          * @return set of store names
          */
+        @SuppressWarnings("unused")
         Set<String> stores();
     }
 
@@ -150,6 +161,7 @@ public interface TopologyDescription {
          * Could be {@code null} if the topic name can only be dynamically 
determined based on {@link TopicNameExtractor}
          * @return a topic name
          */
+        @SuppressWarnings("unused")
         String topic();
 
         /**
@@ -157,19 +169,22 @@ public interface TopologyDescription {
          * Could be {@code null} if the topic name is not dynamically 
determined.
          * @return the {@link TopicNameExtractor} class used get the topic name
          */
-        TopicNameExtractor topicNameExtractor();
+        @SuppressWarnings("unused")
+        TopicNameExtractor<?, ?> topicNameExtractor();
     }
 
     /**
      * All sub-topologies of the represented topology.
      * @return set of all sub-topologies
      */
+    @SuppressWarnings("unused")
     Set<Subtopology> subtopologies();
 
     /**
      * All global stores of the represented topology.
      * @return set of all global stores
      */
+    @SuppressWarnings("unused")
     Set<GlobalStore> globalStores();
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 854f470e53c..0f6c50579d3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -22,12 +22,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import java.util.Set;
 
 /**
- * Indicates a specific task is corrupted and need to be re-initialized. It 
can be thrown when
+ * Indicates a specific task is corrupted and need to be re-initialized. It 
can be thrown when:
  *
- * 1) Under EOS, if the checkpoint file does not contain offsets for 
corresponding store's changelogs, meaning
- *    previously it was not close cleanly;
- * 2) Out-of-range exception thrown during restoration, meaning that the 
changelog has been modified and we re-bootstrap
- *    the store.
+ * <ol>
+ *   <li>Under EOS, if the checkpoint file does not contain offsets for 
corresponding store's changelogs, meaning previously it was not close 
cleanly.</li>
+ *   <li>Out-of-range exception thrown during restoration, meaning that the 
changelog has been modified and we re-bootstrap the store.</li>
+ * </ol>
  */
 public class TaskCorruptedException extends StreamsException {
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
index 42af69e25f5..8e04a8e8c03 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
@@ -37,7 +37,7 @@ public class NamedTopologyBuilder extends StreamsBuilder {
     }
 
     @Override
-    protected NamedTopology getNewTopology(final TopologyConfig 
topologyConfigs) {
+    protected NamedTopology newTopology(final TopologyConfig topologyConfigs) {
         return new NamedTopology(new InternalTopologyBuilder(topologyConfigs));
     }
 }

Reply via email to