This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c4697f12ecb [FLINK-24660][connector/kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
c4697f12ecb is described below

commit c4697f12ecb097a7a1c890264d813e834b8ae44d
Author: Mason Chen <[email protected]>
AuthorDate: Tue Apr 5 09:53:19 2022 -0700

    [FLINK-24660][connector/kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
    
    This closes #19366
---
 .../flink/connector/kafka/source/KafkaSource.java  | 10 ++++-
 .../connector/kafka/source/KafkaSourceBuilder.java | 12 ++++++
 .../enumerator/subscriber/KafkaSubscriber.java     |  4 +-
 .../kafka/source/KafkaSourceBuilderTest.java       | 47 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 478bed59708..1b327504587 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -76,7 +76,10 @@ import java.util.function.Supplier;
  *     .build();
  * }</pre>
  *
- * <p>See {@link KafkaSourceBuilder} for more details.
+ * <p>{@link org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports
+ * adding new splits and not removing splits in split discovery.
+ *
+ * <p>See {@link KafkaSourceBuilder} for more details on how to configure this source.
  *
  * @param <OUT> the output type of the source.
  */
@@ -225,4 +228,9 @@ public class KafkaSource<OUT>
     Configuration getConfiguration() {
         return toConfiguration(props);
     }
+
+    @VisibleForTesting
+    KafkaSubscriber getKafkaSubscriber() {
+        return subscriber;
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index cbce9b98f53..9e0c1e5e010 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -184,6 +184,18 @@ public class KafkaSourceBuilder<OUT> {
         return this;
     }
 
+    /**
+     * Set a custom Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber 
kafkaSubscriber) {
+        ensureSubscriberIsNull("custom");
+        this.subscriber = checkNotNull(kafkaSubscriber);
+        return this;
+    }
+
     /**
      * Specify from which offsets the KafkaSource should start consume from by providing an {@link
      * OffsetsInitializer}.
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 638f7fb7826..1b819fb2305 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.common.TopicPartition;
@@ -40,7 +40,7 @@ import java.util.regex.Pattern;
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
  * three types of subscribing mode.
  */
-@Internal
+@PublicEvolving
 public interface KafkaSubscriber extends Serializable {
 
     /**
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index 26a13d3158f..236e9618f67 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -19,18 +19,23 @@ package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.configuration.ConfigOptions;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.TestLoggerExtension;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -152,6 +157,40 @@ public class KafkaSourceBuilderTest {
                         "Property group.id is required because partition 
topic-0 is initialized with committed offset");
     }
 
+    @Test
+    public void testSettingCustomKafkaSubscriber() {
+        ExampleCustomSubscriber exampleCustomSubscriber = new 
ExampleCustomSubscriber();
+        KafkaSourceBuilder<String> customKafkaSubscriberBuilder =
+                new KafkaSourceBuilder<String>()
+                        .setBootstrapServers("testServer")
+                        .setKafkaSubscriber(exampleCustomSubscriber)
+                        .setDeserializer(
+                                KafkaRecordDeserializationSchema.valueOnly(
+                                        StringDeserializer.class));
+
+        assertThat(customKafkaSubscriberBuilder.build().getKafkaSubscriber())
+                .isEqualTo(exampleCustomSubscriber);
+
+        assertThatThrownBy(() -> 
customKafkaSubscriberBuilder.setTopics("topic"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot use topics for consumption because a 
ExampleCustomSubscriber is already set for consumption.");
+
+        assertThatThrownBy(
+                        () -> 
customKafkaSubscriberBuilder.setTopicPattern(Pattern.compile(".+")))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot use topic pattern for consumption because a 
ExampleCustomSubscriber is already set for consumption.");
+
+        assertThatThrownBy(
+                        () ->
+                                customKafkaSubscriberBuilder.setPartitions(
+                                        Collections.singleton(new 
TopicPartition("topic", 0))))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot use partitions for consumption because a 
ExampleCustomSubscriber is already set for consumption.");
+    }
+
     private KafkaSourceBuilder<String> getBasicBuilder() {
         return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")
@@ -159,4 +198,12 @@ public class KafkaSourceBuilderTest {
                 .setDeserializer(
                         
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
     }
+
+    private static class ExampleCustomSubscriber implements KafkaSubscriber {
+
+        @Override
+        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient 
adminClient) {
+            return Collections.singleton(new TopicPartition("topic", 0));
+        }
+    }
 }

Reply via email to