This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0adc6fa3e1a KAFKA-19271: allow intercepting internal method call
(#19832)
0adc6fa3e1a is described below
commit 0adc6fa3e1ad21ac5b830cfe86948bcc2245e8f7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Jun 9 07:28:13 2025 -0700
KAFKA-19271: allow intercepting internal method call (#19832)
To allow intercepting the internal subscribe call to the async-consumer,
we need to extend ConsumerWrapper interface accordingly, instead of
returning the wrapped async-consumer back to the KS runtime.
Reviewers: Lucas Brutschy <[email protected]>
---
.../kafka/streams/internals/ConsumerWrapper.java | 9 +++---
.../streams/processor/internals/StreamThread.java | 36 ++++++++++++++--------
.../processor/internals/StreamThreadTest.java | 33 --------------------
3 files changed, 28 insertions(+), 50 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
index 20cd7cb84d7..5088cd4b7dd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
+import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
@@ -54,10 +55,6 @@ public abstract class ConsumerWrapper implements
Consumer<byte[], byte[]> {
this.delegate = delegate;
}
- public AsyncKafkaConsumer<byte[], byte[]> consumer() {
- return delegate;
- }
-
@Override
public Set<TopicPartition> assignment() {
return delegate.assignment();
@@ -78,6 +75,10 @@ public abstract class ConsumerWrapper implements
Consumer<byte[], byte[]> {
delegate.subscribe(topics, callback);
}
+ public void subscribe(final Collection<String> topics, final
StreamsRebalanceListener streamsRebalanceListener) {
+ delegate.subscribe(topics, streamsRebalanceListener);
+ }
+
@Override
public void assign(final Collection<TopicPartition> partitions) {
delegate.assign(partitions);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 47775c53652..c0cdff2421b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1138,19 +1138,29 @@ public class StreamThread extends Thread implements
ProcessingThread {
mainConsumer.subscribe(topologyMetadata.sourceTopicPattern(),
rebalanceListener);
} else {
if (streamsRebalanceData.isPresent()) {
- final AsyncKafkaConsumer<byte[], byte[]> consumer =
mainConsumer instanceof ConsumerWrapper
- ? ((ConsumerWrapper) mainConsumer).consumer()
- : (AsyncKafkaConsumer<byte[], byte[]>) mainConsumer;
- consumer.subscribe(
- topologyMetadata.allFullSourceTopicNames(),
- new DefaultStreamsRebalanceListener(
- log,
- time,
- streamsRebalanceData.get(),
- this,
- taskManager
- )
- );
+ if (mainConsumer instanceof ConsumerWrapper) {
+ ((ConsumerWrapper) mainConsumer).subscribe(
+ topologyMetadata.allFullSourceTopicNames(),
+ new DefaultStreamsRebalanceListener(
+ log,
+ time,
+ streamsRebalanceData.get(),
+ this,
+ taskManager
+ )
+ );
+ } else {
+ ((AsyncKafkaConsumer<byte[], byte[]>)
mainConsumer).subscribe(
+ topologyMetadata.allFullSourceTopicNames(),
+ new DefaultStreamsRebalanceListener(
+ log,
+ time,
+ streamsRebalanceData.get(),
+ this,
+ taskManager
+ )
+ );
+ }
} else {
mainConsumer.subscribe(topologyMetadata.allFullSourceTopicNames(),
rebalanceListener);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 44cd11459f7..1b4143eceee 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -69,7 +69,6 @@ import
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.ConsumerWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -3952,38 +3951,6 @@ public class StreamThreadTest {
);
}
- @ParameterizedTest
- @MethodSource("data")
- public void shouldWrapMainConsumerFromClassConfig(final boolean
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
- final Properties streamsConfigProps = configProps(false,
stateUpdaterEnabled, processingThreadsEnabled);
- streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
- streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER,
TestWrapper.class);
-
- thread = createStreamThread("clientId", new
StreamsConfig(streamsConfigProps));
-
- assertInstanceOf(
- AsyncKafkaConsumer.class,
- assertInstanceOf(TestWrapper.class,
thread.mainConsumer()).consumer()
- );
- }
-
- @ParameterizedTest
- @MethodSource("data")
- public void shouldWrapMainConsumerFromStringConfig(final boolean
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
- final Properties streamsConfigProps = configProps(false,
stateUpdaterEnabled, processingThreadsEnabled);
- streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
- streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER,
TestWrapper.class.getName());
-
- thread = createStreamThread("clientId", new
StreamsConfig(streamsConfigProps));
-
- assertInstanceOf(
- AsyncKafkaConsumer.class,
- assertInstanceOf(TestWrapper.class,
thread.mainConsumer()).consumer()
- );
- }
-
- public static final class TestWrapper extends ConsumerWrapper { }
-
private StreamThread setUpThread(final Properties streamsConfigProps) {
final StreamsConfig config = new StreamsConfig(streamsConfigProps);
final ConsumerGroupMetadata consumerGroupMetadata =
Mockito.mock(ConsumerGroupMetadata.class);