This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7c34ed414dcd069bde0d8de36ff049d39f2a618
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Sep 14 18:17:56 2021 +0800

    [FLINK-24277][connector/kafka] Add OffsetsInitializerValidator interface 
for validating offset initializer in KafkaSourceBuilder
    
    (cherry picked from commit 2da73edba95685537040305f30ee9d6dfd8d6c02)
---
 .../connector/kafka/source/KafkaSourceBuilder.java |  8 ++++
 .../initializer/OffsetsInitializerValidator.java   | 39 ++++++++++++++++
 .../ReaderHandledOffsetsInitializer.java           | 17 ++++++-
 .../initializer/SpecifiedOffsetsInitializer.java   | 22 ++++++++-
 .../kafka/source/KafkaSourceBuilderTest.java       | 53 ++++++++++++++++++++++
 5 files changed, 137 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index eb93683..d105cd8 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.connector.source.Boundedness;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
 import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
@@ -495,6 +496,13 @@ public class KafkaSourceBuilder<OUT> {
                 String.format(
                         "Property %s is required when offset commit is 
enabled",
                         ConsumerConfig.GROUP_ID_CONFIG));
+        // Check offsets initializers
+        if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) 
{
+            ((OffsetsInitializerValidator) 
startingOffsetsInitializer).validate(props);
+        }
+        if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) 
{
+            ((OffsetsInitializerValidator) 
stoppingOffsetsInitializer).validate(props);
+        }
     }
 
     private boolean offsetCommitEnabledManually() {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
new file mode 100644
index 0000000..c198107
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator.initializer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Properties;
+
+/**
+ * Interface for validating {@link OffsetsInitializer} with properties from 
{@link
+ * org.apache.flink.connector.kafka.source.KafkaSource}.
+ */
+@Internal
+public interface OffsetsInitializerValidator {
+
+    /**
+     * Validate offsets initializer with properties of Kafka source.
+     *
+     * @param kafkaSourceProperties Properties of Kafka source
+     * @throws IllegalStateException if validation fails
+     */
+    void validate(Properties kafkaSourceProperties) throws 
IllegalStateException;
+}
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index 1773d63..026320d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -20,12 +20,16 @@ package 
org.apache.flink.connector.kafka.source.enumerator.initializer;
 
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
 * An initializer that initializes the partitions to the earliest / latest / 
last-committed offsets.
@@ -34,7 +38,7 @@ import java.util.Map;
  *
  * <p>Package private and should be instantiated via {@link 
OffsetsInitializer}.
  */
-class ReaderHandledOffsetsInitializer implements OffsetsInitializer {
+class ReaderHandledOffsetsInitializer implements OffsetsInitializer, 
OffsetsInitializerValidator {
     private static final long serialVersionUID = 172938052008787981L;
     private final long startingOffset;
     private final OffsetResetStrategy offsetResetStrategy;
@@ -65,4 +69,15 @@ class ReaderHandledOffsetsInitializer implements 
OffsetsInitializer {
     public OffsetResetStrategy getAutoOffsetResetStrategy() {
         return offsetResetStrategy;
     }
+
+    @Override
+    public void validate(Properties kafkaSourceProperties) {
+        if (startingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+            checkState(
+                    
kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+                    String.format(
+                            "Property %s is required when using committed 
offset for offsets initializer",
+                            ConsumerConfig.GROUP_ID_CONFIG));
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
index d3335de..5766a5f 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.initializer;
 
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
@@ -27,6 +30,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An implementation of {@link OffsetsInitializer} which initializes the 
offsets of the partition
@@ -34,7 +40,7 @@ import java.util.Map;
  *
  * <p>Package private and should be instantiated via {@link 
OffsetsInitializer}.
  */
-class SpecifiedOffsetsInitializer implements OffsetsInitializer {
+class SpecifiedOffsetsInitializer implements OffsetsInitializer, 
OffsetsInitializerValidator {
     private static final long serialVersionUID = 1649702397250402877L;
     private final Map<TopicPartition, Long> initialOffsets;
     private final OffsetResetStrategy offsetResetStrategy;
@@ -85,4 +91,18 @@ class SpecifiedOffsetsInitializer implements 
OffsetsInitializer {
     public OffsetResetStrategy getAutoOffsetResetStrategy() {
         return offsetResetStrategy;
     }
+
+    @Override
+    public void validate(Properties kafkaSourceProperties) {
+        initialOffsets.forEach(
+                (tp, offset) -> {
+                    if (offset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+                        checkState(
+                                
kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+                                String.format(
+                                        "Property %s is required because 
partition %s is initialized with committed offset",
+                                        ConsumerConfig.GROUP_ID_CONFIG, tp));
+                    }
+                });
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index e13514a..4358e8c 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -18,16 +18,22 @@
 package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.configuration.ConfigOptions;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /** Tests for {@link KafkaSourceBuilder}. */
 public class KafkaSourceBuilderTest extends TestLogger {
 
@@ -108,6 +114,53 @@ public class KafkaSourceBuilderTest extends TestLogger {
         
getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false").build();
     }
 
+    @Test
+    public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
+        // Using OffsetsInitializer#committedOffsets as starting offsets
+        final IllegalStateException startingOffsetException =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        
.setStartingOffsets(OffsetsInitializer.committedOffsets())
+                                        .build());
+        MatcherAssert.assertThat(
+                startingOffsetException.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when using committed 
offset for offsets initializer"));
+
+        // Using OffsetsInitializer#committedOffsets as stopping offsets
+        final IllegalStateException stoppingOffsetException =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        
.setBounded(OffsetsInitializer.committedOffsets())
+                                        .build());
+        MatcherAssert.assertThat(
+                stoppingOffsetException.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when using committed 
offset for offsets initializer"));
+
+        // Using OffsetsInitializer#offsets to manually specify committed 
offset as starting offset
+        final IllegalStateException specificStartingOffsetException =
+                assertThrows(
+                        IllegalStateException.class,
+                        () -> {
+                            final Map<TopicPartition, Long> offsetMap = new 
HashMap<>();
+                            offsetMap.put(
+                                    new TopicPartition("topic", 0),
+                                    KafkaPartitionSplit.COMMITTED_OFFSET);
+                            getBasicBuilder()
+                                    
.setStartingOffsets(OffsetsInitializer.offsets(offsetMap))
+                                    .build();
+                        });
+        MatcherAssert.assertThat(
+                specificStartingOffsetException.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required because partition 
topic-0 is initialized with committed offset"));
+    }
+
     private KafkaSourceBuilder<String> getBasicBuilder() {
         return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")

Reply via email to