StephanEwen commented on a change in pull request #13574:
URL: https://github.com/apache/flink/pull/13574#discussion_r518249593



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -0,0 +1,474 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link KafkaSource} to make it easier for the users to construct
+ * a {@link KafkaSource}.
+ *
+ * <p>The following example shows the minimum setup to create a KafkaSource that reads the
+ * String values from a Kafka topic.
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setGroupId("myGroup")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
+ *     .build();
+ * }</pre>
+ * The bootstrap servers, group id, topics/partitions to consume, and the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting offsets of the KafkaSource, one can call
+ * {@link #setStartingOffsets(OffsetsInitializer)}.
+ *
+ * <p>By default the KafkaSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode
+ * and never stops until the Flink job is canceled or fails. To let the KafkaSource run
+ * in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, one
+ * can call {@link #setUnbounded(OffsetsInitializer)}. For example, the following
+ * KafkaSource stops after it consumes up to the latest partition offsets at the point
+ * when the Flink job started.
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setGroupId("myGroup")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
+ *     .setUnbounded(OffsetsInitializer.latest())
+ *     .build();
+ * }</pre>
+ *
+ * <p>Check the Java docs of each individual method to learn more about the settings to
+ * build a KafkaSource.
+ */
+public class KafkaSourceBuilder<OUT> {
+       private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
+       private static final String[] REQUIRED_CONFIGS = {
+               ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+               ConsumerConfig.GROUP_ID_CONFIG};
+       // The subscriber specifies the partitions to subscribe to.
+       private KafkaSubscriber subscriber;
+       // Users can specify the starting / stopping offset initializer.
+       private OffsetsInitializer startingOffsetsInitializer;
+       private OffsetsInitializer stoppingOffsetsInitializer;
+       // Boundedness
+       private Boundedness boundedness;
+       private KafkaRecordDeserializer<OUT> deserializationSchema;
+       // The configurations.
+       protected Properties props;
+
+       KafkaSourceBuilder() {
+               this.subscriber = null;
+               this.startingOffsetsInitializer = OffsetsInitializer.earliest();
+               this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
+               this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+               this.deserializationSchema = null;
+               this.props = new Properties();
+       }
+
+       /**
+        * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
+        *
+        * @param bootstrapServers the bootstrap servers of the Kafka cluster.
+        * @return this KafkaSourceBuilder.
+        */
+       public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers) {
+               return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+       }
+
+       /**
+        * Sets the consumer group id of the KafkaSource.
+        *
+        * @param groupId the group id of the KafkaSource.
+        * @return this KafkaSourceBuilder.
+        */
+       public KafkaSourceBuilder<OUT> setGroupId(String groupId) {
+               return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+       }
+
+       /**
+        * Sets a list of topics the KafkaSource should consume from. All the topics
+        * in the list must already exist in the Kafka cluster; otherwise an
+        * exception will be thrown. To allow some of the topics to be created lazily,
+        * please use {@link #setTopicPattern(Pattern)} instead.
+        *
+        * @param topics the list of topics to consume from.
+        * @return this KafkaSourceBuilder.
+        * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+        */
+       public KafkaSourceBuilder<OUT> setTopics(List<String> topics) {

Review comment:
       Maybe add an overload here for `setTopics(String... topics)` for convenience.
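
A minimal sketch of what such a convenience overload could look like, assuming it simply delegates to the existing list-based setter via `Arrays.asList` (the javadoc wording here is illustrative, not part of this PR):

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sets one or more topics the KafkaSource should consume from.
 * Convenience overload that delegates to {@link #setTopics(List)}.
 *
 * @param topics the topics to consume from.
 * @return this KafkaSourceBuilder.
 */
public KafkaSourceBuilder<OUT> setTopics(String... topics) {
        return setTopics(Arrays.asList(topics));
}
```

Call sites could then write `.setTopics(TOPIC1, TOPIC2)` instead of `.setTopics(Arrays.asList(TOPIC1, TOPIC2))`.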




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

