This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7859571242 MINOR: code cleanup in InternalTopicManager (#21165)
d7859571242 is described below
commit d78595712420db2f6ad243eb25c3d389d61173db
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 17 10:49:13 2025 -0800
MINOR: code cleanup in InternalTopicManager (#21165)
- Avoid double-brace initialization
- Fix timeout error handling, by pass in correct list of topic names
- Remove unnecessary code
Reviewers: Vincent Potuček (@Pankraz76), Lucas Brutschy
<[email protected]>
---
.../processor/internals/InternalTopicManager.java | 27 +++++++++++-----------
1 file changed, 13 insertions(+), 14 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 40c9ea69de7..554a646b484 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -194,18 +194,19 @@ public class InternalTopicManager {
(streamsSide, brokerSide) ->
validateCleanupPolicy(validationResult, streamsSide, brokerSide)
);
}
-
+
+ final Set<String> topicsStillToValidate = new HashSet<>();
+ topicsStillToValidate.addAll(topicDescriptionsStillToValidate);
+ topicsStillToValidate.addAll(topicConfigsStillToValidate);
+
maybeThrowTimeout(new TimeoutContext(
- new HashSet<String>() {{
- addAll(topicDescriptionsStillToValidate);
- addAll(topicConfigsStillToValidate);
- }},
- deadline,
- "Validation timeout",
- String.format("Could not validate internal topics
within %d milliseconds. " +
- "This can happen if the Kafka cluster is
temporarily not available.", retryTimeoutMs),
- null
- ));
+ topicsStillToValidate,
+ deadline,
+ "Validation timeout",
+ String.format("Could not validate internal topics within
%d milliseconds. " +
+ "This can happen if the Kafka cluster is temporarily
not available.", retryTimeoutMs),
+ null
+ ));
if (!descriptionsForTopic.isEmpty() ||
!configsForTopic.isEmpty()) {
Utils.sleep(100);
@@ -497,7 +498,7 @@ public class InternalTopicManager {
}
if (!topicsNotReady.isEmpty()) {
maybeThrowTimeout(new TimeoutContext(
- Collections.singleton("makeReadyCheck"), // dummy
collection just to trigger if `topicsNotReady` is non-empty
+ topicsNotReady,
deadlineMs,
"MakeReady timeout",
String.format("Could not create topics within %d
milliseconds. This can happen if the Kafka cluster is temporarily not
available.", retryTimeoutMs),
@@ -613,8 +614,6 @@ public class InternalTopicManager {
deadlineMs - time.milliseconds()
);
Utils.sleep(retryBackOffMs);
- } else {
- continue;
}
}
return createdTopics;