This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44481caa6bb KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
44481caa6bb is described below

commit 44481caa6bbb297779cd210e68a689d876f15408
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Oct 15 21:26:38 2025 -0700

    KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
    
    Before we added caching for the consumer's next offsets, we called
    `mainConsumer.position` and always expected an offset back. When we
    added the caching, we kept the check that a next offset is always
    available. However, the cached offsets are now populated from `poll`,
    so there may be no entry for partitions of topics that have no
    messages. This PR accounts for that case: such partitions are skipped
    when collecting committable offsets instead of failing with an
    `IllegalStateException`.
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../integration/RegexSourceIntegrationTest.java    | 60 ++++++++++++++++++++++
 .../streams/processor/internals/StreamTask.java    | 41 ++++++---------
 2 files changed, 76 insertions(+), 25 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e65399ee690..880818703ba 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -87,6 +87,8 @@ public class RegexSourceIntegrationTest {
     private static final int NUM_BROKERS = 1;
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
+    private static final List<TopicPartition> ASSIGNED_PARTITIONS = new 
ArrayList<>();
+
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
         CLUSTER.start();
@@ -254,6 +256,64 @@ public class RegexSourceIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldNotCrashIfPatternMatchesTopicHasNoData() throws 
Exception {
+        final String topic1 = "TEST-TOPIC-1";
+        final String topic2 = "TEST-TOPIC-2";
+
+        try {
+            CLUSTER.createTopic(topic1);
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            builder.stream(Pattern.compile("not-a-match"));
+            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
+
+            final Topology topology = builder.build();
+            assertThat(topology.describe().subtopologies().size(), 
greaterThan(1));
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, 
Object> config) {
+                    return new KafkaConsumer<>(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
+                }
+            });
+
+            startApplicationAndWaitUntilRunning(streams);
+            TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) 
&& !assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+            CLUSTER.createTopic(topic2);
+            TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) 
&& assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+            final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+            IntegrationTestUtils.produceKeyValuesSynchronously(
+                topic1,
+                Collections.singletonList(record1),
+                TestUtils.producerConfig(CLUSTER.bootstrapServers(), 
StringSerializer.class, StringSerializer.class),
+                CLUSTER.time
+            );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), 
StringDeserializer.class, StringDeserializer.class),
+                outputTopic,
+                List.of(record1)
+            );
+
+            streams.close();
+        } finally {
+            CLUSTER.deleteTopics(topic1, topic2);
+        }
+    }
+
     private String createTopic(final int suffix) throws InterruptedException {
         final String outputTopic = "outputTopic_" + suffix;
         CLUSTER.createTopic(outputTopic);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 42b57e46aa4..895eac6c321 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -465,38 +465,29 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         }
     }
 
-    private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition 
partition) {
+    private Optional<OffsetAndMetadata> findOffsetAndMetadata(final 
TopicPartition partition) {
         Long offset = partitionGroup.headRecordOffset(partition);
         Optional<Integer> leaderEpoch = 
partitionGroup.headRecordLeaderEpoch(partition);
         final long partitionTime = 
partitionGroup.partitionTimestamp(partition);
         if (offset == null) {
-            try {
-                if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) 
{
-                    final OffsetAndMetadata offsetAndMetadata = 
nextOffsetsAndMetadataToBeConsumed.get(partition);
-                    offset = offsetAndMetadata.offset();
-                    leaderEpoch = offsetAndMetadata.leaderEpoch();
-                } else {
-                    // This indicates a bug and thus we rethrow it as fatal 
`IllegalStateException`
-                    throw new IllegalStateException("Stream task " + id + " 
does not know the partition: " + partition);
-                }
-            } catch (final KafkaException fatal) {
-                throw new StreamsException(fatal);
+            final OffsetAndMetadata offsetAndMetadata = 
nextOffsetsAndMetadataToBeConsumed.get(partition);
+            if (offsetAndMetadata == null) {
+                // it may be that we have not yet consumed any record from 
this partition, hence nothing to commit
+                return Optional.empty();
             }
+            offset = offsetAndMetadata.offset();
+            leaderEpoch = offsetAndMetadata.leaderEpoch();
         }
-        return new OffsetAndMetadata(offset,
+        return Optional.of(new OffsetAndMetadata(offset,
                 leaderEpoch,
-                new TopicPartitionMetadata(partitionTime, 
processorContext.processorMetadata()).encode());
+                new TopicPartitionMetadata(partitionTime, 
processorContext.processorMetadata()).encode()));
     }
 
     private Map<TopicPartition, OffsetAndMetadata> 
committableOffsetsAndMetadata() {
-        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
-
         switch (state()) {
             case CREATED:
             case RESTORING:
-                committableOffsets = Collections.emptyMap();
-
-                break;
+                return Collections.emptyMap();
 
             case RUNNING:
             case SUSPENDED:
@@ -505,12 +496,13 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 // input partitions
                 final Set<TopicPartition> partitionsNeedCommit = 
processorContext.processorMetadata().needsCommit() ?
                     inputPartitions() : consumedOffsets.keySet();
-                committableOffsets = new 
HashMap<>(partitionsNeedCommit.size());
 
-                for (final TopicPartition partition : partitionsNeedCommit) {
-                    committableOffsets.put(partition, 
findOffsetAndMetadata(partition));
-                }
-                break;
+                return partitionsNeedCommit.stream()
+                        .map(partition -> findOffsetAndMetadata(partition)
+                                .map(offsetAndMetadata -> Map.entry(partition, 
offsetAndMetadata)))
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
 
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " 
while getting committable offsets for active task " + id);
@@ -519,7 +511,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " 
while post committing active task " + id);
         }
 
-        return committableOffsets;
     }
 
     @Override

Reply via email to