This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new a42f6a42 [hotfix] Revert [FLINK-38869] InitializationContext for 
KafkaSubscriber
a42f6a42 is described below

commit a42f6a42928bba801bc75897470562debe4c50c4
Author: Efrat Levitan <[email protected]>
AuthorDate: Mon Mar 2 03:58:55 2026 +0200

    [hotfix] Revert [FLINK-38869] InitializationContext for KafkaSubscriber
    
    This closes #221.
---
 .../source/enumerator/KafkaSourceEnumerator.java   |  7 -------
 .../enumerator/subscriber/KafkaSubscriber.java     | 24 ----------------------
 .../enumerator/subscriber/KafkaSubscriberTest.java | 10 ---------
 3 files changed, 41 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 34523a5b..da73fd86 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -241,8 +241,6 @@ public class KafkaSourceEnumerator
             addPartitionSplitChangeToPendingAssignments(preinitializedSplits);
         }
 
-        subscriber.open(new KafkaSubscriberInitContext());
-
         if (partitionDiscoveryIntervalMs > 0) {
             LOG.info(
                     "Starting the KafkaSourceEnumerator for consumer group {} "
@@ -302,7 +300,6 @@ public class KafkaSourceEnumerator
         if (adminClient != null) {
             adminClient.close();
         }
-        subscriber.close();
     }
 
     // ----------------- private methods -------------------
@@ -617,10 +614,6 @@ public class KafkaSourceEnumerator
 
     // --------------- private class ---------------
 
-    static class KafkaSubscriberInitContext implements 
KafkaSubscriber.InitializationContext {
-        private KafkaSubscriberInitContext() {}
-    }
-
     /** A container class to hold the newly added partitions and removed 
partitions. */
     @VisibleForTesting
     static class PartitionChange {
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 62959e69..37de884a 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -47,17 +47,6 @@ import java.util.regex.Pattern;
 @PublicEvolving
 public interface KafkaSubscriber extends Serializable {
 
-    /**
-     * Opens the subscriber. This lifecycle method will be called before {@link
-     * #getSubscribedTopicPartitions(AdminClient)} calls are made.
-     *
-     * <p>Implementations may override this method to initialize any 
additional resources (beyond
-     * the Kafka {@link AdminClient}) required for discovering topic 
partitions.
-     *
-     * @param initializationContext initialization context for the subscriber.
-     */
-    default void open(InitializationContext initializationContext) {}
-
     /**
      * Get a set of subscribed {@link TopicPartition}s.
      *
@@ -66,19 +55,6 @@ public interface KafkaSubscriber extends Serializable {
      */
     Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
 
-    /**
-     * Closes the subscriber. This lifecycle method will be called after this 
{@link
-     * KafkaSubscriber} will no longer be used.
-     *
-     * <p>Any resources created in the {@link #open(InitializationContext)} 
method should be cleaned
-     * up here.
-     */
-    default void close() {}
-
-    /** Initialization context for the {@link KafkaSubscriber}. */
-    @PublicEvolving
-    interface InitializationContext {}
-
     // ----------------- factory methods --------------
 
     static KafkaSubscriber getTopicListSubscriber(List<String> topics) {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 2e89dbcb..79f35889 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -68,7 +68,6 @@ class KafkaSubscriberTest {
         List<String> topics = Arrays.asList(TOPIC1, TOPIC2);
         KafkaSubscriber subscriber =
                 KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, 
TOPIC2));
-        subscriber.open(new TestSubscriberInitContext());
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
 
@@ -85,7 +84,6 @@ class KafkaSubscriberTest {
         final KafkaSubscriber subscriber =
                 KafkaSubscriber.getTopicListSubscriber(
                         Collections.singletonList(NON_EXISTING_TOPIC.topic()));
-        subscriber.open(new TestSubscriberInitContext());
 
         assertThatThrownBy(() -> 
subscriber.getSubscribedTopicPartitions(adminClient))
                 .isInstanceOf(RuntimeException.class)
@@ -98,7 +96,6 @@ class KafkaSubscriberTest {
         KafkaSubscriber subscriber = 
KafkaSubscriber.getTopicPatternSubscriber(pattern);
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
-        subscriber.open(new TestSubscriberInitContext());
 
         final Set<TopicPartition> expectedSubscribedPartitions =
                 new HashSet<>(
@@ -117,7 +114,6 @@ class KafkaSubscriberTest {
         partitions.remove(new TopicPartition(TOPIC1, 1));
 
         KafkaSubscriber subscriber = 
KafkaSubscriber.getPartitionSetSubscriber(partitions);
-        subscriber.open(new TestSubscriberInitContext());
 
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
@@ -133,7 +129,6 @@ class KafkaSubscriberTest {
         final KafkaSubscriber subscriber =
                 KafkaSubscriber.getPartitionSetSubscriber(
                         Collections.singleton(nonExistingPartition));
-        subscriber.open(new TestSubscriberInitContext());
 
         assertThatThrownBy(() -> 
subscriber.getSubscribedTopicPartitions(adminClient))
                 .isInstanceOf(RuntimeException.class)
@@ -142,9 +137,4 @@ class KafkaSubscriberTest {
                                 "Partition '%s' does not exist on Kafka 
brokers",
                                 nonExistingPartition));
     }
-
-    private static class TestSubscriberInitContext
-            implements KafkaSubscriber.InitializationContext {
-        private TestSubscriberInitContext() {}
-    }
 }

Reply via email to