This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 218c510ffaf37090fd783f938063d8ab78d88253 Author: Matthias J. Sax <[email protected]> AuthorDate: Mon Dec 16 15:21:03 2024 -0800 MINOR: improving some internal comments (#18152) Reviewers: Yaroslav Kutsela (@Serwios), Anna Sophie Blee-Goldman <[email protected]> --- .../kafka/streams/internals/AutoOffsetResetInternal.java | 9 ++++++++- .../kafka/streams/processor/internals/StreamThread.java | 12 +++++------- .../kafka/streams/processor/internals/TaskManager.java | 14 +++++++++++++- .../java/org/apache/kafka/streams/AutoOffsetResetTest.java | 7 +++---- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java index dc38dec304c..0fbd267a9da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java @@ -31,8 +31,15 @@ public class AutoOffsetResetInternal extends AutoOffsetReset { return offsetResetStrategy; } - @SuppressWarnings("all") public Duration duration() { + if (duration.isEmpty()) { + throw new IllegalStateException(String.format( + "Duration is only available for reset strategy '%s', but reset strategy is '%s'. " + + "Please check the reset strategy before calling duration() via offsetResetStrategy().", + StrategyType.BY_DURATION, + offsetResetStrategy + )); + } return duration.get(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index ab4ed4656b3..2d9c1b0b11b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1299,8 +1299,10 @@ public class StreamThread extends Thread implements ProcessingThread { for (final TopicPartition partition : partitions) { final Optional<AutoOffsetResetStrategy> offsetResetStrategy = topologyMetadata.offsetResetStrategy(partition.topic()); - // This may be null if the task we are currently processing was apart of a named topology that was just removed. - // TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata view of named topologies in sync until final thread has acked + // TODO + // This may be null if the task we are currently processing was part of a named topology that was just removed. + // After named topologies are removed, we can update `topologyMetadata.offsetResetStrateg()` so it + // will not return null any longer, and we can remove this check if (offsetResetStrategy != null) { if (offsetResetStrategy.isPresent()) { final AutoOffsetResetStrategy resetPolicy = offsetResetStrategy.get(); @@ -1407,11 +1409,7 @@ public class StreamThread extends Thread implements ProcessingThread { } } } catch (final TimeoutException timeoutException) { - for (final TopicPartition partition : seekByDuration.keySet()) { - final Task task = taskManager.getActiveTask(partition); - task.maybeInitTaskTimeoutOrThrow(now, timeoutException); - stateUpdater.add(task); - } + taskManager.maybeInitTaskTimeoutsOrThrow(seekByDuration.keySet(), timeoutException, now); log.debug( String.format( "Could not reset offset for %s due to the following exception; will retry.", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 6b90e354c27..037ff941105 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1886,7 +1886,19 @@ public class TaskManager { } } - Task getActiveTask(final TopicPartition partition) { + void maybeInitTaskTimeoutsOrThrow( + final Collection<TopicPartition> partitions, + final TimeoutException timeoutException, + final long nowMs + ) { + for (final TopicPartition partition : partitions) { + final Task task = getActiveTask(partition); + task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException); + stateUpdater.add(task); + } + } + + private Task getActiveTask(final TopicPartition partition) { final Task activeTask = tasks.activeTasksForInputPartition(partition); if (activeTask == null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java index e5679e99d8f..fb4d9738c93 100644 --- a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.streams.internals.AutoOffsetResetInternal; import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.NoSuchElementException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -32,19 +31,19 @@ class AutoOffsetResetTest { @Test void shouldThrowExceptionOnDurationForNoneReset() { final AutoOffsetResetInternal none = new AutoOffsetResetInternal(AutoOffsetReset.none()); - assertThrows(NoSuchElementException.class, none::duration, "None should not have a duration."); + assertThrows(IllegalStateException.class, none::duration, "None should not have a duration."); } @Test void shouldThrowExceptionOnDurationForEarliestReset() { final AutoOffsetResetInternal earliest = new AutoOffsetResetInternal(AutoOffsetReset.earliest()); - assertThrows(NoSuchElementException.class, earliest::duration, "Earliest should not have a duration."); + assertThrows(IllegalStateException.class, earliest::duration, "Earliest should not have a duration."); } @Test void shouldThrowExceptionOnDurationForLastetReset() { final AutoOffsetResetInternal latest = new AutoOffsetResetInternal(AutoOffsetReset.latest()); - assertThrows(NoSuchElementException.class, latest::duration, "Latest should not have a duration."); + assertThrows(IllegalStateException.class, latest::duration, "Latest should not have a duration."); } @Test
