This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 122a7439 [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition on split reader (#100)
122a7439 is described below

commit 122a74394629701ce1096b6ea49a03a0e0744b2b
Author: dongwoo kim <[email protected]>
AuthorDate: Tue Sep 17 21:34:10 2024 +0800

    [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition on split reader (#100)
    
    Problem: In batch mode, the Flink Kafka connector could hang when
    consuming transactional messages or reading past deleted records.
    
    Solution: Use consumer.position() instead of lastRecord's offset to skip
    control and deleted messages, preventing the hang.
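    
    The gist of the fix, as a minimal standalone sketch rather than the
    connector's actual code (the real change is in KafkaPartitionSplitReader
    below; the class, method, and stoppingOffsets map here are hypothetical):
    finish a bounded read once the consumer's position reaches the stopping
    offset, instead of waiting for a polled record with offset
    stoppingOffset - 1.
    
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;
        
        import java.time.Duration;
        import java.util.HashSet;
        import java.util.Map;
        import java.util.Set;
        
        final class BoundedPollSketch {
            /**
             * Polls until every assigned partition's position has reached its
             * stopping offset. {@code stoppingOffsets} is a hypothetical map from
             * partition to end offset (e.g. resolved via OffsetsInitializer.latest()).
             */
            static void pollUntilBounds(
                    KafkaConsumer<byte[], byte[]> consumer,
                    Map<TopicPartition, Long> stoppingOffsets) {
                Set<TopicPartition> remaining = new HashSet<>(consumer.assignment());
                while (!remaining.isEmpty()) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                    // ... hand the returned records to downstream processing ...
                    // The position advances past transaction control markers and deleted
                    // offsets even though no record is ever returned for them, so this
                    // check cannot get stuck the way a "last record offset" check can.
                    remaining.removeIf(
                            tp -> consumer.position(tp) >= stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE));
                }
            }
        }
    
    With the old condition, a committed transaction leaves a control marker at
    the partition's final offset, so the last data record ever returned has
    offset stoppingOffset - 2 and lastRecord.offset() >= stoppingOffset - 1 never
    becomes true; and when the records in the bounded range have been deleted,
    no record is returned at all, so a record-offset check can never fire. The
    consumer's position, by contrast, advances past control markers and, for
    deleted offsets, is reset forward according to auto.offset.reset, so
    comparing it with the stopping offset terminates in both cases.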
---
 .../source/reader/KafkaPartitionSplitReader.java   | 44 ++++++++---------
 .../connector/kafka/source/KafkaSourceITCase.java  | 35 +++++++++++++
 .../kafka/source/reader/KafkaSourceReaderTest.java |  2 +-
 .../kafka/testutils/KafkaSourceTestEnv.java        |  8 ++-
 .../streaming/connectors/kafka/KafkaTestBase.java  | 15 +++++-
 .../connectors/kafka/KafkaTestEnvironment.java     |  7 +++
 .../connectors/kafka/table/KafkaTableITCase.java   | 57 ++++++++++++++++++++++
 .../connectors/kafka/table/KafkaTableTestBase.java | 19 ++++++++
 8 files changed, 162 insertions(+), 25 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 94940b8e..23956f5d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -122,32 +122,32 @@ public class KafkaPartitionSplitReader
         KafkaPartitionSplitRecords recordsBySplits =
                 new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
         List<TopicPartition> finishedPartitions = new ArrayList<>();
-        for (TopicPartition tp : consumerRecords.partitions()) {
+        for (TopicPartition tp : consumer.assignment()) {
             long stoppingOffset = getStoppingOffset(tp);
-            final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
-                    consumerRecords.records(tp);
-
-            if (recordsFromPartition.size() > 0) {
-                final ConsumerRecord<byte[], byte[]> lastRecord =
-                        recordsFromPartition.get(recordsFromPartition.size() - 1);
-
-                // After processing a record with offset of "stoppingOffset - 1", the split reader
-                // should not continue fetching because the record with stoppingOffset may not
-                // exist. Keep polling will just block forever.
-                if (lastRecord.offset() >= stoppingOffset - 1) {
-                    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
-                    finishSplitAtRecord(
-                            tp,
-                            stoppingOffset,
-                            lastRecord.offset(),
-                            finishedPartitions,
-                            recordsBySplits);
-                }
+            long consumerPosition = consumer.position(tp);
+            // Stop fetching when the consumer's position reaches the stoppingOffset.
+            // Control messages may follow the last record; therefore, using the last record's
+            // offset as a stopping condition could result in indefinite blocking.
+            if (consumerPosition >= stoppingOffset) {
+                LOG.debug(
+                        "Position of {}: {}, has reached stopping offset: {}",
+                        tp,
+                        consumerPosition,
+                        stoppingOffset);
+                recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
+                finishSplitAtRecord(
+                        tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits);
             }
-            // Track this partition's record lag if it never appears before
-            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
 
+        // Only track non-empty partition's record lag if it never appears before
+        consumerRecords
+                .partitions()
+                .forEach(
+                        trackTp -> {
+                            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp);
+                        });
+
         markEmptySplitsAsFinished(recordsBySplits);
 
         // Unassign the partitions that has finished.
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 38ef80d5..6c0bd7e5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION;
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.KafkaTestBase.kafkaServer;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unite test class for {@link KafkaSource}. */
@@ -369,6 +370,40 @@ public class KafkaSourceITCase {
                             WatermarkStrategy.noWatermarks(),
                             "testConsumingTopicWithEmptyPartitions"));
         }
+
+        @Test
+        public void testConsumingTransactionalMessage() throws Throwable {
+            String transactionalTopic = "transactionalTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(
+                    transactionalTopic, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    KafkaSourceTestEnv.getRecordsForTopic(transactionalTopic);
+            KafkaSourceTestEnv.produceToKafka(
+                    records, kafkaServer.getTransactionalProducerConfig());
+            // After running KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic):
+            // - For each partition, records with offsets before partition number P are deleted.
+            //   - Partition 0: offset 0 is earliest
+            //   - Partition 5: offset 5 is earliest, 0-4 are deleted.
+            //   - Partition 9: offset 9 is earliest, 0-8 are deleted.
+            KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic);
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(transactionalTopic)
+                            .setGroupId("topic-with-transactional-message-test")
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            executeAndVerify(
+                    env,
+                    env.fromSource(
+                            source,
+                            WatermarkStrategy.noWatermarks(),
+                            "testConsumingTransactionalMessage"));
+        }
     }
 
     /** Integration test based on connector testing framework. */
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index f5aa7f5f..5ad87ffc 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -116,7 +116,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
                     "Waiting for offsets topic creation failed.");
         }
         KafkaSourceTestEnv.produceToKafka(
-                getRecords(), StringSerializer.class, IntegerSerializer.class);
+                getRecords(), StringSerializer.class, IntegerSerializer.class, null);
     }
 
     @AfterAll
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
index 5173f9dc..d82425f5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
@@ -249,7 +249,13 @@ public class KafkaSourceTestEnv extends KafkaTestBase {
 
     public static void produceToKafka(Collection<ProducerRecord<String, Integer>> records)
             throws Throwable {
-        produceToKafka(records, StringSerializer.class, IntegerSerializer.class);
+        produceToKafka(records, StringSerializer.class, IntegerSerializer.class, null);
+    }
+
+    public static void produceToKafka(
+            Collection<ProducerRecord<String, Integer>> records, Properties extraProps)
+            throws Throwable {
+        produceToKafka(records, StringSerializer.class, IntegerSerializer.class, extraProps);
     }
 
     public static void setupTopic(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index deafb7d6..4b9acbf1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -46,6 +46,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -242,11 +244,15 @@ public abstract class KafkaTestBase extends TestLogger {
             Collection<ProducerRecord<K, V>> records,
             Class<? extends org.apache.kafka.common.serialization.Serializer<K>> keySerializerClass,
             Class<? extends org.apache.kafka.common.serialization.Serializer<V>>
-                    valueSerializerClass)
+                    valueSerializerClass,
+            @Nullable Properties extraProps)
             throws Throwable {
         Properties props = new Properties();
         props.putAll(standardProps);
         props.putAll(kafkaServer.getIdempotentProducerConfig());
+        if (extraProps != null) {
+            props.putAll(extraProps);
+        }
         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
         props.setProperty(
                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
@@ -261,9 +267,16 @@ public abstract class KafkaTestBase extends TestLogger {
                     }
                 };
         try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
+            if (props.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
             for (ProducerRecord<K, V> record : records) {
                 producer.send(record, callback);
             }
+            if (props.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
+                producer.commitTransaction();
+            }
         }
         if (sendingError.get() != null) {
             throw sendingError.get();
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 6687cd52..1494ff1f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Abstract class providing a Kafka test environment. */
 public abstract class KafkaTestEnvironment {
@@ -115,6 +116,12 @@ public abstract class KafkaTestEnvironment {
         return props;
     }
 
+    public Properties getTransactionalProducerConfig() {
+        Properties props = new Properties();
+        props.put("transactional.id", UUID.randomUUID().toString());
+        return props;
+    }
+
     // -- consumer / producer instances:
     public <T> FlinkKafkaConsumerBase<T> getConsumer(
             List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 15aa722f..8630120b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -54,6 +54,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -352,6 +353,62 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         deleteTestTopic(topic2);
     }
 
+    @Test
+    public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+        // ---------- Produce an event time stream into Kafka -------------------
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + "  `user_id` INT,\n"
+                                + "  `item_id` INT,\n"
+                                + "  `behavior` STRING\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'specific-offsets',\n"
+                                + "  'scan.bounded.mode' = 'specific-offsets',\n"
+                                + "  'scan.startup.specific-offsets' = 'partition:0,offset:1',\n"
+                                + "  'scan.bounded.specific-offsets' = 'partition:0,offset:3',\n"
+                                + "  %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic,
+                        bootstraps,
+                        groupId,
+                        formatOptions());
+        tEnv.executeSql(createTable);
+        List<Row> values =
+                Arrays.asList(
+                        Row.of(1, 1102, "behavior 1"),
+                        Row.of(2, 1103, "behavior 2"),
+                        Row.of(3, 1104, "behavior 3"));
+        tEnv.fromValues(values).insertInto("kafka").execute().await();
+        // ---------- Delete events from Kafka -------------------
+        Map<Integer, Long> partitionOffsetsToDelete = new HashMap<>();
+        partitionOffsetsToDelete.put(0, 3L);
+        deleteRecords(topic, partitionOffsetsToDelete);
+        // ---------- Consume stream from Kafka -------------------
+        List<Row> results = new ArrayList<>();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(createTable);
+        results.addAll(collectAllRows(tEnv.sqlQuery("SELECT * FROM kafka")));
+        assertThat(results).isEmpty();
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
     @Test
     public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
         // we always use a different topic name for each parameterized topic,
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index cffe2d6c..e9d8bb76 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -170,6 +171,24 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         }
     }
 
+    public void deleteRecords(String topic, Map<Integer, Long> partitionOffsetsToDelete) {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+
+        try (AdminClient admin = AdminClient.create(properties)) {
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+            for (Map.Entry<Integer, Long> entry : partitionOffsetsToDelete.entrySet()) {
+                TopicPartition partition = new TopicPartition(topic, entry.getKey());
+                RecordsToDelete records = RecordsToDelete.beforeOffset(entry.getValue());
+                recordsToDelete.put(partition, records);
+            }
+            admin.deleteRecords(recordsToDelete).all().get();
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    String.format("Fail to delete records on topic [%s].", topic), e);
+        }
+    }
+
     // ------------------------ For Debug Logging Purpose ----------------------------------
 
     private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
