This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f0bd873309c3b0a13edb5354912ffdb1169de5b4
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Sep 14 15:22:00 2021 +0800

    [FLINK-24277][connector/kafka] Add configuration for committing offset on 
checkpoint and disable it if group ID is not specified
    
    (cherry picked from commit ca8bff231aed2412f579d0a4e446e9a6bee42581)
---
 docs/content/docs/connectors/datastream/kafka.md   |   1 +
 .../flink/connector/kafka/source/KafkaSource.java  |   5 +
 .../connector/kafka/source/KafkaSourceBuilder.java |  35 ++++++-
 .../connector/kafka/source/KafkaSourceOptions.java |   6 ++
 .../source/reader/KafkaPartitionSplitReader.java   |  24 ++---
 .../kafka/source/reader/KafkaSourceReader.java     |  17 ++++
 .../kafka/source/KafkaSourceBuilderTest.java       | 103 ++++++++++++++++++++-
 .../connector/kafka/source/KafkaSourceITCase.java  |  19 ++++
 .../kafka/source/reader/KafkaSourceReaderTest.java |  44 ++++++++-
 9 files changed, 233 insertions(+), 21 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/kafka.md 
b/docs/content/docs/connectors/datastream/kafka.md
index 8fcd023..cab25dc 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -154,6 +154,7 @@ KafkaSource has following options for configuration:
   below for more details.
 - ```register.consumer.metrics``` specifies whether to register metrics of 
KafkaConsumer in Flink
 metric group
+- ```commit.offsets.on.checkpoint``` specifies whether to commit consuming 
offsets to Kafka brokers on checkpoint
 
 For configurations of KafkaConsumer, you can refer to
 <a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka 
documentation</a>
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index af0013b..d1219c0 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -214,4 +214,9 @@ public class KafkaSource<OUT>
         props.stringPropertyNames().forEach(key -> config.setString(key, 
props.getProperty(key)));
         return config;
     }
+
+    @VisibleForTesting
+    Configuration getConfiguration() {
+        return toConfiguration(props);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index cd286ed..eb93683 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The @builder class for {@link KafkaSource} to make it easier for the users 
to construct a {@link
@@ -429,8 +430,12 @@ public class KafkaSourceBuilder<OUT> {
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName(),
                 true);
-        maybeOverride(
-                ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new 
Random().nextLong(), false);
+        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled because {} is not 
specified",
+                    ConsumerConfig.GROUP_ID_CONFIG);
+            
maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", 
false);
+        }
         maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", 
false);
         maybeOverride(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -443,10 +448,13 @@ public class KafkaSourceBuilder<OUT> {
                 "-1",
                 boundedness == Boundedness.BOUNDED);
 
-        // If the client id prefix is not set, reuse the consumer group id as 
the client id prefix.
+        // If the client id prefix is not set, reuse the consumer group id as 
the client id prefix,
+        // or generate a random string if consumer group id is not specified.
         maybeOverride(
                 KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
-                props.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+                        ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+                        : "KafkaSource-" + new Random().nextLong(),
                 false);
     }
 
