This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf3df16e3cc9a3bede3f5dabc8d08c9369e02485
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Sep 16 15:20:08 2021 +0800

    [FLINK-24277][connector/kafka] Add configuration for committing offset on 
checkpoint and disable it if group ID is not specified
---
 .../flink/connector/kafka/source/KafkaSource.java  |   6 ++
 .../connector/kafka/source/KafkaSourceBuilder.java |  35 ++++++-
 .../connector/kafka/source/KafkaSourceOptions.java |   6 ++
 .../source/reader/KafkaPartitionSplitReader.java   |  24 ++---
 .../kafka/source/reader/KafkaSourceReader.java     |  17 ++++
 .../kafka/source/KafkaSourceBuilderTest.java       | 103 ++++++++++++++++++++-
 .../connector/kafka/source/KafkaSourceITCase.java  |  19 ++++
 .../kafka/source/reader/KafkaSourceReaderTest.java |  36 ++++++-
 8 files changed, 225 insertions(+), 21 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index ea7ad6c..477a4d6 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -178,4 +179,9 @@ public class KafkaSource<OUT>
         props.stringPropertyNames().forEach(key -> config.setString(key, 
props.getProperty(key)));
         return config;
     }
+
+    @VisibleForTesting
+    Configuration getConfiguration() {
+        return toConfiguration(props);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index 0099df3..ddcf2c7 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The @builder class for {@link KafkaSource} to make it easier for the users 
to construct a {@link
@@ -412,8 +413,12 @@ public class KafkaSourceBuilder<OUT> {
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName(),
                 true);
-        maybeOverride(
-                ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new 
Random().nextLong(), false);
+        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled because {} is not 
specified",
+                    ConsumerConfig.GROUP_ID_CONFIG);
+            
maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", 
false);
+        }
         maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", 
false);
         maybeOverride(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -426,10 +431,13 @@ public class KafkaSourceBuilder<OUT> {
                 "-1",
                 boundedness == Boundedness.BOUNDED);
 
-        // If the client id prefix is not set, reuse the consumer group id as 
the client id prefix.
+        // If the client id prefix is not set, reuse the consumer group id as 
the client id prefix,
+        // or generate a random string if consumer group id is not specified.
         maybeOverride(
                 KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
-                props.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+                        ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+                        : "KafkaSource-" + new Random().nextLong(),
                 false);
     }
 
