This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 73627b6dd40 KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
73627b6dd40 is described below

commit 73627b6dd40c69d744281ee3149461c5a0f2febf
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Oct 15 21:26:38 2025 -0700

    KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
    
    Before we added caching for consumer next offsets, we called
    `mainConsumer.position` and always expected something back. When we
    added the caching, we kept the check that we always have a next
    offset, but because the logic changed to fetching the offsets from
    poll, we may not have anything for topics that have no messages.
    This PR accounts for that.
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../integration/RegexSourceIntegrationTest.java    | 60 ++++++++++++++++++++++
 .../streams/processor/internals/StreamTask.java    | 41 ++++++---------
 2 files changed, 76 insertions(+), 25 deletions(-)
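
A minimal, self-contained Java sketch of the behavioral change described in
the commit message (hypothetical class and field names, not the actual
StreamTask code): a missing cached offset used to be treated as a bug and
surfaced as an IllegalStateException, whereas with this change it simply
means nothing has been consumed from that partition yet, so there is nothing
to commit for it.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class OffsetLookupSketch {

        // stand-in for nextOffsetsAndMetadataToBeConsumed, keyed by partition name
        private final Map<String, Long> nextOffsetToBeConsumed = new HashMap<>();

        // old behavior: assume the cache always has an entry and fail otherwise
        long findOffsetOld(final String partition) {
            final Long offset = nextOffsetToBeConsumed.get(partition);
            if (offset == null) {
                // previously treated as a bug, even for pattern-matched topics with no data
                throw new IllegalStateException("Unknown partition: " + partition);
            }
            return offset;
        }

        // new behavior: a missing entry just means "nothing consumed yet, nothing to commit"
        Optional<Long> findOffsetNew(final String partition) {
            return Optional.ofNullable(nextOffsetToBeConsumed.get(partition));
        }

        public static void main(final String[] args) {
            final OffsetLookupSketch sketch = new OffsetLookupSketch();
            // a pattern-matched topic with no data yet has no cached offset
            System.out.println(sketch.findOffsetNew("TEST-TOPIC-2-0")); // prints Optional.empty
        }
    }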

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index ceb8b653eb1..0058f94c090 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -87,6 +87,8 @@ public class RegexSourceIntegrationTest {
     private static final int NUM_BROKERS = 1;
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
+    private static final List<TopicPartition> ASSIGNED_PARTITIONS = new ArrayList<>();
+
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
@@ -254,6 +256,64 @@ public class RegexSourceIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldNotCrashIfPatternMatchesTopicHasNoData() throws Exception {
+        final String topic1 = "TEST-TOPIC-1";
+        final String topic2 = "TEST-TOPIC-2";
+
+        try {
+            CLUSTER.createTopic(topic1);
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            builder.stream(Pattern.compile("not-a-match"));
+            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+            final Topology topology = builder.build();
+            assertThat(topology.describe().subtopologies().size(), greaterThan(1));
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+                    return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
+                }
+            });
+
+            startApplicationAndWaitUntilRunning(streams);
+            TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && !assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+            CLUSTER.createTopic(topic2);
+            TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+            final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+            IntegrationTestUtils.produceKeyValuesSynchronously(
+                topic1,
+                Collections.singletonList(record1),
+                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+                CLUSTER.time
+            );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
+                outputTopic,
+                List.of(record1)
+            );
+
+            streams.close();
+        } finally {
+            CLUSTER.deleteTopics(topic1, topic2);
+        }
+    }
+
     private String createTopic(final int suffix) throws InterruptedException {
         final String outputTopic = "outputTopic_" + suffix;
         CLUSTER.createTopic(outputTopic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3ea6a374e84..855a85e2c75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -461,38 +461,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) {
+    private Optional<OffsetAndMetadata> findOffsetAndMetadata(final TopicPartition partition) {
         Long offset = partitionGroup.headRecordOffset(partition);
         Optional<Integer> leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition);
         final long partitionTime = partitionGroup.partitionTimestamp(partition);
         if (offset == null) {
-            try {
-                if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
-                    final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
-                    offset = offsetAndMetadata.offset();
-                    leaderEpoch = offsetAndMetadata.leaderEpoch();
-                } else {
-                    // This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                    throw new IllegalStateException("Stream task " + id + " does not know the partition: " + partition);
-                }
-            } catch (final KafkaException fatal) {
-                throw new StreamsException(fatal);
+            final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
+            if (offsetAndMetadata == null) {
+                // it may be that we have not yet consumed any record from this partition, hence nothing to commit
+                return Optional.empty();
             }
+            offset = offsetAndMetadata.offset();
+            leaderEpoch = offsetAndMetadata.leaderEpoch();
         }
-        return new OffsetAndMetadata(offset,
+        return Optional.of(new OffsetAndMetadata(offset,
                 leaderEpoch,
-                new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode());
+                new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode()));
     }
 
     private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
-        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
-
         switch (state()) {
             case CREATED:
             case RESTORING:
-                committableOffsets = Collections.emptyMap();
-
-                break;
+                return Collections.emptyMap();
 
             case RUNNING:
             case SUSPENDED:
@@ -501,12 +492,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 // input partitions
                 final Set<TopicPartition> partitionsNeedCommit = processorContext.processorMetadata().needsCommit() ?
                     inputPartitions() : consumedOffsets.keySet();
-                committableOffsets = new HashMap<>(partitionsNeedCommit.size());
 
-                for (final TopicPartition partition : partitionsNeedCommit) {
-                    committableOffsets.put(partition, findOffsetAndMetadata(partition));
-                }
-                break;
+                return partitionsNeedCommit.stream()
+                        .map(partition -> findOffsetAndMetadata(partition)
+                                .map(offsetAndMetadata -> Map.entry(partition, offsetAndMetadata)))
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);
@@ -515,7 +507,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
 
-        return committableOffsets;
     }
 
     @Override
