This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c7c34ed414dcd069bde0d8de36ff049d39f2a618 Author: Qingsheng Ren <[email protected]> AuthorDate: Tue Sep 14 18:17:56 2021 +0800 [FLINK-24277][connector/kafka] Add OffsetsInitializerValidator interface for validating offset initializer in KafkaSourceBuilder (cherry picked from commit 2da73edba95685537040305f30ee9d6dfd8d6c02) --- .../connector/kafka/source/KafkaSourceBuilder.java | 8 ++++ .../initializer/OffsetsInitializerValidator.java | 39 ++++++++++++++++ .../ReaderHandledOffsetsInitializer.java | 17 ++++++- .../initializer/SpecifiedOffsetsInitializer.java | 22 ++++++++- .../kafka/source/KafkaSourceBuilderTest.java | 53 ++++++++++++++++++++++ 5 files changed, 137 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index eb93683..d105cd8 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; @@ -495,6 +496,13 @@ public class KafkaSourceBuilder<OUT> { String.format( "Property %s is required when offset commit is enabled", ConsumerConfig.GROUP_ID_CONFIG)); + // Check offsets initializers + if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) { + ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props); + } + if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) { + ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props); + } } private boolean offsetCommitEnabledManually() { diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java new file mode 100644 index 0000000..c198107 --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source.enumerator.initializer; + +import org.apache.flink.annotation.Internal; + +import java.util.Properties; + +/** + * Interface for validating {@link OffsetsInitializer} with properties from {@link + * org.apache.flink.connector.kafka.source.KafkaSource}. + */ +@Internal +public interface OffsetsInitializerValidator { + + /** + * Validate offsets initializer with properties of Kafka source. + * + * @param kafkaSourceProperties Properties of Kafka source + * @throws IllegalStateException if validation fails + */ + void validate(Properties kafkaSourceProperties) throws IllegalStateException; +} diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java index 1773d63..026320d 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java @@ -20,12 +20,16 @@ package org.apache.flink.connector.kafka.source.enumerator.initializer; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkState; /** * A initializer that initialize the partitions to the earliest / latest / last-committed offsets. @@ -34,7 +38,7 @@ import java.util.Map; * * <p>Package private and should be instantiated via {@link OffsetsInitializer}. */ -class ReaderHandledOffsetsInitializer implements OffsetsInitializer { +class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator { private static final long serialVersionUID = 172938052008787981L; private final long startingOffset; private final OffsetResetStrategy offsetResetStrategy; @@ -65,4 +69,15 @@ class ReaderHandledOffsetsInitializer implements OffsetsInitializer { public OffsetResetStrategy getAutoOffsetResetStrategy() { return offsetResetStrategy; } + + @Override + public void validate(Properties kafkaSourceProperties) { + if (startingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) { + checkState( + kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG), + String.format( + "Property %s is required when using committed offset for offsets initializer", + ConsumerConfig.GROUP_ID_CONFIG)); + } + } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java index d3335de..5766a5f 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.kafka.source.enumerator.initializer; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; @@ -27,6 +30,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkState; /** * An implementation of {@link OffsetsInitializer} which initializes the offsets of the partition @@ -34,7 +40,7 @@ import java.util.Map; * * <p>Package private and should be instantiated via {@link OffsetsInitializer}. */ -class SpecifiedOffsetsInitializer implements OffsetsInitializer { +class SpecifiedOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator { private static final long serialVersionUID = 1649702397250402877L; private final Map<TopicPartition, Long> initialOffsets; private final OffsetResetStrategy offsetResetStrategy; @@ -85,4 +91,18 @@ class SpecifiedOffsetsInitializer implements OffsetsInitializer { public OffsetResetStrategy getAutoOffsetResetStrategy() { return offsetResetStrategy; } + + @Override + public void validate(Properties kafkaSourceProperties) { + initialOffsets.forEach( + (tp, offset) -> { + if (offset == KafkaPartitionSplit.COMMITTED_OFFSET) { + checkState( + kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG), + String.format( + "Property %s is required because partition %s is initialized with committed offset", + ConsumerConfig.GROUP_ID_CONFIG, tp)); + } + }); + } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java index e13514a..4358e8c 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java @@ -18,16 +18,22 @@ package org.apache.flink.connector.kafka.source; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.util.TestLogger; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + /** Tests for {@link KafkaSourceBuilder}. */ public class KafkaSourceBuilderTest extends TestLogger { @@ -108,6 +114,53 @@ public class KafkaSourceBuilderTest extends TestLogger { getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build(); } + @Test + public void testUsingCommittedOffsetsInitializerWithoutGroupId() { + // Using OffsetsInitializer#committedOffsets as starting offsets + final IllegalStateException startingOffsetException = + assertThrows( + IllegalStateException.class, + () -> + getBasicBuilder() + .setStartingOffsets(OffsetsInitializer.committedOffsets()) + .build()); + MatcherAssert.assertThat( + startingOffsetException.getMessage(), + CoreMatchers.containsString( + "Property group.id is required when using committed offset for offsets initializer")); + + // Using OffsetsInitializer#committedOffsets as stopping offsets + final IllegalStateException stoppingOffsetException = + assertThrows( + IllegalStateException.class, + () -> + getBasicBuilder() + .setBounded(OffsetsInitializer.committedOffsets()) + .build()); + MatcherAssert.assertThat( + stoppingOffsetException.getMessage(), + CoreMatchers.containsString( + "Property group.id is required when using committed offset for offsets initializer")); + + // Using OffsetsInitializer#offsets to manually specify committed offset as starting offset + final IllegalStateException specificStartingOffsetException = + assertThrows( + IllegalStateException.class, + () -> { + final Map<TopicPartition, Long> offsetMap = new HashMap<>(); + offsetMap.put( + new TopicPartition("topic", 0), + KafkaPartitionSplit.COMMITTED_OFFSET); + getBasicBuilder() + .setStartingOffsets(OffsetsInitializer.offsets(offsetMap)) + .build(); + }); + MatcherAssert.assertThat( + specificStartingOffsetException.getMessage(), + CoreMatchers.containsString( + "Property group.id is required because partition topic-0 is initialized with committed offset")); + } + private KafkaSourceBuilder<String> getBasicBuilder() { return new KafkaSourceBuilder<String>() .setBootstrapServers("testServer")
