This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 55fc7fa95aa [FLINK-28475][connector/kafka] Fix Kafka source could not
stop with stopping offset = 0
55fc7fa95aa is described below
commit 55fc7fa95aae4304a8694c03adf79d0a609918b1
Author: LeoZhang <[email protected]>
AuthorDate: Sat Jul 9 22:07:12 2022 +0800
[FLINK-28475][connector/kafka] Fix Kafka source could not stop with
stopping offset = 0
Co-authored-by: Mason Chen <[email protected]>
This closes #20234
---
.../kafka/source/split/KafkaPartitionSplit.java | 2 +-
.../reader/KafkaPartitionSplitReaderTest.java | 11 ++++-
.../split/KafkaPartitionSplitSerializerTest.java | 54 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index c213683ae9e..8c2a1fd1ffd 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -79,7 +79,7 @@ public class KafkaPartitionSplit implements SourceSplit {
}
public Optional<Long> getStoppingOffset() {
- return stoppingOffset > 0
+ return stoppingOffset >= 0
|| stoppingOffset == LATEST_OFFSET
|| stoppingOffset == COMMITTED_OFFSET
? Optional.of(stoppingOffset)
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index c7df7945c44..7263bd02885 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -73,6 +73,7 @@ public class KafkaPartitionSplitReaderTest {
private static final int NUM_SUBTASKS = 3;
private static final String TOPIC1 = "topic1";
private static final String TOPIC2 = "topic2";
+ private static final String TOPIC3 = "topic3";
private static Map<Integer, Map<String, KafkaPartitionSplit>>
splitsByOwners;
private static Map<TopicPartition, Long> earliestOffsets;
@@ -84,6 +85,7 @@ public class KafkaPartitionSplitReaderTest {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.setupTopic(TOPIC1, true, true,
KafkaSourceTestEnv::getRecordsForTopic);
KafkaSourceTestEnv.setupTopic(TOPIC2, true, true,
KafkaSourceTestEnv::getRecordsForTopic);
+ KafkaSourceTestEnv.createTestTopic(TOPIC3);
splitsByOwners =
KafkaSourceTestEnv.getSplitsByOwners(Arrays.asList(TOPIC1,
TOPIC2), NUM_SUBTASKS);
earliestOffsets =
@@ -245,11 +247,18 @@ public class KafkaPartitionSplitReaderTest {
new TopicPartition(TOPIC2, 0),
KafkaPartitionSplit.LATEST_OFFSET,
KafkaPartitionSplit.LATEST_OFFSET);
- reader.handleSplitsChanges(new
SplitsAddition<>(Arrays.asList(normalSplit, emptySplit)));
+ final KafkaPartitionSplit emptySplitWithZeroStoppingOffset =
+ new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0);
+
+ reader.handleSplitsChanges(
+ new SplitsAddition<>(
+ Arrays.asList(normalSplit, emptySplit,
emptySplitWithZeroStoppingOffset)));
// Fetch and check empty splits is added to finished splits
RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>
recordsWithSplitIds = reader.fetch();
assertThat(recordsWithSplitIds.finishedSplits()).contains(emptySplit.splitId());
+ assertThat(recordsWithSplitIds.finishedSplits())
+ .contains(emptySplitWithZeroStoppingOffset.splitId());
// Assign another valid split to avoid consumer.poll() blocking
final KafkaPartitionSplit anotherNormalSplit =
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
new file mode 100644
index 00000000000..4ca5c9cb663
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.split;
+
+import org.apache.kafka.common.TopicPartition;
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KafkaPartitionSplitSerializer}. */
+public class KafkaPartitionSplitSerializerTest {
+
+ @Test
+ public void testSerializer() throws IOException {
+ String topic = "topic";
+ Long offsetZero = 0L;
+ Long normalOffset = 1L;
+ TopicPartition topicPartition = new TopicPartition(topic, 1);
+ List<Long> stoppingOffsets =
+ Lists.newArrayList(
+ KafkaPartitionSplit.COMMITTED_OFFSET,
+ KafkaPartitionSplit.LATEST_OFFSET,
+ offsetZero,
+ normalOffset);
+ KafkaPartitionSplitSerializer splitSerializer = new
KafkaPartitionSplitSerializer();
+ for (Long stoppingOffset : stoppingOffsets) {
+ KafkaPartitionSplit kafkaPartitionSplit =
+ new KafkaPartitionSplit(topicPartition, 0, stoppingOffset);
+ byte[] serialize = splitSerializer.serialize(kafkaPartitionSplit);
+ KafkaPartitionSplit deserializeSplit =
+ splitSerializer.deserialize(splitSerializer.getVersion(),
serialize);
+ assertThat(deserializeSplit).isEqualTo(kafkaPartitionSplit);
+ }
+ }
+}