This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new a4f83353fe6 [FLINK-27041][connector/kafka] Catch IllegalStateException 
in KafkaPartitionSplitReader.fetch() to handle no valid partition case
a4f83353fe6 is described below

commit a4f83353fe623b3f09363e964d6a9aeb7af0bf11
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Jun 16 16:26:34 2022 +0800

    [FLINK-27041][connector/kafka] Catch IllegalStateException in 
KafkaPartitionSplitReader.fetch() to handle no valid partition case
    
    This closes #19979.
---
 .../source/reader/KafkaPartitionSplitReader.java   |  29 ++++--
 .../connector/kafka/source/KafkaSourceITCase.java  | 112 +++++++++++++++++----
 .../kafka/source/reader/KafkaSourceReaderTest.java |  34 +++++++
 .../kafka/source/testutils/KafkaSourceTestEnv.java |   4 +
 4 files changed, 148 insertions(+), 31 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index ebadef32fd2..8a4d408df24 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -95,9 +95,16 @@ public class KafkaPartitionSplitReader
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-        } catch (WakeupException we) {
-            return new KafkaPartitionSplitRecords(
-                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not 
assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty 
(starting offset >=
+            // stopping offset). We just mark empty partitions as finished and 
return an empty
+            // record container, and this consumer will be closed by 
SplitFetcherManager.
+            KafkaPartitionSplitRecords recordsBySplits =
+                    new KafkaPartitionSplitRecords(
+                            ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+            markEmptySplitsAsFinished(recordsBySplits);
+            return recordsBySplits;
         }
         KafkaPartitionSplitRecords recordsBySplits =
                 new KafkaPartitionSplitRecords(consumerRecords, 
kafkaSourceReaderMetrics);
@@ -128,12 +135,7 @@ public class KafkaPartitionSplitReader
             kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
 
-        // Some splits are discovered as empty when handling split additions. 
These splits should be
-        // added to finished splits to clean up states in split fetcher and 
source reader.
-        if (!emptySplits.isEmpty()) {
-            recordsBySplits.finishedSplits.addAll(emptySplits);
-            emptySplits.clear();
-        }
+        markEmptySplitsAsFinished(recordsBySplits);
 
         // Unassign the partitions that has finished.
         if (!finishedPartitions.isEmpty()) {
@@ -147,6 +149,15 @@ public class KafkaPartitionSplitReader
         return recordsBySplits;
     }
 
+    private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords 
recordsBySplits) {
+        // Some splits are discovered as empty when handling split additions. 
These splits should be
+        // added to finished splits to clean up states in split fetcher and 
source reader.
+        if (!emptySplits.isEmpty()) {
+            recordsBySplits.finishedSplits.addAll(emptySplits);
+            emptySplits.clear();
+        }
+    }
+
     @Override
     public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> 
splitsChange) {
         // Get all the partition assignments and stopping offsets.
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 1f740b59aeb..1ca071fbd72 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -77,6 +77,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unite test class for {@link KafkaSource}. */
@@ -181,33 +182,36 @@ public class KafkaSourceITCase {
 
             StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(1);
-            final CloseableIterator<Integer> resultIterator =
+
+            try (CloseableIterator<Integer> resultIterator =
                     env.fromSource(
                                     source,
                                     WatermarkStrategy.noWatermarks(),
                                     "testValueOnlyDeserializer")
-                            .executeAndCollect();
-
-            AtomicInteger actualSum = new AtomicInteger();
-            resultIterator.forEachRemaining(actualSum::addAndGet);
-
-            // Calculate the actual sum of values
-            // Values in a partition should start from partition ID, and end 
with
-            // (NUM_RECORDS_PER_PARTITION - 1)
-            // e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
-            int expectedSum = 0;
-            for (int partition = 0; partition < 
KafkaSourceTestEnv.NUM_PARTITIONS; partition++) {
-                for (int value = partition;
-                        value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
-                        value++) {
-                    expectedSum += value;
+                            .executeAndCollect()) {
+                AtomicInteger actualSum = new AtomicInteger();
+                resultIterator.forEachRemaining(actualSum::addAndGet);
+
+                // Calculate the actual sum of values
+                // Values in a partition should start from partition ID, and 
end with
+                // (NUM_RECORDS_PER_PARTITION - 1)
+                // e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
+                int expectedSum = 0;
+                for (int partition = 0;
+                        partition < KafkaSourceTestEnv.NUM_PARTITIONS;
+                        partition++) {
+                    for (int value = partition;
+                            value < 
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+                            value++) {
+                        expectedSum += value;
+                    }
                 }
-            }
 
-            // Since we have two topics, the expected sum value should be 
doubled
-            expectedSum *= 2;
+                // Since we have two topics, the expected sum value should be 
doubled
+                expectedSum *= 2;
 
-            assertEquals(expectedSum, actualSum.get());
+                assertEquals(expectedSum, actualSum.get());
+            }
         }
 
         @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
@@ -304,6 +308,65 @@ public class KafkaSourceITCase {
                             });
             env.execute();
         }
+
+        @Test
+        public void testConsumingEmptyTopic() throws Throwable {
+            String emptyTopic = "emptyTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(emptyTopic, 3, 1);
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(emptyTopic)
+                            .setGroupId("empty-topic-test")
+                            .setDeserializer(new 
TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            try (CloseableIterator<PartitionAndValue> iterator =
+                    env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    "testConsumingEmptyTopic")
+                            .executeAndCollect()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+
+        @Test
+        public void testConsumingTopicWithEmptyPartitions() throws Throwable {
+            String topicWithEmptyPartitions = "topicWithEmptyPartitions-" + 
UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(
+                    topicWithEmptyPartitions, 
KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    
KafkaSourceTestEnv.getRecordsForTopicWithoutTimestamp(topicWithEmptyPartitions);
+            // Only keep records in partition 5
+            int partitionWithRecords = 5;
+            records.removeIf(record -> record.partition() != 
partitionWithRecords);
+            KafkaSourceTestEnv.produceToKafka(records);
+            KafkaSourceTestEnv.setupEarliestOffsets(
+                    Collections.singletonList(
+                            new TopicPartition(topicWithEmptyPartitions, 
partitionWithRecords)));
+
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(topicWithEmptyPartitions)
+                            .setGroupId("topic-with-empty-partition-test")
+                            .setDeserializer(new 
TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(2);
+            executeAndVerify(
+                    env,
+                    env.fromSource(
+                            source,
+                            WatermarkStrategy.noWatermarks(),
+                            "testConsumingTopicWithEmptyPartitions"));
+        }
     }
 
     /** Integration test based on connector testing framework. */
@@ -431,15 +494,20 @@ public class KafkaSourceITCase {
                         resultPerPartition
                                 .computeIfAbsent(partitionAndValue.tp, ignored 
-> new ArrayList<>())
                                 .add(partitionAndValue.value));
+
+        // Expected elements from partition P should be an integer sequence 
from P to
+        // NUM_RECORDS_PER_PARTITION.
         resultPerPartition.forEach(
                 (tp, values) -> {
-                    int firstExpectedValue = 
Integer.parseInt(tp.substring(tp.indexOf('-') + 1));
+                    int firstExpectedValue =
+                            Integer.parseInt(tp.substring(tp.lastIndexOf('-') 
+ 1));
                     for (int i = 0; i < values.size(); i++) {
                         assertEquals(
                                 firstExpectedValue + i,
                                 (int) values.get(i),
                                 String.format(
-                                        "The %d-th value for partition %s 
should be %d", i, tp, i));
+                                        "The %d-th value for partition %s 
should be %d",
+                                        i, tp, firstExpectedValue + i));
                     }
                 });
     }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index e671520ad42..eac36739534 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -401,6 +401,40 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
         }
     }
 
+    @Test
+    public void testAssigningEmptySplitOnly() throws Exception {
+        // Empty split with no record
+        KafkaPartitionSplit emptySplit0 =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC, 0), NUM_RECORDS_PER_SPLIT, 
NUM_RECORDS_PER_SPLIT);
+        KafkaPartitionSplit emptySplit1 =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC, 1), NUM_RECORDS_PER_SPLIT, 
NUM_RECORDS_PER_SPLIT);
+        // Split finished hook for listening finished splits
+        final Set<String> finishedSplits = new HashSet<>();
+        final Consumer<Collection<String>> splitFinishedHook = 
finishedSplits::addAll;
+
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "KafkaSourceReaderTestGroup",
+                                new TestingReaderContext(),
+                                splitFinishedHook)) {
+            reader.addSplits(Arrays.asList(emptySplit0, emptySplit1));
+            pollUntil(
+                    reader,
+                    new TestingReaderOutput<>(),
+                    () -> reader.getNumAliveFetchers() == 0,
+                    "The split fetcher did not exit before timeout.");
+            assertEquals(0, reader.getNumAliveFetchers());
+            assertEquals(2, finishedSplits.size());
+            assertTrue(
+                    finishedSplits.containsAll(
+                            Arrays.asList(emptySplit0.splitId(), 
emptySplit1.splitId())));
+        }
+    }
+
     // ------------------------------------------
 
     @Override
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
index 286934b1e95..d8f93683d8c 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
@@ -220,6 +220,10 @@ public class KafkaSourceTestEnv extends KafkaTestBase {
     public static void setupEarliestOffsets(String topic) throws Throwable {
         // Delete some records to move the starting partition.
         List<TopicPartition> partitions = getPartitionsForTopic(topic);
+        setupEarliestOffsets(partitions);
+    }
+
+    public static void setupEarliestOffsets(List<TopicPartition> partitions) 
throws Throwable {
         Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
         getEarliestOffsets(partitions)
                 .forEach((tp, offset) -> toDelete.put(tp, 
RecordsToDelete.beforeOffset(offset)));

Reply via email to