This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d7859571242 MINOR: code cleanup in InternalTopicManager (#21165)
d7859571242 is described below

commit d78595712420db2f6ad243eb25c3d389d61173db
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 17 10:49:13 2025 -0800

    MINOR: code cleanup in InternalTopicManager (#21165)
    
    - Avoid double-brace initialization
    - Fix timeout error handling, by pass in correct list of topic names
    - Remove unnecessary code
    
    Reviewers: Vincent Potuček (@Pankraz76), Lucas Brutschy
     <[email protected]>
---
 .../processor/internals/InternalTopicManager.java  | 27 +++++++++++-----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 40c9ea69de7..554a646b484 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -194,18 +194,19 @@ public class InternalTopicManager {
                         (streamsSide, brokerSide) -> 
validateCleanupPolicy(validationResult, streamsSide, brokerSide)
                     );
                 }
-                
+
+                final Set<String> topicsStillToValidate = new HashSet<>();
+                topicsStillToValidate.addAll(topicDescriptionsStillToValidate);
+                topicsStillToValidate.addAll(topicConfigsStillToValidate);
+
                 maybeThrowTimeout(new TimeoutContext(
-                        new HashSet<String>() {{
-                            addAll(topicDescriptionsStillToValidate);
-                            addAll(topicConfigsStillToValidate);
-                        }},
-                        deadline,
-                        "Validation timeout",
-                        String.format("Could not validate internal topics 
within %d milliseconds. " +
-                            "This can happen if the Kafka cluster is 
temporarily not available.", retryTimeoutMs),
-                        null
-                    ));
+                    topicsStillToValidate,
+                    deadline,
+                    "Validation timeout",
+                    String.format("Could not validate internal topics within 
%d milliseconds. " +
+                        "This can happen if the Kafka cluster is temporarily 
not available.", retryTimeoutMs),
+                    null
+                ));
 
                 if (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
                     Utils.sleep(100);
@@ -497,7 +498,7 @@ public class InternalTopicManager {
             }
             if (!topicsNotReady.isEmpty()) {
                 maybeThrowTimeout(new TimeoutContext(
-                    Collections.singleton("makeReadyCheck"), // dummy 
collection just to trigger if `topicsNotReady` is non-empty
+                    topicsNotReady,
                     deadlineMs,
                     "MakeReady timeout",
                     String.format("Could not create topics within %d 
milliseconds. This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs),
@@ -613,8 +614,6 @@ public class InternalTopicManager {
                     deadlineMs - time.milliseconds()
                 );
                 Utils.sleep(retryBackOffMs);
-            } else {
-                continue;
             }
         } 
         return createdTopics;

Reply via email to