This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cd6e46f [FLINK-38869] InitializationContext for KafkaSubscriber (#208)
5cd6e46f is described below

commit 5cd6e46f28f65846a9271e2c658e0d4450117b20
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu Jan 22 18:21:09 2026 +0200

    [FLINK-38869] InitializationContext for KafkaSubscriber (#208)
---
 .../source/enumerator/KafkaSourceEnumerator.java   |  7 +++++++
 .../enumerator/subscriber/KafkaSubscriber.java     | 24 ++++++++++++++++++++++
 .../enumerator/subscriber/KafkaSubscriberTest.java | 10 +++++++++
 3 files changed, 41 insertions(+)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index c811f176..61c64e12 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -200,6 +200,8 @@ public class KafkaSourceEnumerator
             addPartitionSplitChangeToPendingAssignments(preinitializedSplits);
         }
 
+        subscriber.open(new KafkaSubscriberInitContext());
+
         if (partitionDiscoveryIntervalMs > 0) {
             LOG.info(
                     "Starting the KafkaSourceEnumerator for consumer group {} "
@@ -259,6 +261,7 @@ public class KafkaSourceEnumerator
         if (adminClient != null) {
             adminClient.close();
         }
+        subscriber.close();
     }
 
     // ----------------- private methods -------------------
@@ -562,6 +565,10 @@ public class KafkaSourceEnumerator
 
     // --------------- private class ---------------
 
+    static class KafkaSubscriberInitContext implements 
KafkaSubscriber.InitializationContext {
+        private KafkaSubscriberInitContext() {}
+    }
+
     /** A container class to hold the newly added partitions and removed partitions. */
     @VisibleForTesting
     static class PartitionChange {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 37de884a..62959e69 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -47,6 +47,17 @@ import java.util.regex.Pattern;
 @PublicEvolving
 public interface KafkaSubscriber extends Serializable {
 
+    /**
+     * Opens the subscriber. This lifecycle method will be called before {@link
+     * #getSubscribedTopicPartitions(AdminClient)} calls are made.
+     *
+     * <p>Implementations may override this method to initialize any additional resources (beyond
+     * the Kafka {@link AdminClient}) required for discovering topic partitions.
+     *
+     * @param initializationContext initialization context for the subscriber.
+     */
+    default void open(InitializationContext initializationContext) {}
+
     /**
      * Get a set of subscribed {@link TopicPartition}s.
      *
@@ -55,6 +66,19 @@ public interface KafkaSubscriber extends Serializable {
      */
     Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
 
+    /**
+     * Closes the subscriber. This lifecycle method is called once this {@link
+     * KafkaSubscriber} is no longer going to be used.
+     *
+     * <p>Any resources created in the {@link #open(InitializationContext)} method should be cleaned
+     * up here.
+     */
+    default void close() {}
+
+    /** Initialization context for the {@link KafkaSubscriber}. */
+    @PublicEvolving
+    interface InitializationContext {}
+
     // ----------------- factory methods --------------
 
     static KafkaSubscriber getTopicListSubscriber(List<String> topics) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 0fd61f86..b04d587a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -68,6 +68,7 @@ public class KafkaSubscriberTest {
         List<String> topics = Arrays.asList(TOPIC1, TOPIC2);
         KafkaSubscriber subscriber =
                 KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, 
TOPIC2));
+        subscriber.open(new TestSubscriberInitContext());
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
 
@@ -84,6 +85,7 @@ public class KafkaSubscriberTest {
         final KafkaSubscriber subscriber =
                 KafkaSubscriber.getTopicListSubscriber(
                         Collections.singletonList(NON_EXISTING_TOPIC.topic()));
+        subscriber.open(new TestSubscriberInitContext());
 
         assertThatThrownBy(() -> 
subscriber.getSubscribedTopicPartitions(adminClient))
                 .isInstanceOf(RuntimeException.class)
@@ -96,6 +98,7 @@ public class KafkaSubscriberTest {
         KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
+        subscriber.open(new TestSubscriberInitContext());
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
 
         final Set<TopicPartition> expectedSubscribedPartitions =
                 new HashSet<>(
@@ -114,6 +117,7 @@ public class KafkaSubscriberTest {
         partitions.remove(new TopicPartition(TOPIC1, 1));
 
         KafkaSubscriber subscriber = 
KafkaSubscriber.getPartitionSetSubscriber(partitions);
+        subscriber.open(new TestSubscriberInitContext());
 
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
@@ -129,6 +133,7 @@ public class KafkaSubscriberTest {
         final KafkaSubscriber subscriber =
                 KafkaSubscriber.getPartitionSetSubscriber(
                         Collections.singleton(nonExistingPartition));
+        subscriber.open(new TestSubscriberInitContext());
 
         assertThatThrownBy(() -> 
subscriber.getSubscribedTopicPartitions(adminClient))
                 .isInstanceOf(RuntimeException.class)
@@ -137,4 +142,9 @@ public class KafkaSubscriberTest {
                                 "Partition '%s' does not exist on Kafka 
brokers",
                                 nonExistingPartition));
     }
+
+    private static class TestSubscriberInitContext implements 
KafkaSubscriber.InitializationContext {
+        private TestSubscriberInitContext() {}
+    }
+
 }

Reply via email to