This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new a4f83353fe6 [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case
a4f83353fe6 is described below
commit a4f83353fe623b3f09363e964d6a9aeb7af0bf11
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Jun 16 16:26:34 2022 +0800
[FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case
This closes #19979.
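In short, KafkaPartitionSplitReader.fetch() now treats IllegalStateException from KafkaConsumer#poll the same way as WakeupException: splits discovered to be empty are marked as finished and an empty record container is returned, after which SplitFetcherManager closes the consumer. A simplified sketch of the new error handling (consumer, emptySplits, and kafkaSourceReaderMetrics are fields of KafkaPartitionSplitReader, and markEmptySplitsAsFinished is the new private helper; see the full diff below):

    ConsumerRecords<byte[], byte[]> consumerRecords;
    try {
        consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
    } catch (WakeupException | IllegalStateException e) {
        // IllegalStateException: the consumer holds no assigned partitions, e.g. because
        // every assigned split was invalid or empty (starting offset >= stopping offset).
        KafkaPartitionSplitRecords recordsBySplits =
                new KafkaPartitionSplitRecords(
                        ConsumerRecords.empty(), kafkaSourceReaderMetrics);
        // Mark the empty splits as finished so fetcher and reader state gets cleaned up.
        markEmptySplitsAsFinished(recordsBySplits);
        return recordsBySplits;
    }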
---
.../source/reader/KafkaPartitionSplitReader.java | 29 ++++--
.../connector/kafka/source/KafkaSourceITCase.java | 112 +++++++++++++++++----
.../kafka/source/reader/KafkaSourceReaderTest.java | 34 +++++++
.../kafka/source/testutils/KafkaSourceTestEnv.java | 4 +
4 files changed, 148 insertions(+), 31 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index ebadef32fd2..8a4d408df24 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -95,9 +95,16 @@ public class KafkaPartitionSplitReader
ConsumerRecords<byte[], byte[]> consumerRecords;
try {
consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
- } catch (WakeupException we) {
- return new KafkaPartitionSplitRecords(
- ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+ } catch (WakeupException | IllegalStateException e) {
+ // IllegalStateException will be thrown if the consumer is not assigned any partitions.
+ // This happens if all assigned partitions are invalid or empty (starting offset >=
+ // stopping offset). We just mark empty partitions as finished and return an empty
+ // record container, and this consumer will be closed by SplitFetcherManager.
+ KafkaPartitionSplitRecords recordsBySplits =
+ new KafkaPartitionSplitRecords(
+ ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+ markEmptySplitsAsFinished(recordsBySplits);
+ return recordsBySplits;
}
KafkaPartitionSplitRecords recordsBySplits =
new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
@@ -128,12 +135,7 @@ public class KafkaPartitionSplitReader
kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
}
- // Some splits are discovered as empty when handling split additions. These splits should be
- // added to finished splits to clean up states in split fetcher and source reader.
- if (!emptySplits.isEmpty()) {
- recordsBySplits.finishedSplits.addAll(emptySplits);
- emptySplits.clear();
- }
+ markEmptySplitsAsFinished(recordsBySplits);
// Unassign the partitions that have finished.
if (!finishedPartitions.isEmpty()) {
@@ -147,6 +149,15 @@ public class KafkaPartitionSplitReader
return recordsBySplits;
}
+ private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
+ // Some splits are discovered as empty when handling split additions. These splits should be
+ // added to finished splits to clean up states in split fetcher and source reader.
+ if (!emptySplits.isEmpty()) {
+ recordsBySplits.finishedSplits.addAll(emptySplits);
+ emptySplits.clear();
+ }
+ }
+
@Override
public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
// Get all the partition assignments and stopping offsets.
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 1f740b59aeb..1ca071fbd72 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -77,6 +77,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Unit test class for {@link KafkaSource}. */
@@ -181,33 +182,36 @@ public class KafkaSourceITCase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
- final CloseableIterator<Integer> resultIterator =
+
+ try (CloseableIterator<Integer> resultIterator =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"testValueOnlyDeserializer")
- .executeAndCollect();
-
- AtomicInteger actualSum = new AtomicInteger();
- resultIterator.forEachRemaining(actualSum::addAndGet);
-
- // Calculate the actual sum of values
- // Values in a partition should start from partition ID, and end with
- // (NUM_RECORDS_PER_PARTITION - 1)
- // e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
- int expectedSum = 0;
- for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) {
- for (int value = partition;
- value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
- value++) {
- expectedSum += value;
+ .executeAndCollect()) {
+ AtomicInteger actualSum = new AtomicInteger();
+ resultIterator.forEachRemaining(actualSum::addAndGet);
+
+ // Calculate the actual sum of values
+ // Values in a partition should start from partition ID, and end with
+ // (NUM_RECORDS_PER_PARTITION - 1)
+ // e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
+ int expectedSum = 0;
+ for (int partition = 0;
+ partition < KafkaSourceTestEnv.NUM_PARTITIONS;
+ partition++) {
+ for (int value = partition;
+ value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+ value++) {
+ expectedSum += value;
+ }
}
- }
- // Since we have two topics, the expected sum value should be doubled
- expectedSum *= 2;
+ // Since we have two topics, the expected sum value should be doubled
+ expectedSum *= 2;
- assertEquals(expectedSum, actualSum.get());
+ assertEquals(expectedSum, actualSum.get());
+ }
}
@ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
@@ -304,6 +308,65 @@ public class KafkaSourceITCase {
});
env.execute();
}
+
+ @Test
+ public void testConsumingEmptyTopic() throws Throwable {
+ String emptyTopic = "emptyTopic-" + UUID.randomUUID();
+ KafkaSourceTestEnv.createTestTopic(emptyTopic, 3, 1);
+ KafkaSource<PartitionAndValue> source =
+ KafkaSource.<PartitionAndValue>builder()
+ .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ .setTopics(emptyTopic)
+ .setGroupId("empty-topic-test")
+ .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setBounded(OffsetsInitializer.latest())
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ try (CloseableIterator<PartitionAndValue> iterator =
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "testConsumingEmptyTopic")
+ .executeAndCollect()) {
+ assertFalse(iterator.hasNext());
+ }
+ }
+
+ @Test
+ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
+ String topicWithEmptyPartitions = "topicWithEmptyPartitions-" + UUID.randomUUID();
+ KafkaSourceTestEnv.createTestTopic(
+ topicWithEmptyPartitions, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
List<ProducerRecord<String, Integer>> records =
+ KafkaSourceTestEnv.getRecordsForTopicWithoutTimestamp(topicWithEmptyPartitions);
+ // Only keep records in partition 5
+ int partitionWithRecords = 5;
+ records.removeIf(record -> record.partition() != partitionWithRecords);
+ KafkaSourceTestEnv.produceToKafka(records);
KafkaSourceTestEnv.setupEarliestOffsets(
+ Collections.singletonList(
+ new TopicPartition(topicWithEmptyPartitions, partitionWithRecords)));
+
+ KafkaSource<PartitionAndValue> source =
+ KafkaSource.<PartitionAndValue>builder()
+ .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ .setTopics(topicWithEmptyPartitions)
+ .setGroupId("topic-with-empty-partition-test")
+ .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setBounded(OffsetsInitializer.latest())
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ executeAndVerify(
+ env,
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "testConsumingTopicWithEmptyPartitions"));
+ }
}
/** Integration test based on connector testing framework. */
@@ -431,15 +494,20 @@ public class KafkaSourceITCase {
resultPerPartition
.computeIfAbsent(partitionAndValue.tp, ignored -> new ArrayList<>())
.add(partitionAndValue.value));
+
+ // Expected elements from partition P should be an integer sequence from P to
+ // NUM_RECORDS_PER_PARTITION.
resultPerPartition.forEach(
(tp, values) -> {
- int firstExpectedValue = Integer.parseInt(tp.substring(tp.indexOf('-') + 1));
+ int firstExpectedValue =
+ Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1));
for (int i = 0; i < values.size(); i++) {
assertEquals(
firstExpectedValue + i,
(int) values.get(i),
String.format(
- "The %d-th value for partition %s
should be %d", i, tp, i));
+ "The %d-th value for partition %s
should be %d",
+ i, tp, firstExpectedValue + i));
}
});
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index e671520ad42..eac36739534 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -401,6 +401,40 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
}
}
+ @Test
+ public void testAssigningEmptySplitOnly() throws Exception {
+ // Empty split with no record
+ KafkaPartitionSplit emptySplit0 =
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC, 0), NUM_RECORDS_PER_SPLIT, NUM_RECORDS_PER_SPLIT);
+ KafkaPartitionSplit emptySplit1 =
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC, 1), NUM_RECORDS_PER_SPLIT, NUM_RECORDS_PER_SPLIT);
+ // Split finished hook for listening finished splits
+ final Set<String> finishedSplits = new HashSet<>();
+ final Consumer<Collection<String>> splitFinishedHook = finishedSplits::addAll;
+
+ try (final KafkaSourceReader<Integer> reader =
+ (KafkaSourceReader<Integer>)
+ createReader(
+ Boundedness.BOUNDED,
+ "KafkaSourceReaderTestGroup",
+ new TestingReaderContext(),
+ splitFinishedHook)) {
+ reader.addSplits(Arrays.asList(emptySplit0, emptySplit1));
+ pollUntil(
+ reader,
+ new TestingReaderOutput<>(),
+ () -> reader.getNumAliveFetchers() == 0,
+ "The split fetcher did not exit before timeout.");
+ assertEquals(0, reader.getNumAliveFetchers());
+ assertEquals(2, finishedSplits.size());
+ assertTrue(
+ finishedSplits.containsAll(
+ Arrays.asList(emptySplit0.splitId(), emptySplit1.splitId())));
+ }
+ }
+
// ------------------------------------------
@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
index 286934b1e95..d8f93683d8c 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
@@ -220,6 +220,10 @@ public class KafkaSourceTestEnv extends KafkaTestBase {
public static void setupEarliestOffsets(String topic) throws Throwable {
// Delete some records to move the starting partition.
List<TopicPartition> partitions = getPartitionsForTopic(topic);
+ setupEarliestOffsets(partitions);
+ }
+
+ public static void setupEarliestOffsets(List<TopicPartition> partitions) throws Throwable {
Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
getEarliestOffsets(partitions)
.forEach((tp, offset) -> toDelete.put(tp, RecordsToDelete.beforeOffset(offset)));
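The new setupEarliestOffsets(List<TopicPartition>) overload applies the same record deletion as the existing String-topic variant, but to an explicit set of partitions, so a test can move the earliest offset of a single partition. A minimal usage sketch, assuming a hypothetical topic "myTopic" with records in partition 0:

    // Delete records in partition 0 of "myTopic" up to its computed earliest offset,
    // as testConsumingTopicWithEmptyPartitions above does for its partition 5.
    KafkaSourceTestEnv.setupEarliestOffsets(
            Collections.singletonList(new TopicPartition("myTopic", 0)));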