@@ -464,5 +472,24 @@ public class KafkaSourceBuilder<OUT> {
                 "No subscribe mode is specified, "
                         + "should be one of topics, topic pattern and 
partition set.");
         checkNotNull(deserializationSchema, "Deserialization schema is 
required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || 
!offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is 
enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
+    private boolean offsetCommitEnabledManually() {
+        boolean autoCommit =
+                props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                        && Boolean.parseBoolean(
+                                
props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        boolean commitOnCheckpoint =
+                
props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+                        && Boolean.parseBoolean(
+                                props.getProperty(
+                                        
KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+        return autoCommit || commitOnCheckpoint;
     }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index 98c735f..3da2eb7 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -41,6 +41,12 @@ public class KafkaSourceOptions {
                             "The interval in milliseconds for the Kafka source 
to discover "
                                     + "the new partitions. A non-positive 
value disables the partition discovery.");
 
+    public static final ConfigOption<Boolean> COMMIT_OFFSETS_ON_CHECKPOINT =
+            ConfigOptions.key("commit.offsets.on.checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to commit consuming offset on 
checkpoint.");
+
     @SuppressWarnings("unchecked")
     public static <T> T getOption(
             Properties props, ConfigOption configOption, Function<String, T> 
parser) {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index c1a827c..af0001d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -311,17 +311,19 @@ public class KafkaPartitionSplitReader<T>
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = 
consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at 
committed offset. "
-                                                    + "But there is no 
committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, 
offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {
+            consumer.committed(partitionsStoppingAtCommitted)
+                    .forEach(
+                            (tp, offsetAndMetadata) -> {
+                                Preconditions.checkNotNull(
+                                        offsetAndMetadata,
+                                        String.format(
+                                                "Partition %s should stop at 
committed offset. "
+                                                        + "But there is no 
committed offset of this partition for group %s",
+                                                tp, groupId));
+                                stoppingOffsets.put(tp, 
offsetAndMetadata.offset());
+                            });
+        }
     }
 
     private void removeEmptySplits() {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index bf3d42e..d2c365d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
@@ -54,6 +55,7 @@ public class KafkaSourceReader<T>
     // and the split fetcher thread in the callback.
     private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> 
offsetsToCommit;
     private final ConcurrentMap<TopicPartition, OffsetAndMetadata> 
offsetsOfFinishedSplits;
+    private final boolean commitOffsetsOnCheckpoint;
 
     public KafkaSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, 
Long>>> elementsQueue,
@@ -69,6 +71,13 @@ public class KafkaSourceReader<T>
                 context);
         this.offsetsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.commitOffsetsOnCheckpoint =
+                config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+        if (!commitOffsetsOnCheckpoint) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled. "
+                            + "Consuming offset will not be reported back to 
Kafka cluster.");
+        }
     }
 
     @Override
@@ -86,6 +95,10 @@ public class KafkaSourceReader<T>
     @Override
     public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
         List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return splits;
+        }
+
         if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
             offsetsToCommit.put(checkpointId, Collections.emptyMap());
         } else {
@@ -110,6 +123,10 @@ public class KafkaSourceReader<T>
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return;
+        }
+
         ((KafkaSourceFetcherManager<T>) splitFetcherManager)
                 .commitOffsets(
                         offsetsToCommit.get(checkpointId),
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index e83a3d3..7d3d98c 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -18,19 +18,100 @@
 package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOptions;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
 import org.junit.Test;
 
 /** Tests for {@link KafkaSourceBuilder}. */
 public class KafkaSourceBuilderTest extends TestLogger {
 
     @Test
+    public void testBuildSourceWithGroupId() {
+        final KafkaSource<String> kafkaSource = 
getBasicBuilder().setGroupId("groupId").build();
+        // Commit on checkpoint should be enabled by default
+        Assert.assertTrue(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        // Auto commit should be disabled by default
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
     public void testBuildSourceWithoutGroupId() {
-        new KafkaSourceBuilder<String>()
+        final KafkaSource<String> kafkaSource = getBasicBuilder().build();
+        // Commit on checkpoint and auto commit should be disabled because 
group.id is not specified
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
+    public void testEnableCommitOnCheckpointWithoutGroupId() {
+        final IllegalStateException exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                
KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
+                                                        .key(),
+                                                "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is 
enabled"));
+    }
+
+    @Test
+    public void testEnableAutoCommitWithoutGroupId() {
+        final IllegalStateException exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is 
enabled"));
+    }
+
+    @Test
+    public void testDisableOffsetCommitWithoutGroupId() {
+        getBasicBuilder()
+                
.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
+                .build();
+        
getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false").build();
+    }
+
+    private KafkaSourceBuilder<String> getBasicBuilder() {
+        return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")
                 .setTopics("topic")
                 .setDeserializer(
@@ -45,7 +126,23 @@ public class KafkaSourceBuilderTest extends TestLogger {
                                     ConsumerRecord<byte[], byte[]> record,
                                     Collector<String> collector)
                                     throws Exception {}
-                        })
-                .build();
+                        });
+    }
+
+    @SuppressWarnings({"unchecked", "SameParameterValue"})
+    private <T extends Throwable> T assertThrows(Class<T> exceptionClass, 
Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Throwable t) {
+            if (exceptionClass.isInstance(t)) {
+                return (T) t;
+            }
+            throw new AssertionError(
+                    String.format(
+                            "Expected %s to be thrown, but actually got %s",
+                            exceptionClass, t.getClass()));
+        }
+        throw new AssertionError(
+                String.format("Expected %s to be thrown, but nothing was 
thrown", exceptionClass));
     }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 593a974..a44993e 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -107,6 +107,25 @@ public class KafkaSourceITCase {
         executeAndVerify(env, stream);
     }
 
+    @Test
+    public void testBasicReadWithoutGroupId() throws Exception {
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                        .setDeserializer(new TestingKafkaRecordDeserializer())
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<PartitionAndValue> stream =
+                env.fromSource(
+                        source, WatermarkStrategy.noWatermarks(), 
"testBasicReadWithoutGroupId");
+        executeAndVerify(env, stream);
+    }
+
     // -----------------
 
     private static class PartitionAndValue implements Serializable {
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 35cd003..637c8f8 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
@@ -50,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.function.Supplier;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
@@ -221,6 +223,28 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
         }
     }
 
+    @Test
+    public void testDisableOffsetCommit() throws Exception {
+        final Properties properties = new Properties();
+        
properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), 
"false");
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(Boundedness.CONTINUOUS_UNBOUNDED, 
properties)) {
+            reader.addSplits(
+                    getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, 
Boundedness.CONTINUOUS_UNBOUNDED));
+            ValidatingSourceOutput output = new ValidatingSourceOutput();
+            long checkpointId = 0;
+            do {
+                checkpointId++;
+                reader.pollNext(output);
+                // Create a checkpoint for each message consumption, but not 
complete them.
+                reader.snapshotState(checkpointId);
+                // Offsets to commit should be always empty because offset 
commit is disabled
+                assertEquals(0, reader.getOffsetsToCommit().size());
+            } while (output.count() < TOTAL_NUM_RECORDS);
+        }
+    }
+
     // ------------------------------------------
 
     @Override
@@ -256,6 +280,13 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(
             Boundedness boundedness, String groupId) {
+        Properties properties = new Properties();
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        return createReader(boundedness, properties);
+    }
+
+    private SourceReader<Integer, KafkaPartitionSplit> createReader(
+            Boundedness boundedness, Properties props) {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
                         .setClientIdPrefix("KafkaSourceReaderTest")
@@ -265,9 +296,8 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
                         .setProperty(
                                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
-
+                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+                        .setProperties(props);
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }

Reply via email to