This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0adc6fa3e1a KAFKA-19271: allow intercepting internal method call (#19832)
0adc6fa3e1a is described below

commit 0adc6fa3e1ad21ac5b830cfe86948bcc2245e8f7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Jun 9 07:28:13 2025 -0700

    KAFKA-19271: allow intercepting internal method call (#19832)
    
    To allow intercepting the internal subscribe call to the async-consumer,
    we need to extend the ConsumerWrapper interface accordingly, instead of
    returning the wrapped async-consumer back to the KS runtime.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../kafka/streams/internals/ConsumerWrapper.java   |  9 +++---
 .../streams/processor/internals/StreamThread.java  | 36 ++++++++++++++--------
 .../processor/internals/StreamThreadTest.java      | 33 --------------------
 3 files changed, 28 insertions(+), 50 deletions(-)
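
For illustration only (not part of the commit): with the new overload on ConsumerWrapper, a
wrapper subclass can intercept the internal subscribe call itself instead of handing the wrapped
async-consumer back to the Streams runtime. Below is a minimal sketch, assuming the no-arg
construction used by the (removed) TestWrapper test class; the class name
InterceptingConsumerWrapper and the recorded-topics field are hypothetical.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener;
    import org.apache.kafka.streams.internals.ConsumerWrapper;

    // Hypothetical wrapper: records the topics passed to the internal subscribe
    // call, then forwards to the wrapped async-consumer via the base class.
    public class InterceptingConsumerWrapper extends ConsumerWrapper {

        public final List<String> subscribedTopics = new ArrayList<>();

        @Override
        public void subscribe(final Collection<String> topics,
                              final StreamsRebalanceListener streamsRebalanceListener) {
            subscribedTopics.addAll(topics);
            super.subscribe(topics, streamsRebalanceListener);
        }
    }

Because StreamThread now calls subscribe directly on the ConsumerWrapper (see the
StreamThread.java hunk below), such an override is invoked on the wrapper rather than bypassed.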

diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java b/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
index 20cd7cb84d7..5088cd4b7dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ConsumerWrapper.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
+import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -54,10 +55,6 @@ public abstract class ConsumerWrapper implements Consumer<byte[], byte[]> {
         this.delegate = delegate;
     }
 
-    public AsyncKafkaConsumer<byte[], byte[]> consumer() {
-        return delegate;
-    }
-
     @Override
     public Set<TopicPartition> assignment() {
         return delegate.assignment();
@@ -78,6 +75,10 @@ public abstract class ConsumerWrapper implements Consumer<byte[], byte[]> {
         delegate.subscribe(topics, callback);
     }
 
+    public void subscribe(final Collection<String> topics, final StreamsRebalanceListener streamsRebalanceListener) {
+        delegate.subscribe(topics, streamsRebalanceListener);
+    }
+
     @Override
     public void assign(final Collection<TopicPartition> partitions) {
         delegate.assign(partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 47775c53652..c0cdff2421b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1138,19 +1138,29 @@ public class StreamThread extends Thread implements ProcessingThread {
             mainConsumer.subscribe(topologyMetadata.sourceTopicPattern(), rebalanceListener);
         } else {
             if (streamsRebalanceData.isPresent()) {
-                final AsyncKafkaConsumer<byte[], byte[]> consumer = mainConsumer instanceof ConsumerWrapper
-                    ? ((ConsumerWrapper) mainConsumer).consumer()
-                    : (AsyncKafkaConsumer<byte[], byte[]>) mainConsumer;
-                consumer.subscribe(
-                    topologyMetadata.allFullSourceTopicNames(),
-                    new DefaultStreamsRebalanceListener(
-                        log,
-                        time,
-                        streamsRebalanceData.get(),
-                        this,
-                        taskManager
-                    )
-                );
+                if (mainConsumer instanceof ConsumerWrapper) {
+                    ((ConsumerWrapper) mainConsumer).subscribe(
+                        topologyMetadata.allFullSourceTopicNames(),
+                        new DefaultStreamsRebalanceListener(
+                            log,
+                            time,
+                            streamsRebalanceData.get(),
+                            this,
+                            taskManager
+                        )
+                    );
+                } else {
+                    ((AsyncKafkaConsumer<byte[], byte[]>) mainConsumer).subscribe(
+                        topologyMetadata.allFullSourceTopicNames(),
+                        new DefaultStreamsRebalanceListener(
+                            log,
+                            time,
+                            streamsRebalanceData.get(),
+                            this,
+                            taskManager
+                        )
+                    );
+                }
             } else {
                 mainConsumer.subscribe(topologyMetadata.allFullSourceTopicNames(), rebalanceListener);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 44cd11459f7..1b4143eceee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -69,7 +69,6 @@ import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.ConsumerWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -3952,38 +3951,6 @@ public class StreamThreadTest {
         );
     }
 
-    @ParameterizedTest
-    @MethodSource("data")
-    public void shouldWrapMainConsumerFromClassConfig(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
-        final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
-        streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
-        streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestWrapper.class);
-
-        thread = createStreamThread("clientId", new StreamsConfig(streamsConfigProps));
-
-        assertInstanceOf(
-            AsyncKafkaConsumer.class,
-            assertInstanceOf(TestWrapper.class, thread.mainConsumer()).consumer()
-        );
-    }
-
-    @ParameterizedTest
-    @MethodSource("data")
-    public void shouldWrapMainConsumerFromStringConfig(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
-        final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
-        streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
-        streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestWrapper.class.getName());
-
-        thread = createStreamThread("clientId", new StreamsConfig(streamsConfigProps));
-
-        assertInstanceOf(
-            AsyncKafkaConsumer.class,
-            assertInstanceOf(TestWrapper.class, thread.mainConsumer()).consumer()
-        );
-    }
-
-    public static final class TestWrapper extends ConsumerWrapper { }
-
     private StreamThread setUpThread(final Properties streamsConfigProps) {
         final StreamsConfig config = new StreamsConfig(streamsConfigProps);
         final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
