This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 55fc7fa95aa [FLINK-28475][connector/kafka] Fix Kafka source could not 
stop with stopping offset = 0
55fc7fa95aa is described below

commit 55fc7fa95aae4304a8694c03adf79d0a609918b1
Author: LeoZhang <[email protected]>
AuthorDate: Sat Jul 9 22:07:12 2022 +0800

    [FLINK-28475][connector/kafka] Fix Kafka source could not stop with 
stopping offset = 0
    
    Co-authored-by: Mason Chen <[email protected]>
    
    This closes #20234
---
 .../kafka/source/split/KafkaPartitionSplit.java    |  2 +-
 .../reader/KafkaPartitionSplitReaderTest.java      | 11 ++++-
 .../split/KafkaPartitionSplitSerializerTest.java   | 54 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index c213683ae9e..8c2a1fd1ffd 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -79,7 +79,7 @@ public class KafkaPartitionSplit implements SourceSplit {
     }
 
     public Optional<Long> getStoppingOffset() {
-        return stoppingOffset > 0
+        return stoppingOffset >= 0
                         || stoppingOffset == LATEST_OFFSET
                         || stoppingOffset == COMMITTED_OFFSET
                 ? Optional.of(stoppingOffset)
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index c7df7945c44..7263bd02885 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -73,6 +73,7 @@ public class KafkaPartitionSplitReaderTest {
     private static final int NUM_SUBTASKS = 3;
     private static final String TOPIC1 = "topic1";
     private static final String TOPIC2 = "topic2";
+    private static final String TOPIC3 = "topic3";
 
     private static Map<Integer, Map<String, KafkaPartitionSplit>> 
splitsByOwners;
     private static Map<TopicPartition, Long> earliestOffsets;
@@ -84,6 +85,7 @@ public class KafkaPartitionSplitReaderTest {
         KafkaSourceTestEnv.setup();
         KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, 
KafkaSourceTestEnv::getRecordsForTopic);
         KafkaSourceTestEnv.setupTopic(TOPIC2, true, true, 
KafkaSourceTestEnv::getRecordsForTopic);
+        KafkaSourceTestEnv.createTestTopic(TOPIC3);
         splitsByOwners =
                 KafkaSourceTestEnv.getSplitsByOwners(Arrays.asList(TOPIC1, 
TOPIC2), NUM_SUBTASKS);
         earliestOffsets =
@@ -245,11 +247,18 @@ public class KafkaPartitionSplitReaderTest {
                         new TopicPartition(TOPIC2, 0),
                         KafkaPartitionSplit.LATEST_OFFSET,
                         KafkaPartitionSplit.LATEST_OFFSET);
-        reader.handleSplitsChanges(new 
SplitsAddition<>(Arrays.asList(normalSplit, emptySplit)));
+        final KafkaPartitionSplit emptySplitWithZeroStoppingOffset =
+                new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0);
+
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Arrays.asList(normalSplit, emptySplit, 
emptySplitWithZeroStoppingOffset)));
 
         // Fetch and check empty splits is added to finished splits
         RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> 
recordsWithSplitIds = reader.fetch();
         
assertThat(recordsWithSplitIds.finishedSplits()).contains(emptySplit.splitId());
+        assertThat(recordsWithSplitIds.finishedSplits())
+                .contains(emptySplitWithZeroStoppingOffset.splitId());
 
         // Assign another valid split to avoid consumer.poll() blocking
         final KafkaPartitionSplit anotherNormalSplit =
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
new file mode 100644
index 00000000000..4ca5c9cb663
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.split;
+
+import org.apache.kafka.common.TopicPartition;
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KafkaPartitionSplitSerializer}. */
+public class KafkaPartitionSplitSerializerTest {
+
+    @Test
+    public void testSerializer() throws IOException {
+        String topic = "topic";
+        Long offsetZero = 0L;
+        Long normalOffset = 1L;
+        TopicPartition topicPartition = new TopicPartition(topic, 1);
+        List<Long> stoppingOffsets =
+                Lists.newArrayList(
+                        KafkaPartitionSplit.COMMITTED_OFFSET,
+                        KafkaPartitionSplit.LATEST_OFFSET,
+                        offsetZero,
+                        normalOffset);
+        KafkaPartitionSplitSerializer splitSerializer = new 
KafkaPartitionSplitSerializer();
+        for (Long stoppingOffset : stoppingOffsets) {
+            KafkaPartitionSplit kafkaPartitionSplit =
+                    new KafkaPartitionSplit(topicPartition, 0, stoppingOffset);
+            byte[] serialize = splitSerializer.serialize(kafkaPartitionSplit);
+            KafkaPartitionSplit deserializeSplit =
+                    splitSerializer.deserialize(splitSerializer.getVersion(), 
serialize);
+            assertThat(deserializeSplit).isEqualTo(kafkaPartitionSplit);
+        }
+    }
+}

Reply via email to