This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bf3df16e3cc9a3bede3f5dabc8d08c9369e02485
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Sep 16 15:20:08 2021 +0800

    [FLINK-24277][connector/kafka] Add configuration for committing offset on checkpoint and disable it if group ID is not specified
---
 .../flink/connector/kafka/source/KafkaSource.java  |   6 ++
 .../connector/kafka/source/KafkaSourceBuilder.java |  35 ++++++-
 .../connector/kafka/source/KafkaSourceOptions.java |   6 ++
 .../source/reader/KafkaPartitionSplitReader.java   |  24 ++---
 .../kafka/source/reader/KafkaSourceReader.java     |  17 ++++
 .../kafka/source/KafkaSourceBuilderTest.java       | 103 ++++++++++++++++++++-
 .../connector/kafka/source/KafkaSourceITCase.java  |  19 ++++
 .../kafka/source/reader/KafkaSourceReaderTest.java |  36 ++++++-
 8 files changed, 225 insertions(+), 21 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index ea7ad6c..477a4d6 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -178,4 +179,9 @@ public class KafkaSource<OUT>
         props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
         return config;
     }
+
+    @VisibleForTesting
+    Configuration getConfiguration() {
+        return toConfiguration(props);
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index 0099df3..ddcf2c7 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The @builder class for {@link KafkaSource} to make it easier for the users to construct a {@link
@@ -412,8 +413,12 @@ public class KafkaSourceBuilder<OUT> {
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName(),
                 true);
-        maybeOverride(
-                ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new Random().nextLong(), false);
+        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled because {} is not specified",
+                    ConsumerConfig.GROUP_ID_CONFIG);
+            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
+        }
         maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
         maybeOverride(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -426,10 +431,13 @@ public class KafkaSourceBuilder<OUT> {
                 "-1",
                 boundedness == Boundedness.BOUNDED);
 
-        // If the client id prefix is not set, reuse the consumer group id as the client id prefix.
+        // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
+        // or generate a random string if consumer group id is not specified.
         maybeOverride(
                 KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
-                props.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+                        ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+                        : "KafkaSource-" + new Random().nextLong(),
                 false);
     }
 
@@ -464,5 +472,24 @@ public class KafkaSourceBuilder<OUT> {
                         "No subscribe mode is specified, "
                                 + "should be one of topics, topic pattern and partition set.");
         checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
+    private boolean offsetCommitEnabledManually() {
+        boolean autoCommit =
+                props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                        && Boolean.parseBoolean(
+                                props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        boolean commitOnCheckpoint =
+                props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+                        && Boolean.parseBoolean(
+                                props.getProperty(
+                                        KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+        return autoCommit || commitOnCheckpoint;
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index 98c735f..3da2eb7 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -41,6 +41,12 @@ public class KafkaSourceOptions {
                             "The interval in milliseconds for the Kafka source to discover "
                                     + "the new partitions. A non-positive value disables the partition discovery.");
 
+    public static final ConfigOption<Boolean> COMMIT_OFFSETS_ON_CHECKPOINT =
+            ConfigOptions.key("commit.offsets.on.checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to commit consuming offset on checkpoint.");
+
     @SuppressWarnings("unchecked")
     public static <T> T getOption(
             Properties props, ConfigOption configOption, Function<String, T> parser) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index c1a827c..af0001d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -311,17 +311,19 @@ public class KafkaPartitionSplitReader<T>
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at committed offset. "
-                                                    + "But there is no committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {
+            consumer.committed(partitionsStoppingAtCommitted)
+                    .forEach(
+                            (tp, offsetAndMetadata) -> {
+                                Preconditions.checkNotNull(
+                                        offsetAndMetadata,
+                                        String.format(
+                                                "Partition %s should stop at committed offset. "
+                                                        + "But there is no committed offset of this partition for group %s",
+                                                tp, groupId));
+                                stoppingOffsets.put(tp, offsetAndMetadata.offset());
+                            });
+        }
     }
 
     private void removeEmptySplits() {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index bf3d42e..d2c365d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
@@ -54,6 +55,7 @@ public class KafkaSourceReader<T>
     // and the split fetcher thread in the callback.
     private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit;
     private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;
+    private final boolean commitOffsetsOnCheckpoint;
 
     public KafkaSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
@@ -69,6 +71,13 @@ public class KafkaSourceReader<T>
                 context);
         this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
         this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.commitOffsetsOnCheckpoint =
+                config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+        if (!commitOffsetsOnCheckpoint) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled. "
+                            + "Consuming offset will not be reported back to Kafka cluster.");
+        }
     }
 
     @Override
@@ -86,6 +95,10 @@ public class KafkaSourceReader<T>
     @Override
     public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
         List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return splits;
+        }
+
         if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
             offsetsToCommit.put(checkpointId, Collections.emptyMap());
         } else {
@@ -110,6 +123,10 @@ public class KafkaSourceReader<T>
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return;
+        }
+
         ((KafkaSourceFetcherManager<T>) splitFetcherManager)
                 .commitOffsets(
                         offsetsToCommit.get(checkpointId),
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index e83a3d3..7d3d98c 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -18,19 +18,100 @@
 package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
 import org.junit.Test;
 
 /** Tests for {@link KafkaSourceBuilder}. */
 public class KafkaSourceBuilderTest extends TestLogger {
 
     @Test
+    public void testBuildSourceWithGroupId() {
+        final KafkaSource<String> kafkaSource = getBasicBuilder().setGroupId("groupId").build();
+        // Commit on checkpoint should be enabled by default
+        Assert.assertTrue(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        // Auto commit should be disabled by default
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
     public void testBuildSourceWithoutGroupId() {
-        new KafkaSourceBuilder<String>()
+        final KafkaSource<String> kafkaSource = getBasicBuilder().build();
+        // Commit on checkpoint and auto commit should be disabled because group.id is not specified
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        Assert.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
+    public void testEnableCommitOnCheckpointWithoutGroupId() {
+        final IllegalStateException exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
+                                                        .key(),
+                                                "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is enabled"));
+    }
+
+    @Test
+    public void testEnableAutoCommitWithoutGroupId() {
+        final IllegalStateException exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is enabled"));
+    }
+
+    @Test
+    public void testDisableOffsetCommitWithoutGroupId() {
+        getBasicBuilder()
+                .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
+                .build();
+        getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
+    }
+
+    private KafkaSourceBuilder<String> getBasicBuilder() {
+        return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")
                 .setTopics("topic")
                 .setDeserializer(
@@ -45,7 +126,23 @@ public class KafkaSourceBuilderTest extends TestLogger {
                                     ConsumerRecord<byte[], byte[]> record,
                                     Collector<String> collector)
                                     throws Exception {}
-                })
-                .build();
+                });
+    }
+
+    @SuppressWarnings({"unchecked", "SameParameterValue"})
+    private <T extends Throwable> T assertThrows(Class<T> exceptionClass, Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Throwable t) {
+            if (exceptionClass.isInstance(t)) {
+                return (T) t;
+            }
+            throw new AssertionError(
+                    String.format(
+                            "Expected %s to be thrown, but actually got %s",
+                            exceptionClass, t.getClass()));
+        }
+        throw new AssertionError(
+                String.format("Expected %s to be thrown, but nothing was thrown", exceptionClass));
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 593a974..a44993e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -107,6 +107,25 @@ public class KafkaSourceITCase {
         executeAndVerify(env, stream);
     }
 
+    @Test
+    public void testBasicReadWithoutGroupId() throws Exception {
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                        .setDeserializer(new TestingKafkaRecordDeserializer())
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<PartitionAndValue> stream =
+                env.fromSource(
+                        source, WatermarkStrategy.noWatermarks(), "testBasicReadWithoutGroupId");
+        executeAndVerify(env, stream);
+    }
+
     // -----------------
 
     private static class PartitionAndValue implements Serializable {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 35cd003..637c8f8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
@@ -50,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.function.Supplier;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
@@ -221,6 +223,28 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
         }
     }
 
+    @Test
+    public void testDisableOffsetCommit() throws Exception {
+        final Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false");
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(Boundedness.CONTINUOUS_UNBOUNDED, properties)) {
+            reader.addSplits(
+                    getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
+            ValidatingSourceOutput output = new ValidatingSourceOutput();
+            long checkpointId = 0;
+            do {
+                checkpointId++;
+                reader.pollNext(output);
+                // Create a checkpoint for each message consumption, but not complete them.
+                reader.snapshotState(checkpointId);
+                // Offsets to commit should be always empty because offset commit is disabled
+                assertEquals(0, reader.getOffsetsToCommit().size());
+            } while (output.count() < TOTAL_NUM_RECORDS);
+        }
+    }
+
     // ------------------------------------------
 
     @Override
@@ -256,6 +280,13 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(
             Boundedness boundedness, String groupId) {
+        Properties properties = new Properties();
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        return createReader(boundedness, properties);
+    }
+
+    private SourceReader<Integer, KafkaPartitionSplit> createReader(
+            Boundedness boundedness, Properties props) {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
                         .setClientIdPrefix("KafkaSourceReaderTest")
@@ -265,9 +296,8 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
                         .setProperty(
                                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
+                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+                        .setProperties(props);
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }
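
For context, a minimal sketch of how the new behavior surfaces in user code. The broker address, topic, and group names below are placeholders, and KafkaRecordDeserializer.valueOnly is assumed to be available as the value-only deserializer shorthand; this is not part of the patch itself.

    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class CommitOffsetsOnCheckpointSketch {
        public static void main(String[] args) {
            // group.id is set: commit.offsets.on.checkpoint keeps its default (true),
            // so the reader commits offsets back to Kafka when a checkpoint completes.
            KafkaSource<String> committing =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("localhost:9092") // placeholder broker
                            .setTopics("input-topic") // placeholder topic
                            .setGroupId("my-group") // placeholder group
                            .setStartingOffsets(OffsetsInitializer.earliest())
                            .setDeserializer(
                                    KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                            .build();

            // group.id is absent: the builder logs a warning and overrides
            // commit.offsets.on.checkpoint to false; reading still works, but no
            // offsets are reported back to the Kafka cluster.
            KafkaSource<String> nonCommitting =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("localhost:9092")
                            .setTopics("input-topic")
                            .setStartingOffsets(OffsetsInitializer.earliest())
                            .setDeserializer(
                                    KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                            .build();

            // group.id is absent but offset commit is explicitly enabled:
            // sanityCheck() now fails fast at build() time.
            try {
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("input-topic")
                        .setProperty(
                                KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
                        .setDeserializer(
                                KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                        .build();
            } catch (IllegalStateException expected) {
                // "Property group.id is required when offset commit is enabled"
                System.out.println(expected.getMessage());
            }
        }
    }

The same check covers enable.auto.commit: enabling either commit path without a group.id is rejected in sanityCheck() rather than surfacing later as a runtime failure.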
