This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d4c661c0174 MINOR: cleanup warnings in Kafka Streams code base (#14549)
d4c661c0174 is described below
commit d4c661c01745e5b98027cbcc51b5a06e8b41f405
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Oct 15 19:32:32 2023 -0700
MINOR: cleanup warnings in Kafka Streams code base (#14549)
Reviewers: Guozhang Wang <[email protected]>, A. Sophie Blee-Goldman
<[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 40 ++++++++++++++--------
.../org/apache/kafka/streams/KeyQueryMetadata.java | 6 ++--
.../java/org/apache/kafka/streams/KeyValue.java | 2 +-
.../apache/kafka/streams/StoreQueryParameters.java | 2 +-
.../org/apache/kafka/streams/StreamsBuilder.java | 8 ++---
.../org/apache/kafka/streams/StreamsConfig.java | 37 +++++++++-----------
.../org/apache/kafka/streams/StreamsMetadata.java | 4 +--
.../org/apache/kafka/streams/StreamsMetrics.java | 8 ++---
.../java/org/apache/kafka/streams/Topology.java | 2 +-
.../org/apache/kafka/streams/TopologyConfig.java | 2 +-
.../apache/kafka/streams/TopologyDescription.java | 17 ++++++++-
.../streams/errors/TaskCorruptedException.java | 10 +++---
.../namedtopology/NamedTopologyBuilder.java | 2 +-
13 files changed, 81 insertions(+), 59 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index da998d5d983..dec8527e978 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -86,7 +86,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -1093,7 +1092,7 @@ public class KafkaStreams implements AutoCloseable {
* threads are adapted so that the sum of the cache sizes over all stream
threads equals the total
* cache size specified in configuration {@link
StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
*
- * @param timeout The length of time to wait for the thread to shutdown
+ * @param timeout The length of time to wait for the thread to shut down
* @throws org.apache.kafka.common.errors.TimeoutException if the thread
does not stop in time
* @return name of the removed stream thread or empty if a stream thread
could not be removed because
* no stream threads are alive
@@ -1148,20 +1147,32 @@ public class KafkaStreams implements AutoCloseable {
try {
final long remainingTimeMs = timeoutMs -
(time.milliseconds() - startMs);
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs,
TimeUnit.MILLISECONDS);
- } catch (final
java.util.concurrent.TimeoutException e) {
- log.error("Could not remove static member {}
from consumer group {} due to a timeout: {}",
- groupInstanceID.get(),
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
- throw new TimeoutException(e.getMessage(), e);
+ } catch (final
java.util.concurrent.TimeoutException exception) {
+ log.error(
+ String.format(
+ "Could not remove static member %s
from consumer group %s due to a timeout:",
+ groupInstanceID.get(),
+
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+ ),
+ exception
+ );
+ throw new
TimeoutException(exception.getMessage(), exception);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
- } catch (final ExecutionException e) {
- log.error("Could not remove static member {}
from consumer group {} due to: {}",
- groupInstanceID.get(),
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
+ } catch (final ExecutionException exception) {
+ log.error(
+ String.format(
+ "Could not remove static member %s
from consumer group %s due to:",
+ groupInstanceID.get(),
+
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+ ),
+ exception
+ );
throw new StreamsException(
"Could not remove static member " +
groupInstanceID.get()
+ " from consumer group " +
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+ " for the following reason: ",
- e.getCause()
+ exception.getCause()
);
}
}
@@ -1180,7 +1191,7 @@ public class KafkaStreams implements AutoCloseable {
return Optional.empty();
}
- /**
+ /*
* Takes a snapshot and counts the number of stream threads which are not
in PENDING_SHUTDOWN or DEAD
*
* note: iteration over SynchronizedList is not thread safe so it must be
manually synchronized. However, we may
@@ -1723,7 +1734,7 @@ public class KafkaStreams implements AutoCloseable {
/**
* This method pauses processing for the KafkaStreams instance.
*
- * Paused topologies will only skip over a) processing, b) punctuation,
and c) standby tasks.
+ * <p>Paused topologies will only skip over a) processing, b)
punctuation, and c) standby tasks.
* Notably, paused topologies will still poll Kafka consumers, and commit
offsets.
* This method sets transient state that is not maintained or managed
among instances.
* Note that pause() can be called before start() in order to start a
KafkaStreams instance
@@ -1787,7 +1798,6 @@ public class KafkaStreams implements AutoCloseable {
* @deprecated since 3.0 use {@link #metadataForLocalThreads()}
*/
@Deprecated
- @SuppressWarnings("deprecation")
public Set<org.apache.kafka.streams.processor.ThreadMetadata>
localThreadsMetadata() {
return metadataForLocalThreads().stream().map(threadMetadata -> new
org.apache.kafka.streams.processor.ThreadMetadata(
threadMetadata.threadName(),
@@ -1835,7 +1845,7 @@ public class KafkaStreams implements AutoCloseable {
* values returned are just estimates and meant to be used for making soft
decisions on whether the data in the store
* partition is fresh enough for querying.
*
- * Note: Each invocation of this method issues a call to the Kafka
brokers. Thus its advisable to limit the frequency
+ * <p>Note: Each invocation of this method issues a call to the Kafka
brokers. Thus, it's advisable to limit the frequency
* of invocation to once every few seconds.
*
* @return map of store names to another map of partition to {@link
LagInfo}s
@@ -1885,7 +1895,7 @@ public class KafkaStreams implements AutoCloseable {
* Run an interactive query against a state store.
* <p>
* This method allows callers outside of the Streams runtime to access the
internal state of
- * stateful processors. See
https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+ * stateful processors. See <a
href="https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html">IQ
docs</a>
* for more information.
* <p>
* NOTICE: This functionality is {@link Evolving} and subject to change in
minor versions.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
index 9ca495214d6..9f4311922be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
@@ -53,7 +53,7 @@ public class KeyQueryMetadata {
* Get the active Kafka Streams instance for given key.
*
* @return active instance's {@link HostInfo}
- * @deprecated Use {@link #activeHost()} instead.
+ * @deprecated since 2.7.0; use {@link #activeHost()} instead.
*/
@Deprecated
public HostInfo getActiveHost() {
@@ -64,7 +64,7 @@ public class KeyQueryMetadata {
* Get the Kafka Streams instances that host the key as standbys.
*
 * @return set of standby {@link HostInfo} or an empty set, if no standbys
are configured
- * @deprecated Use {@link #standbyHosts()} instead.
+ * @deprecated since 2.7.0; use {@link #standbyHosts()} instead.
*/
@Deprecated
public Set<HostInfo> getStandbyHosts() {
@@ -75,7 +75,7 @@ public class KeyQueryMetadata {
* Get the store partition corresponding to the key.
*
* @return store partition number
- * @deprecated Use {@link #partition()} instead.
+ * @deprecated since 2.7.0; use {@link #partition()} instead.
*/
@Deprecated
public int getPartition() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index b534d1101ab..d9d38c39378 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -71,7 +71,7 @@ public class KeyValue<K, V> {
return false;
}
- final KeyValue other = (KeyValue) obj;
+ final KeyValue<?, ?> other = (KeyValue<?, ?>) obj;
return Objects.equals(key, other.key) && Objects.equals(value,
other.value);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
index e0dd96f7d7e..b76f310d8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
@@ -105,7 +105,7 @@ public class StoreQueryParameters<T> {
if (!(obj instanceof StoreQueryParameters)) {
return false;
}
- final StoreQueryParameters storeQueryParameters =
(StoreQueryParameters) obj;
+ final StoreQueryParameters<?> storeQueryParameters =
(StoreQueryParameters<?>) obj;
return Objects.equals(storeQueryParameters.partition, partition)
&& Objects.equals(storeQueryParameters.staleStores,
staleStores)
&& Objects.equals(storeQueryParameters.storeName, storeName)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 1c2cde831fd..6746bd28f7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -87,12 +87,12 @@ public class StreamsBuilder {
*/
@SuppressWarnings("this-escape")
public StreamsBuilder(final TopologyConfig topologyConfigs) {
- topology = getNewTopology(topologyConfigs);
+ topology = newTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new
InternalStreamsBuilder(internalTopologyBuilder);
}
- protected Topology getNewTopology(final TopologyConfig topologyConfigs) {
+ protected Topology newTopology(final TopologyConfig topologyConfigs) {
return new Topology(topologyConfigs);
}
@@ -416,7 +416,7 @@ public class StreamsBuilder {
/**
* Create a {@link GlobalKTable} for the specified topic.
- *
+ * <p>
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local
{@link KeyValueStore} configured with
@@ -467,7 +467,7 @@ public class StreamsBuilder {
/**
* Create a {@link GlobalKTable} for the specified topic.
- *
+ * <p>
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local
{@link KeyValueStore} configured with
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2b36a0820b4..320370503e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -157,6 +157,7 @@ public class StreamsConfig extends AbstractConfig {
private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000;
+ @SuppressWarnings("unused")
public static final int DUMMY_THREAD_INDEX = 1;
public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
@@ -240,14 +241,12 @@ public class StreamsConfig extends AbstractConfig {
public static final String CLIENT_TAG_PREFIX = "client.tag.";
/** {@code topology.optimization} */
- private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
- + " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
- + "or a comma separated list of specific optimizations: "
- + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" +
"
- + "\"SINGLE_STORE_SELF_JOIN+\").";
-
-
public static final String TOPOLOGY_OPTIMIZATION_CONFIG =
"topology.optimization";
+ private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+ + " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+ + "or a comma separated list of specific optimizations: "
+ + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\",
\"+MERGE_REPARTITION_TOPICS+\" + "
+ + "\"SINGLE_STORE_SELF_JOIN+\").";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration
telling Kafka "
+ "Streams if it should optimize the topology and what optimizations
to apply. "
+ CONFIG_ERROR_MSG
@@ -1380,8 +1379,12 @@ public class StreamsConfig extends AbstractConfig {
private void verifyEOSTransactionTimeoutCompatibility() {
final long commitInterval = getLong(COMMIT_INTERVAL_MS_CONFIG);
final String transactionTimeoutConfigKey =
producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
- final int transactionTimeout =
originals().containsKey(transactionTimeoutConfigKey) ? (int) parseType(
- transactionTimeoutConfigKey,
originals().get(transactionTimeoutConfigKey), Type.INT) :
DEFAULT_TRANSACTION_TIMEOUT;
+ final int transactionTimeout =
+ originals().containsKey(transactionTimeoutConfigKey) ?
+ (int) Objects.requireNonNull(
+ parseType(transactionTimeoutConfigKey,
originals().get(transactionTimeoutConfigKey), Type.INT),
+ "Could not parse config `" + COMMIT_INTERVAL_MS_CONFIG
+ "` because it's set to `null`") :
+ DEFAULT_TRANSACTION_TIMEOUT;
if (transactionTimeout < commitInterval) {
throw new IllegalArgumentException(String.format("Transaction
timeout %d was set lower than " +
@@ -1548,9 +1551,7 @@ public class StreamsConfig extends AbstractConfig {
// Get main consumer override configs
final Map<String, Object> mainConsumerProps =
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
- for (final Map.Entry<String, Object> entry:
mainConsumerProps.entrySet()) {
- consumerProps.put(entry.getKey(), entry.getValue());
- }
+ consumerProps.putAll(mainConsumerProps);
// this is a hack to work around StreamsConfig constructor inside
StreamsPartitionAssignor to avoid casting
consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@@ -1622,9 +1623,7 @@ public class StreamsConfig extends AbstractConfig {
// Get restore consumer override configs
final Map<String, Object> restoreConsumerProps =
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
- for (final Map.Entry<String, Object> entry:
restoreConsumerProps.entrySet()) {
- baseConsumerProps.put(entry.getKey(), entry.getValue());
- }
+ baseConsumerProps.putAll(restoreConsumerProps);
// no need to set group id for a restore consumer
baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -1657,9 +1656,7 @@ public class StreamsConfig extends AbstractConfig {
// Get global consumer override configs
final Map<String, Object> globalConsumerProps =
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
- for (final Map.Entry<String, Object> entry:
globalConsumerProps.entrySet()) {
- baseConsumerProps.put(entry.getKey(), entry.getValue());
- }
+ baseConsumerProps.putAll(globalConsumerProps);
// no need to set group id for a global consumer
baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -1806,7 +1803,7 @@ public class StreamsConfig extends AbstractConfig {
 * @return a configured instance of key Serde class
*/
@SuppressWarnings("WeakerAccess")
- public Serde defaultKeySerde() {
+ public Serde<?> defaultKeySerde() {
final Object keySerdeConfigSetting =
get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
if (keySerdeConfigSetting == null) {
throw new ConfigException("Please specify a key serde or set one
through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
@@ -1828,7 +1825,7 @@ public class StreamsConfig extends AbstractConfig {
 * @return a configured instance of value Serde class
*/
@SuppressWarnings("WeakerAccess")
- public Serde defaultValueSerde() {
+ public Serde<?> defaultValueSerde() {
final Object valueSerdeConfigSetting =
get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
if (valueSerdeConfigSetting == null) {
throw new ConfigException("Please specify a value serde or set one
through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG");
diff --git
a/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
index 11c4941b3d6..4af0f614a1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
@@ -65,7 +65,7 @@ public interface StreamsMetadata {
/**
* Host where the Streams client runs.
*
- * This method is equivalent to {@code StreamsMetadata.hostInfo().host();}
+ * <p>This method is equivalent to {@code
StreamsMetadata.hostInfo().host();}
*
* @return the host where the Streams client runs
*/
@@ -74,7 +74,7 @@ public interface StreamsMetadata {
/**
* Port on which the Streams client listens.
*
- * This method is equivalent to {@code StreamsMetadata.hostInfo().port();}
+ * <p>This method is equivalent to {@code
StreamsMetadata.hostInfo().port();}
*
* @return the port on which Streams client listens
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index cbf21698173..7e142fa4dc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -46,10 +46,10 @@ public interface StreamsMetrics {
* of the operation, and hence the rate / count metrics will be updated
accordingly; and the recorded latency value
* will be used to update the average / max latency as well.
*
- * Note that you can add more metrics to this sensor after you created it,
which can then be updated upon
+ * <p>Note that you can add more metrics to this sensor after you created
it, which can then be updated upon
* {@link Sensor#record(double)} calls.
*
- * The added sensor and its metrics can be removed with {@link
#removeSensor(Sensor) removeSensor()}.
+ * <p>The added sensor and its metrics can be removed with {@link
#removeSensor(Sensor) removeSensor()}.
*
* @param scopeName name of the scope, which will be used as part
of the metric type, e.g.: "stream-[scope]-metrics".
* @param entityName name of the entity, which will be used as
part of the metric tags, e.g.: "[scope]-id" = "[entity]".
@@ -76,10 +76,10 @@ public interface StreamsMetrics {
* Whenever a user records this sensor via {@link Sensor#record(double)}
etc,
* it will be counted as one invocation of the operation, and hence the
rate / count metrics will be updated accordingly.
*
- * Note that you can add more metrics to this sensor after you created it,
which can then be updated upon
+ * <p>Note that you can add more metrics to this sensor after you created
it, which can then be updated upon
* {@link Sensor#record(double)} calls.
*
- * The added sensor and its metrics can be removed with {@link
#removeSensor(Sensor) removeSensor()}.
+ * <p>The added sensor and its metrics can be removed with {@link
#removeSensor(Sensor) removeSensor()}.
*
* @param scopeName name of the scope, which will be used as part
of the metrics type, e.g.: "stream-[scope]-metrics".
* @param entityName name of the entity, which will be used as
part of the metric tags, e.g.: "[scope]-id" = "[entity]".
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 314d2190f66..1a3089b29de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -716,7 +716,7 @@ public class Topology {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
if (stores != null) {
- for (final StoreBuilder storeBuilder : stores) {
+ for (final StoreBuilder<?> storeBuilder : stores) {
internalTopologyBuilder.addStateStore(storeBuilder, name);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index 83516197347..afaa1ed269e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -105,7 +105,7 @@ public class TopologyConfig extends AbstractConfig {
Importance.LOW,
DEFAULT_DSL_STORE_DOC);
}
- private final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
+ private final static Logger log =
LoggerFactory.getLogger(TopologyConfig.class);
public final String topologyName;
public final boolean eosEnabled;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 242b9e3e009..a2ff98dcb9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -51,12 +51,14 @@ public interface TopologyDescription {
* Internally assigned unique ID.
* @return the ID of the sub-topology
*/
+ @SuppressWarnings("unused")
int id();
/**
* All nodes of this sub-topology.
* @return set of all nodes within the sub-topology
*/
+ @SuppressWarnings("unused")
Set<Node> nodes();
}
@@ -75,14 +77,17 @@ public interface TopologyDescription {
* The source node reading from a "global" topic.
* @return the "global" source node
*/
+ @SuppressWarnings("unused")
Source source();
/**
* The processor node maintaining the global store.
* @return the "global" processor node
*/
+ @SuppressWarnings("unused")
Processor processor();
+ @SuppressWarnings("unused")
int id();
}
@@ -94,6 +99,7 @@ public interface TopologyDescription {
* The name of the node. Will never be {@code null}.
* @return the name of the node
*/
+ @SuppressWarnings("unused")
String name();
/**
* The predecessors of this node within a sub-topology.
@@ -101,6 +107,7 @@ public interface TopologyDescription {
* Will never be {@code null}.
* @return set of all predecessors
*/
+ @SuppressWarnings("unused")
Set<Node> predecessors();
/**
* The successor of this node within a sub-topology.
@@ -108,6 +115,7 @@ public interface TopologyDescription {
* Will never be {@code null}.
 * @return set of all successors
*/
+ @SuppressWarnings("unused")
Set<Node> successors();
}
@@ -121,12 +129,14 @@ public interface TopologyDescription {
* The topic names this source node is reading from.
* @return a set of topic names
*/
+ @SuppressWarnings("unused")
Set<String> topicSet();
/**
* The pattern used to match topic names that is reading from.
* @return the pattern used to match topic names
*/
+ @SuppressWarnings("unused")
Pattern topicPattern();
}
@@ -138,6 +148,7 @@ public interface TopologyDescription {
* The names of all connected stores.
* @return set of store names
*/
+ @SuppressWarnings("unused")
Set<String> stores();
}
@@ -150,6 +161,7 @@ public interface TopologyDescription {
* Could be {@code null} if the topic name can only be dynamically
determined based on {@link TopicNameExtractor}
* @return a topic name
*/
+ @SuppressWarnings("unused")
String topic();
/**
@@ -157,19 +169,22 @@ public interface TopologyDescription {
* Could be {@code null} if the topic name is not dynamically
determined.
 * @return the {@link TopicNameExtractor} class used to get the topic name
*/
- TopicNameExtractor topicNameExtractor();
+ @SuppressWarnings("unused")
+ TopicNameExtractor<?, ?> topicNameExtractor();
}
/**
* All sub-topologies of the represented topology.
* @return set of all sub-topologies
*/
+ @SuppressWarnings("unused")
Set<Subtopology> subtopologies();
/**
* All global stores of the represented topology.
* @return set of all global stores
*/
+ @SuppressWarnings("unused")
Set<GlobalStore> globalStores();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 854f470e53c..0f6c50579d3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -22,12 +22,12 @@ import org.apache.kafka.streams.processor.TaskId;
import java.util.Set;
/**
- * Indicates a specific task is corrupted and need to be re-initialized. It
can be thrown when
+ * Indicates a specific task is corrupted and needs to be re-initialized. It
can be thrown when:
*
- * 1) Under EOS, if the checkpoint file does not contain offsets for
corresponding store's changelogs, meaning
- * previously it was not close cleanly;
- * 2) Out-of-range exception thrown during restoration, meaning that the
changelog has been modified and we re-bootstrap
- * the store.
+ * <ol>
+ * <li>Under EOS, if the checkpoint file does not contain offsets for
corresponding store's changelogs, meaning previously it was not closed
cleanly.</li>
+ * <li>Out-of-range exception thrown during restoration, meaning that the
changelog has been modified and we re-bootstrap the store.</li>
+ * </ol>
*/
public class TaskCorruptedException extends StreamsException {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
index 42af69e25f5..8e04a8e8c03 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java
@@ -37,7 +37,7 @@ public class NamedTopologyBuilder extends StreamsBuilder {
}
@Override
- protected NamedTopology getNewTopology(final TopologyConfig
topologyConfigs) {
+ protected NamedTopology newTopology(final TopologyConfig topologyConfigs) {
return new NamedTopology(new InternalTopologyBuilder(topologyConfigs));
}
}