@@ -481,5 +489,24 @@ public class KafkaSourceBuilder<OUT> {
                 "No subscribe mode is specified, "
                         + "should be one of topics, topic pattern and 
partition set.");
         checkNotNull(deserializationSchema, "Deserialization schema is 
required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || 
!offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is 
enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
+    private boolean offsetCommitEnabledManually() {
+        boolean autoCommit =
+                props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                        && Boolean.parseBoolean(
+                                
props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        boolean commitOnCheckpoint =
+                
props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+                        && Boolean.parseBoolean(
+                                props.getProperty(
+                                        
KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+        return autoCommit || commitOnCheckpoint;
     }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index e48804b..1a05833 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -48,6 +48,12 @@ public class KafkaSourceOptions {
                     .withDescription(
                             "Whether to register metrics of KafkaConsumer into 
Flink metric group");
 
+    public static final ConfigOption<Boolean> COMMIT_OFFSETS_ON_CHECKPOINT =
+            ConfigOptions.key("commit.offsets.on.checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to commit consuming offset on 
checkpoint.");
+
     @SuppressWarnings("unchecked")
     public static <T> T getOption(
             Properties props, ConfigOption<?> configOption, Function<String, 
T> parser) {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index e13e10e..a5d81fa 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -340,17 +340,19 @@ public class KafkaPartitionSplitReader<T>
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = 
consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at 
committed offset. "
-                                                    + "But there is no 
committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, 
offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {
+            consumer.committed(partitionsStoppingAtCommitted)
+                    .forEach(
+                            (tp, offsetAndMetadata) -> {
+                                Preconditions.checkNotNull(
+                                        offsetAndMetadata,
+                                        String.format(
+                                                "Partition %s should stop at 
committed offset. "
+                                                        + "But there is no 
committed offset of this partition for group %s",
+                                                tp, groupId));
+                                stoppingOffsets.put(tp, 
offsetAndMetadata.offset());
+                            });
+        }
     }
 
     private void removeEmptySplits() {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 79c8064..08f82b0 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import 
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
 import 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -55,6 +56,7 @@ public class KafkaSourceReader<T>
     private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> 
offsetsToCommit;
     private final ConcurrentMap<TopicPartition, OffsetAndMetadata> 
offsetsOfFinishedSplits;
     private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
+    private final boolean commitOffsetsOnCheckpoint;
 
     public KafkaSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, 
Long>>> elementsQueue,
@@ -67,6 +69,13 @@ public class KafkaSourceReader<T>
         this.offsetsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
+        this.commitOffsetsOnCheckpoint =
+                config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+        if (!commitOffsetsOnCheckpoint) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled. "
+                            + "Consuming offset will not be reported back to 
Kafka cluster.");
+        }
     }
 
     @Override
@@ -84,6 +93,10 @@ public class KafkaSourceReader<T>
     @Override
     public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
         List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return splits;
+        }
+
         if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
             offsetsToCommit.put(checkpointId, Collections.emptyMap());
         } else {
@@ -108,6 +121,10 @@ public class KafkaSourceReader<T>
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return;
+        }
+
         ((KafkaSourceFetcherManager<T>) splitFetcherManager)
                 .commitOffsets(
                         offsetsToCommit.get(checkpointId),
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index 6624e4d..e13514a 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -17,22 +17,119 @@
 
 package org.apache.flink.connector.kafka.source;
 
+import org.apache.flink.configuration.ConfigOptions;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
 import org.junit.Test;
 
 /** Tests for {@link KafkaSourceBuilder}. */
 public class KafkaSourceBuilderTest extends TestLogger {
 
     @Test
+    public void testBuildSourceWithGroupId() {
+        final KafkaSource<String> kafkaSource = 
getBasicBuilder().setGroupId("groupId").build();
+        // Commit on checkpoint should be enabled by default
+        Assert.assertTrue(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        // Auto commit should be disabled by default
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
     public void testBuildSourceWithoutGroupId() {
-        new KafkaSourceBuilder<String>()
+        final KafkaSource<String> kafkaSource = getBasicBuilder().build();
+        // Commit on checkpoint and auto commit should be disabled because 
group.id is not specified
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
+    public void testEnableCommitOnCheckpointWithoutGroupId() {
+        final IllegalStateException exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                
KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
+                                                        .key(),
+                                                "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is 
enabled"));
+    }
+
+    @Test
+    public void testEnableAutoCommitWithoutGroupId() {
+        final IllegalStateException exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is 
enabled"));
+    }
+
+    @Test
+    public void testDisableOffsetCommitWithoutGroupId() {
+        getBasicBuilder()
+                
.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
+                .build();
+        
getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false").build();
+    }
+
+    private KafkaSourceBuilder<String> getBasicBuilder() {
+        return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")
                 .setTopics("topic")
                 .setDeserializer(
-                        
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
-                .build();
+                        
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
+    }
+
+    @SuppressWarnings({"unchecked", "SameParameterValue"})
+    private <T extends Throwable> T assertThrows(Class<T> exceptionClass, 
Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Throwable t) {
+            if (exceptionClass.isInstance(t)) {
+                return (T) t;
+            }
+            throw new AssertionError(
+                    String.format(
+                            "Expected %s to be thrown, but actually got %s",
+                            exceptionClass, t.getClass()));
+        }
+        throw new AssertionError(
+                String.format("Expected %s to be thrown, but nothing was 
thrown", exceptionClass));
     }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 89ce39c..ea37d8d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -197,6 +197,25 @@ public class KafkaSourceITCase {
         executeAndVerify(env, stream);
     }
 
+    @Test
+    public void testBasicReadWithoutGroupId() throws Exception {
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                        .setDeserializer(new 
TestingKafkaRecordDeserializationSchema())
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<PartitionAndValue> stream =
+                env.fromSource(
+                        source, WatermarkStrategy.noWatermarks(), 
"testBasicReadWithoutGroupId");
+        executeAndVerify(env, stream);
+    }
+
     // -----------------
 
     private static class PartitionAndValue implements Serializable {
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 889afa4..0304315 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@@ -60,6 +61,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -241,6 +243,32 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
     }
 
     @Test
+    public void testDisableOffsetCommit() throws Exception {
+        final Properties properties = new Properties();
+        
properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), 
"false");
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                properties)) {
+            reader.addSplits(
+                    getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, 
Boundedness.CONTINUOUS_UNBOUNDED));
+            ValidatingSourceOutput output = new ValidatingSourceOutput();
+            long checkpointId = 0;
+            do {
+                checkpointId++;
+                reader.pollNext(output);
+                // Create a checkpoint for each message consumption, but not 
complete them.
+                reader.snapshotState(checkpointId);
+                // Offsets to commit should be always empty because offset 
commit is disabled
+                assertEquals(0, reader.getOffsetsToCommit().size());
+            } while (output.count() < TOTAL_NUM_RECORDS);
+        }
+    }
+
+    @Test
     public void testKafkaSourceMetrics() throws Exception {
         final MetricListener metricListener = new MetricListener();
         final String groupId = "testKafkaSourceMetrics";
@@ -397,6 +425,17 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook)
             throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        return createReader(boundedness, context, splitFinishedHook, 
properties);
+    }
+
+    private SourceReader<Integer, KafkaPartitionSplit> createReader(
+            Boundedness boundedness,
+            SourceReaderContext context,
+            Consumer<Collection<String>> splitFinishedHook,
+            Properties props)
+            throws Exception {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
                         .setClientIdPrefix("KafkaSourceReaderTest")
@@ -407,9 +446,8 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
                         .setProperty(
                                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
-
+                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+                        .setProperties(props);
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }

Reply via email to