This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f0bd873309c3b0a13edb5354912ffdb1169de5b4 Author: Qingsheng Ren <[email protected]> AuthorDate: Tue Sep 14 15:22:00 2021 +0800 [FLINK-24277][connector/kafka] Add configuration for committing offset on checkpoint and disable it if group ID is not specified (cherry picked from commit ca8bff231aed2412f579d0a4e446e9a6bee42581) --- docs/content/docs/connectors/datastream/kafka.md | 1 + .../flink/connector/kafka/source/KafkaSource.java | 5 + .../connector/kafka/source/KafkaSourceBuilder.java | 35 ++++++- .../connector/kafka/source/KafkaSourceOptions.java | 6 ++ .../source/reader/KafkaPartitionSplitReader.java | 24 ++--- .../kafka/source/reader/KafkaSourceReader.java | 17 ++++ .../kafka/source/KafkaSourceBuilderTest.java | 103 ++++++++++++++++++++- .../connector/kafka/source/KafkaSourceITCase.java | 19 ++++ .../kafka/source/reader/KafkaSourceReaderTest.java | 44 ++++++++- 9 files changed, 233 insertions(+), 21 deletions(-) diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md index 8fcd023..cab25dc 100644 --- a/docs/content/docs/connectors/datastream/kafka.md +++ b/docs/content/docs/connectors/datastream/kafka.md @@ -154,6 +154,7 @@ KafkaSource has following options for configuration: below for more details. - ```register.consumer.metrics``` specifies whether to register metrics of KafkaConsumer in Flink metric group +- ```commit.offsets.on.checkpoint``` specifies whether to commit consuming offsets to Kafka brokers on checkpoint For configurations of KafkaConsumer, you can refer to <a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka documentation</a> diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index af0013b..d1219c0 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -214,4 +214,9 @@ public class KafkaSource<OUT> props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key))); return config; } + + @VisibleForTesting + Configuration getConfiguration() { + return toConfiguration(props); + } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index cd286ed..eb93683 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -41,6 +41,7 @@ import java.util.Set; import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * The @builder class for {@link KafkaSource} to make it easier for the users to construct a {@link @@ -429,8 +430,12 @@ public class KafkaSourceBuilder<OUT> { ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), true); - maybeOverride( - ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new Random().nextLong(), false); + if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + LOG.warn( + "Offset commit on checkpoint is disabled because {} is not specified", + ConsumerConfig.GROUP_ID_CONFIG); + maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false); + } maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); maybeOverride( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, @@ -443,10 +448,13 @@ public class KafkaSourceBuilder<OUT> { "-1", boundedness == Boundedness.BOUNDED); - // If the client id prefix is not set, reuse the consumer group id as the client id prefix. + // If the client id prefix is not set, reuse the consumer group id as the client id prefix, + // or generate a random string if consumer group id is not specified. maybeOverride( KafkaSourceOptions.CLIENT_ID_PREFIX.key(), - props.getProperty(ConsumerConfig.GROUP_ID_CONFIG), + props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) + ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) + : "KafkaSource-" + new Random().nextLong(), false); } @@ -481,5 +489,24 @@ public class KafkaSourceBuilder<OUT> { "No subscribe mode is specified, " + "should be one of topics, topic pattern and partition set."); checkNotNull(deserializationSchema, "Deserialization schema is required but not provided."); + // Check consumer group ID + checkState( + props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(), + String.format( + "Property %s is required when offset commit is enabled", + ConsumerConfig.GROUP_ID_CONFIG)); + } + + private boolean offsetCommitEnabledManually() { + boolean autoCommit = + props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + && Boolean.parseBoolean( + props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + boolean commitOnCheckpoint = + props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()) + && Boolean.parseBoolean( + props.getProperty( + KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())); + return autoCommit || commitOnCheckpoint; } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java index e48804b..1a05833 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java @@ -48,6 +48,12 @@ public class KafkaSourceOptions { .withDescription( "Whether to register metrics of KafkaConsumer into Flink metric group"); + public static final ConfigOption<Boolean> COMMIT_OFFSETS_ON_CHECKPOINT = + ConfigOptions.key("commit.offsets.on.checkpoint") + .booleanType() + .defaultValue(true) + .withDescription("Whether to commit consuming offset on checkpoint."); + @SuppressWarnings("unchecked") public static <T> T getOption( Properties props, ConfigOption<?> configOption, Function<String, T> parser) { diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index e13e10e..a5d81fa 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -340,17 +340,19 @@ public class KafkaPartitionSplitReader<T> Set<TopicPartition> partitionsStoppingAtCommitted) { Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest); stoppingOffsets.putAll(endOffset); - consumer.committed(partitionsStoppingAtCommitted) - .forEach( - (tp, offsetAndMetadata) -> { - Preconditions.checkNotNull( - offsetAndMetadata, - String.format( - "Partition %s should stop at committed offset. " - + "But there is no committed offset of this partition for group %s", - tp, groupId)); - stoppingOffsets.put(tp, offsetAndMetadata.offset()); - }); + if (!partitionsStoppingAtCommitted.isEmpty()) { + consumer.committed(partitionsStoppingAtCommitted) + .forEach( + (tp, offsetAndMetadata) -> { + Preconditions.checkNotNull( + offsetAndMetadata, + String.format( + "Partition %s should stop at committed offset. " + + "But there is no committed offset of this partition for group %s", + tp, groupId)); + stoppingOffsets.put(tp, offsetAndMetadata.offset()); + }); + } } private void removeEmptySplits() { diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java index 79c8064..08f82b0 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; @@ -55,6 +56,7 @@ public class KafkaSourceReader<T> private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit; private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits; private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics; + private final boolean commitOffsetsOnCheckpoint; public KafkaSourceReader( FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue, @@ -67,6 +69,13 @@ public class KafkaSourceReader<T> this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.offsetsOfFinishedSplits = new ConcurrentHashMap<>(); this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics; + this.commitOffsetsOnCheckpoint = + config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT); + if (!commitOffsetsOnCheckpoint) { + LOG.warn( + "Offset commit on checkpoint is disabled. " + + "Consuming offset will not be reported back to Kafka cluster."); + } } @Override @@ -84,6 +93,10 @@ public class KafkaSourceReader<T> @Override public List<KafkaPartitionSplit> snapshotState(long checkpointId) { List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId); + if (!commitOffsetsOnCheckpoint) { + return splits; + } + if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) { offsetsToCommit.put(checkpointId, Collections.emptyMap()); } else { @@ -108,6 +121,10 @@ public class KafkaSourceReader<T> @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug("Committing offsets for checkpoint {}", checkpointId); + if (!commitOffsetsOnCheckpoint) { + return; + } + ((KafkaSourceFetcherManager<T>) splitFetcherManager) .commitOffsets( offsetsToCommit.get(checkpointId), diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java index 6624e4d..e13514a 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java @@ -17,22 +17,119 @@ package org.apache.flink.connector.kafka.source; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.util.TestLogger; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; import org.junit.Test; /** Tests for {@link KafkaSourceBuilder}. */ public class KafkaSourceBuilderTest extends TestLogger { @Test + public void testBuildSourceWithGroupId() { + final KafkaSource<String> kafkaSource = getBasicBuilder().setGroupId("groupId").build(); + // Commit on checkpoint should be enabled by default + Assert.assertTrue( + kafkaSource + .getConfiguration() + .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)); + // Auto commit should be disabled by default + Assert.assertFalse( + kafkaSource + .getConfiguration() + .get( + ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + .booleanType() + .noDefaultValue())); + } + + @Test public void testBuildSourceWithoutGroupId() { - new KafkaSourceBuilder<String>() + final KafkaSource<String> kafkaSource = getBasicBuilder().build(); + // Commit on checkpoint and auto commit should be disabled because group.id is not specified + Assert.assertFalse( + kafkaSource + .getConfiguration() + .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)); + Assert.assertFalse( + kafkaSource + .getConfiguration() + .get( + ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + .booleanType() + .noDefaultValue())); + } + + @Test + public void testEnableCommitOnCheckpointWithoutGroupId() { + final IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> + getBasicBuilder() + .setProperty( + KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT + .key(), + "true") + .build()); + MatcherAssert.assertThat( + exception.getMessage(), + CoreMatchers.containsString( + "Property group.id is required when offset commit is enabled")); + } + + @Test + public void testEnableAutoCommitWithoutGroupId() { + final IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> + getBasicBuilder() + .setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + .build()); + MatcherAssert.assertThat( + exception.getMessage(), + CoreMatchers.containsString( + "Property group.id is required when offset commit is enabled")); + } + + @Test + public void testDisableOffsetCommitWithoutGroupId() { + getBasicBuilder() + .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false") + .build(); + getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build(); + } + + private KafkaSourceBuilder<String> getBasicBuilder() { + return new KafkaSourceBuilder<String>() .setBootstrapServers("testServer") .setTopics("topic") .setDeserializer( - KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) - .build(); + KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)); + } + + @SuppressWarnings({"unchecked", "SameParameterValue"}) + private <T extends Throwable> T assertThrows(Class<T> exceptionClass, Runnable runnable) { + try { + runnable.run(); + } catch (Throwable t) { + if (exceptionClass.isInstance(t)) { + return (T) t; + } + throw new AssertionError( + String.format( + "Expected %s to be thrown, but actually got %s", + exceptionClass, t.getClass())); + } + throw new AssertionError( + String.format("Expected %s to be thrown, but nothing was thrown", exceptionClass)); } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java index 89ce39c..ea37d8d 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java @@ -197,6 +197,25 @@ public class KafkaSourceITCase { executeAndVerify(env, stream); } + @Test + public void testBasicReadWithoutGroupId() throws Exception { + KafkaSource<PartitionAndValue> source = + KafkaSource.<PartitionAndValue>builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + .setDeserializer(new TestingKafkaRecordDeserializationSchema()) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(OffsetsInitializer.latest()) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream<PartitionAndValue> stream = + env.fromSource( + source, WatermarkStrategy.noWatermarks(), "testBasicReadWithoutGroupId"); + executeAndVerify(env, stream); + } + // ----------------- private static class PartitionAndValue implements Serializable { diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java index 889afa4..0304315 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv; import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -60,6 +61,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; @@ -241,6 +243,32 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp } @Test + public void testDisableOffsetCommit() throws Exception { + final Properties properties = new Properties(); + properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false"); + try (KafkaSourceReader<Integer> reader = + (KafkaSourceReader<Integer>) + createReader( + Boundedness.CONTINUOUS_UNBOUNDED, + new TestingReaderContext(), + (ignore) -> {}, + properties)) { + reader.addSplits( + getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)); + ValidatingSourceOutput output = new ValidatingSourceOutput(); + long checkpointId = 0; + do { + checkpointId++; + reader.pollNext(output); + // Create a checkpoint for each message consumption, but not complete them. + reader.snapshotState(checkpointId); + // Offsets to commit should be always empty because offset commit is disabled + assertEquals(0, reader.getOffsetsToCommit().size()); + } while (output.count() < TOTAL_NUM_RECORDS); + } + } + + @Test public void testKafkaSourceMetrics() throws Exception { final MetricListener metricListener = new MetricListener(); final String groupId = "testKafkaSourceMetrics"; @@ -397,6 +425,17 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp SourceReaderContext context, Consumer<Collection<String>> splitFinishedHook) throws Exception { + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return createReader(boundedness, context, splitFinishedHook, properties); + } + + private SourceReader<Integer, KafkaPartitionSplit> createReader( + Boundedness boundedness, + SourceReaderContext context, + Consumer<Collection<String>> splitFinishedHook, + Properties props) + throws Exception { KafkaSourceBuilder<Integer> builder = KafkaSource.<Integer>builder() .setClientIdPrefix("KafkaSourceReaderTest") @@ -407,9 +446,8 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp .setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaSourceTestEnv.brokerConnectionStrings) - .setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - + .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .setProperties(props); if (boundedness == Boundedness.BOUNDED) { builder.setBounded(OffsetsInitializer.latest()); }
