This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 73627b6dd40 KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
73627b6dd40 is described below
commit 73627b6dd40c69d744281ee3149461c5a0f2febf
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Oct 15 21:26:38 2025 -0700
KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
Before caching was added for the consumer's next offsets, we called
`mainConsumer.position` and could always expect a value back. When the
caching was added, we kept the check that a next offset is always
available, but since the logic changed to fetching the offsets from
poll, we may not have anything for topics that have no messages. This
PR accounts for that.
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
---
.../integration/RegexSourceIntegrationTest.java | 60 ++++++++++++++++++++++
.../streams/processor/internals/StreamTask.java | 41 ++++++---------
2 files changed, 76 insertions(+), 25 deletions(-)
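
For context, here is a minimal, self-contained sketch of the situation the commit message describes, assuming a hypothetical cache populated from poll() results (the class and helper names below are illustrative only and are not part of Kafka or of this patch):

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class NextOffsetCacheSketch {

    // Hypothetical helper: mirrors how a cache that is fed from poll() results
    // only ever learns about partitions that actually returned records.
    public static Map<TopicPartition, OffsetAndMetadata> cacheFromPoll(final Consumer<byte[], byte[]> consumer) {
        final Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (final TopicPartition partition : records.partitions()) {
            final List<ConsumerRecord<byte[], byte[]>> batch = records.records(partition);
            final ConsumerRecord<byte[], byte[]> last = batch.get(batch.size() - 1);
            // The next offset to be consumed is the last polled offset + 1.
            nextOffsets.put(partition, new OffsetAndMetadata(last.offset() + 1, last.leaderEpoch(), ""));
        }
        // An assigned partition whose topic has no messages never appears here,
        // whereas consumer.position(partition) would still return an offset for it,
        // so callers must tolerate a missing entry instead of treating it as a bug.
        return nextOffsets;
    }
}

Because an assigned partition of an empty topic can be legitimately absent from such a cache, the patch below changes the lookup in StreamTask to treat a missing entry as "nothing to commit yet" rather than as an illegal state.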
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index ceb8b653eb1..0058f94c090 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -87,6 +87,8 @@ public class RegexSourceIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private static final List<TopicPartition> ASSIGNED_PARTITIONS = new ArrayList<>();
+
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
@@ -254,6 +256,64 @@ public class RegexSourceIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldNotCrashIfPatternMatchesTopicHasNoData() throws Exception {
+        final String topic1 = "TEST-TOPIC-1";
+        final String topic2 = "TEST-TOPIC-2";
+
+        try {
+            CLUSTER.createTopic(topic1);
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            builder.stream(Pattern.compile("not-a-match"));
+            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+            final Topology topology = builder.build();
+            assertThat(topology.describe().subtopologies().size(), greaterThan(1));
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+                    return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
+                }
+            });
+
+            startApplicationAndWaitUntilRunning(streams);
+            TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && !assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+            CLUSTER.createTopic(topic2);
+            TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+            final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+            IntegrationTestUtils.produceKeyValuesSynchronously(
+                topic1,
+                Collections.singletonList(record1),
+                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+                CLUSTER.time
+            );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
+                outputTopic,
+                List.of(record1)
+            );
+
+            streams.close();
+        } finally {
+            CLUSTER.deleteTopics(topic1, topic2);
+        }
+    }
+
     private String createTopic(final int suffix) throws InterruptedException {
         final String outputTopic = "outputTopic_" + suffix;
         CLUSTER.createTopic(outputTopic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3ea6a374e84..855a85e2c75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -461,38 +461,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) {
+    private Optional<OffsetAndMetadata> findOffsetAndMetadata(final TopicPartition partition) {
         Long offset = partitionGroup.headRecordOffset(partition);
         Optional<Integer> leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition);
         final long partitionTime = partitionGroup.partitionTimestamp(partition);
         if (offset == null) {
-            try {
-                if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
-                    final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
-                    offset = offsetAndMetadata.offset();
-                    leaderEpoch = offsetAndMetadata.leaderEpoch();
-                } else {
-                    // This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                    throw new IllegalStateException("Stream task " + id + " does not know the partition: " + partition);
-                }
-            } catch (final KafkaException fatal) {
-                throw new StreamsException(fatal);
+            final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
+            if (offsetAndMetadata == null) {
+                // it may be that we have not yet consumed any record from this partition, hence nothing to commit
+                return Optional.empty();
             }
+            offset = offsetAndMetadata.offset();
+            leaderEpoch = offsetAndMetadata.leaderEpoch();
         }
-        return new OffsetAndMetadata(offset,
+        return Optional.of(new OffsetAndMetadata(offset,
             leaderEpoch,
-            new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode());
+            new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode()));
     }
 
     private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
-        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
-
         switch (state()) {
             case CREATED:
             case RESTORING:
-                committableOffsets = Collections.emptyMap();
-
-                break;
+                return Collections.emptyMap();
 
             case RUNNING:
             case SUSPENDED:
@@ -501,12 +492,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 // input partitions
                 final Set<TopicPartition> partitionsNeedCommit = processorContext.processorMetadata().needsCommit() ?
                     inputPartitions() : consumedOffsets.keySet();
-                committableOffsets = new HashMap<>(partitionsNeedCommit.size());
-                for (final TopicPartition partition : partitionsNeedCommit) {
-                    committableOffsets.put(partition, findOffsetAndMetadata(partition));
-                }
 
-                break;
+                return partitionsNeedCommit.stream()
+                    .map(partition -> findOffsetAndMetadata(partition)
+                        .map(offsetAndMetadata -> Map.entry(partition, offsetAndMetadata)))
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);
@@ -515,7 +507,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
 
-        return committableOffsets;
     }
 
     @Override