This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c4697f12ecb [FLINK-24660][connector/kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
c4697f12ecb is described below
commit c4697f12ecb097a7a1c890264d813e834b8ae44d
Author: Mason Chen <[email protected]>
AuthorDate: Tue Apr 5 09:53:19 2022 -0700
[FLINK-24660][connector/kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
This closes #19366
---
.../flink/connector/kafka/source/KafkaSource.java | 10 ++++-
.../connector/kafka/source/KafkaSourceBuilder.java | 12 ++++++
.../enumerator/subscriber/KafkaSubscriber.java | 4 +-
.../kafka/source/KafkaSourceBuilderTest.java | 47 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 478bed59708..1b327504587 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -76,7 +76,10 @@ import java.util.function.Supplier;
* .build();
* }</pre>
*
- * <p>See {@link KafkaSourceBuilder} for more details.
+ * <p>{@link org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports
+ * adding new splits and not removing splits in split discovery.
+ *
+ * <p>See {@link KafkaSourceBuilder} for more details on how to configure this source.
*
* @param <OUT> the output type of the source.
*/
@@ -225,4 +228,9 @@ public class KafkaSource<OUT>
Configuration getConfiguration() {
return toConfiguration(props);
}
+
+ @VisibleForTesting
+ KafkaSubscriber getKafkaSubscriber() {
+ return subscriber;
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index cbce9b98f53..9e0c1e5e010 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -184,6 +184,18 @@ public class KafkaSourceBuilder<OUT> {
return this;
}
+ /**
+ * Set a custom Kafka subscriber to use to discover new splits.
+ *
+ * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber
kafkaSubscriber) {
+ ensureSubscriberIsNull("custom");
+ this.subscriber = checkNotNull(kafkaSubscriber);
+ return this;
+ }
+
/**
* Specify from which offsets the KafkaSource should start consume from by providing an {@link
* OffsetsInitializer}.
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 638f7fb7826..1b819fb2305 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -18,7 +18,7 @@
package org.apache.flink.connector.kafka.source.enumerator.subscriber;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
@@ -40,7 +40,7 @@ import java.util.regex.Pattern;
* <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
* three types of subscribing mode.
*/
-@Internal
+@PublicEvolving
public interface KafkaSubscriber extends Serializable {
/**
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index 26a13d3158f..236e9618f67 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -19,18 +19,23 @@ package org.apache.flink.connector.kafka.source;
import org.apache.flink.configuration.ConfigOptions;
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.TestLoggerExtension;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -152,6 +157,40 @@ public class KafkaSourceBuilderTest {
"Property group.id is required because partition
topic-0 is initialized with committed offset");
}
+ @Test
+ public void testSettingCustomKafkaSubscriber() {
+ ExampleCustomSubscriber exampleCustomSubscriber = new
ExampleCustomSubscriber();
+ KafkaSourceBuilder<String> customKafkaSubscriberBuilder =
+ new KafkaSourceBuilder<String>()
+ .setBootstrapServers("testServer")
+ .setKafkaSubscriber(exampleCustomSubscriber)
+ .setDeserializer(
+ KafkaRecordDeserializationSchema.valueOnly(
+ StringDeserializer.class));
+
+ assertThat(customKafkaSubscriberBuilder.build().getKafkaSubscriber())
+ .isEqualTo(exampleCustomSubscriber);
+
+ assertThatThrownBy(() ->
customKafkaSubscriberBuilder.setTopics("topic"))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Cannot use topics for consumption because a
ExampleCustomSubscriber is already set for consumption.");
+
+ assertThatThrownBy(
+ () ->
customKafkaSubscriberBuilder.setTopicPattern(Pattern.compile(".+")))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Cannot use topic pattern for consumption because a
ExampleCustomSubscriber is already set for consumption.");
+
+ assertThatThrownBy(
+ () ->
+ customKafkaSubscriberBuilder.setPartitions(
+ Collections.singleton(new
TopicPartition("topic", 0))))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Cannot use partitions for consumption because a
ExampleCustomSubscriber is already set for consumption.");
+ }
+
private KafkaSourceBuilder<String> getBasicBuilder() {
return new KafkaSourceBuilder<String>()
.setBootstrapServers("testServer")
@@ -159,4 +198,12 @@ public class KafkaSourceBuilderTest {
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
}
+
+ private static class ExampleCustomSubscriber implements KafkaSubscriber {
+
+ @Override
+ public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient
adminClient) {
+ return Collections.singleton(new TopicPartition("topic", 0));
+ }
+ }
}