This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new a42f6a42 [hotfix] Revert [FLINK-38869] InitializationContext for KafkaSubscriber
a42f6a42 is described below
commit a42f6a42928bba801bc75897470562debe4c50c4
Author: Efrat Levitan <[email protected]>
AuthorDate: Mon Mar 2 03:58:55 2026 +0200
[hotfix] Revert [FLINK-38869] InitializationContext for KafkaSubscriber
This closes #221.
---
.../source/enumerator/KafkaSourceEnumerator.java | 7 -------
.../enumerator/subscriber/KafkaSubscriber.java | 24 ----------------------
.../enumerator/subscriber/KafkaSubscriberTest.java | 10 ---------
3 files changed, 41 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 34523a5b..da73fd86 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -241,8 +241,6 @@ public class KafkaSourceEnumerator
addPartitionSplitChangeToPendingAssignments(preinitializedSplits);
}
- subscriber.open(new KafkaSubscriberInitContext());
-
if (partitionDiscoveryIntervalMs > 0) {
LOG.info(
"Starting the KafkaSourceEnumerator for consumer group {} "
@@ -302,7 +300,6 @@ public class KafkaSourceEnumerator
if (adminClient != null) {
adminClient.close();
}
- subscriber.close();
}
// ----------------- private methods -------------------
@@ -617,10 +614,6 @@ public class KafkaSourceEnumerator
// --------------- private class ---------------
- static class KafkaSubscriberInitContext implements KafkaSubscriber.InitializationContext {
- private KafkaSubscriberInitContext() {}
- }
-
/** A container class to hold the newly added partitions and removed partitions. */
@VisibleForTesting
static class PartitionChange {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 62959e69..37de884a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -47,17 +47,6 @@ import java.util.regex.Pattern;
@PublicEvolving
public interface KafkaSubscriber extends Serializable {
- /**
- * Opens the subscriber. This lifecycle method will be called before {@link
- * #getSubscribedTopicPartitions(AdminClient)} calls are made.
- *
- * <p>Implementations may override this method to initialize any additional resources (beyond
- * the Kafka {@link AdminClient}) required for discovering topic partitions.
- *
- * @param initializationContext initialization context for the subscriber.
- */
- default void open(InitializationContext initializationContext) {}
-
/**
* Get a set of subscribed {@link TopicPartition}s.
*
@@ -66,19 +55,6 @@ public interface KafkaSubscriber extends Serializable {
*/
Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
- /**
- * Closes the subscriber. This lifecycle method will be called after this {@link
- * KafkaSubscriber} will no longer be used.
- *
- * <p>Any resources created in the {@link #open(InitializationContext)} method should be cleaned
- * up here.
- */
- default void close() {}
-
- /** Initialization context for the {@link KafkaSubscriber}. */
- @PublicEvolving
- interface InitializationContext {}
-
// ----------------- factory methods --------------
static KafkaSubscriber getTopicListSubscriber(List<String> topics) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 2e89dbcb..79f35889 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -68,7 +68,6 @@ class KafkaSubscriberTest {
List<String> topics = Arrays.asList(TOPIC1, TOPIC2);
KafkaSubscriber subscriber =
KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
- subscriber.open(new TestSubscriberInitContext());
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
@@ -85,7 +84,6 @@ class KafkaSubscriberTest {
final KafkaSubscriber subscriber =
KafkaSubscriber.getTopicListSubscriber(
Collections.singletonList(NON_EXISTING_TOPIC.topic()));
- subscriber.open(new TestSubscriberInitContext());
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
.isInstanceOf(RuntimeException.class)
@@ -98,7 +96,6 @@ class KafkaSubscriberTest {
KafkaSubscriber subscriber =
KafkaSubscriber.getTopicPatternSubscriber(pattern);
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
- subscriber.open(new TestSubscriberInitContext());
final Set<TopicPartition> expectedSubscribedPartitions =
new HashSet<>(
@@ -117,7 +114,6 @@ class KafkaSubscriberTest {
partitions.remove(new TopicPartition(TOPIC1, 1));
KafkaSubscriber subscriber =
KafkaSubscriber.getPartitionSetSubscriber(partitions);
- subscriber.open(new TestSubscriberInitContext());
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
@@ -133,7 +129,6 @@ class KafkaSubscriberTest {
final KafkaSubscriber subscriber =
KafkaSubscriber.getPartitionSetSubscriber(
Collections.singleton(nonExistingPartition));
- subscriber.open(new TestSubscriberInitContext());
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
.isInstanceOf(RuntimeException.class)
@@ -142,9 +137,4 @@ class KafkaSubscriberTest {
"Partition '%s' does not exist on Kafka brokers",
nonExistingPartition));
}
-
- private static class TestSubscriberInitContext
- implements KafkaSubscriber.InitializationContext {
- private TestSubscriberInitContext() {}
- }
}