This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 218c510ffaf37090fd783f938063d8ab78d88253
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Dec 16 15:21:03 2024 -0800

    MINOR: improving some internal comments (#18152)
    
    Reviewers: Yaroslav Kutsela (@Serwios), Anna Sophie Blee-Goldman 
<[email protected]>
---
 .../kafka/streams/internals/AutoOffsetResetInternal.java   |  9 ++++++++-
 .../kafka/streams/processor/internals/StreamThread.java    | 12 +++++-------
 .../kafka/streams/processor/internals/TaskManager.java     | 14 +++++++++++++-
 .../java/org/apache/kafka/streams/AutoOffsetResetTest.java |  7 +++----
 4 files changed, 29 insertions(+), 13 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
index dc38dec304c..0fbd267a9da 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
@@ -31,8 +31,15 @@ public class AutoOffsetResetInternal extends AutoOffsetReset 
{
         return offsetResetStrategy;
     }
 
-    @SuppressWarnings("all")
     public Duration duration() {
+        if (duration.isEmpty()) {
+            throw new IllegalStateException(String.format(
+                "Duration is only available for reset strategy '%s', but reset 
strategy is '%s'. "
+                    + "Please check the reset strategy before calling 
duration() via offsetResetStrategy().",
+                StrategyType.BY_DURATION,
+                offsetResetStrategy
+            ));
+        }
         return duration.get();
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ab4ed4656b3..2d9c1b0b11b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1299,8 +1299,10 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         for (final TopicPartition partition : partitions) {
             final Optional<AutoOffsetResetStrategy> offsetResetStrategy = 
topologyMetadata.offsetResetStrategy(partition.topic());
 
-            // This may be null if the task we are currently processing was 
apart of a named topology that was just removed.
-            // TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata 
view of named topologies in sync until final thread has acked
+            // TODO
+            // This may be null if the task we are currently processing was 
part of a named topology that was just removed.
+            // After named topologies are removed, we can update 
`topologyMetadata.offsetResetStrateg()` so it
+            // will not return null any longer, and we can remove this check
             if (offsetResetStrategy != null) {
                 if (offsetResetStrategy.isPresent()) {
                     final AutoOffsetResetStrategy resetPolicy = 
offsetResetStrategy.get();
@@ -1407,11 +1409,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                         }
                     }
                 } catch (final TimeoutException timeoutException) {
-                    for (final TopicPartition partition : 
seekByDuration.keySet()) {
-                        final Task task = taskManager.getActiveTask(partition);
-                        task.maybeInitTaskTimeoutOrThrow(now, 
timeoutException);
-                        stateUpdater.add(task);
-                    }
+                    
taskManager.maybeInitTaskTimeoutsOrThrow(seekByDuration.keySet(), 
timeoutException, now);
                     log.debug(
                         String.format(
                             "Could not reset offset for %s due to the 
following exception; will retry.",
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6b90e354c27..037ff941105 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1886,7 +1886,19 @@ public class TaskManager {
         }
     }
 
-    Task getActiveTask(final TopicPartition partition) {
+    void maybeInitTaskTimeoutsOrThrow(
+        final Collection<TopicPartition> partitions,
+        final TimeoutException timeoutException,
+        final long nowMs
+    ) {
+        for (final TopicPartition partition : partitions) {
+            final Task task = getActiveTask(partition);
+            task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);
+            stateUpdater.add(task);
+        }
+    }
+
+    private Task getActiveTask(final TopicPartition partition) {
         final Task activeTask = tasks.activeTasksForInputPartition(partition);
 
         if (activeTask == null) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java 
b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
index e5679e99d8f..fb4d9738c93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
@@ -21,7 +21,6 @@ import 
org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
-import java.util.NoSuchElementException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -32,19 +31,19 @@ class AutoOffsetResetTest {
     @Test
     void shouldThrowExceptionOnDurationForNoneReset() {
         final AutoOffsetResetInternal none = new 
AutoOffsetResetInternal(AutoOffsetReset.none());
-        assertThrows(NoSuchElementException.class, none::duration, "None 
should not have a duration.");
+        assertThrows(IllegalStateException.class, none::duration, "None should 
not have a duration.");
     }
 
     @Test
     void shouldThrowExceptionOnDurationForEarliestReset() {
         final AutoOffsetResetInternal earliest = new 
AutoOffsetResetInternal(AutoOffsetReset.earliest());
-        assertThrows(NoSuchElementException.class, earliest::duration, 
"Earliest should not have a duration.");
+        assertThrows(IllegalStateException.class, earliest::duration, 
"Earliest should not have a duration.");
     }
 
     @Test
     void shouldThrowExceptionOnDurationForLastetReset() {
         final AutoOffsetResetInternal latest = new 
AutoOffsetResetInternal(AutoOffsetReset.latest());
-        assertThrows(NoSuchElementException.class, latest::duration, "Latest 
should not have a duration.");
+        assertThrows(IllegalStateException.class, latest::duration, "Latest 
should not have a duration.");
     }
 
     @Test

Reply via email to