This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
     new 122a7439 [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition on split reader (#100)
122a7439 is described below
commit 122a74394629701ce1096b6ea49a03a0e0744b2b
Author: dongwoo kim <[email protected]>
AuthorDate: Tue Sep 17 21:34:10 2024 +0800
    [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition on split reader (#100)
    Problem: In batch mode, the Flink Kafka connector could hang indefinitely when
    consuming transactional messages or reading from partitions with deleted records.
    Solution: Use consumer.position() instead of the last record's offset as the
    stopping condition, so control and deleted messages are skipped and the hang is prevented.
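
    For illustration only (not part of this commit), the old and new stopping
    conditions can be contrasted in isolation; Consumer, ConsumerRecords, and
    TopicPartition are the standard Kafka client types, and the surrounding
    reader state is assumed:

        import java.util.List;

        import org.apache.kafka.clients.consumer.Consumer;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.common.TopicPartition;

        final class StoppingConditionSketch {

            // Old condition: derived from the last record returned by poll(). If the record at
            // "stoppingOffset - 1" is a transaction control marker or has been deleted, poll()
            // never returns it, the condition never fires, and the reader keeps polling forever.
            static boolean lastRecordReachedStop(
                    ConsumerRecords<byte[], byte[]> polled, TopicPartition tp, long stoppingOffset) {
                List<ConsumerRecord<byte[], byte[]>> records = polled.records(tp);
                return !records.isEmpty()
                        && records.get(records.size() - 1).offset() >= stoppingOffset - 1;
            }

            // New condition: derived from consumer.position(), the offset of the next record to
            // fetch. The position also advances over control markers and deleted offsets, so it
            // reaches the stopping offset even when no further data records will be returned.
            static boolean positionReachedStop(
                    Consumer<byte[], byte[]> consumer, TopicPartition tp, long stoppingOffset) {
                return consumer.position(tp) >= stoppingOffset;
            }
        }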
---
.../source/reader/KafkaPartitionSplitReader.java | 44 ++++++++---------
.../connector/kafka/source/KafkaSourceITCase.java | 35 +++++++++++++
.../kafka/source/reader/KafkaSourceReaderTest.java | 2 +-
.../kafka/testutils/KafkaSourceTestEnv.java | 8 ++-
.../streaming/connectors/kafka/KafkaTestBase.java | 15 +++++-
.../connectors/kafka/KafkaTestEnvironment.java | 7 +++
.../connectors/kafka/table/KafkaTableITCase.java | 57 ++++++++++++++++++++++
.../connectors/kafka/table/KafkaTableTestBase.java | 19 ++++++++
8 files changed, 162 insertions(+), 25 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 94940b8e..23956f5d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -122,32 +122,32 @@ public class KafkaPartitionSplitReader
         KafkaPartitionSplitRecords recordsBySplits =
                 new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
         List<TopicPartition> finishedPartitions = new ArrayList<>();
-        for (TopicPartition tp : consumerRecords.partitions()) {
+        for (TopicPartition tp : consumer.assignment()) {
             long stoppingOffset = getStoppingOffset(tp);
-            final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
-                    consumerRecords.records(tp);
-
-            if (recordsFromPartition.size() > 0) {
-                final ConsumerRecord<byte[], byte[]> lastRecord =
-                        recordsFromPartition.get(recordsFromPartition.size() - 1);
-
-                // After processing a record with offset of "stoppingOffset - 1", the split reader
-                // should not continue fetching because the record with stoppingOffset may not
-                // exist. Keep polling will just block forever.
-                if (lastRecord.offset() >= stoppingOffset - 1) {
-                    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
-                    finishSplitAtRecord(
-                            tp,
-                            stoppingOffset,
-                            lastRecord.offset(),
-                            finishedPartitions,
-                            recordsBySplits);
-                }
+            long consumerPosition = consumer.position(tp);
+            // Stop fetching when the consumer's position reaches the stoppingOffset.
+            // Control messages may follow the last record; therefore, using the last record's
+            // offset as a stopping condition could result in indefinite blocking.
+            if (consumerPosition >= stoppingOffset) {
+                LOG.debug(
+                        "Position of {}: {}, has reached stopping offset: {}",
+                        tp,
+                        consumerPosition,
+                        stoppingOffset);
+                recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
+                finishSplitAtRecord(
+                        tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits);
             }
-            // Track this partition's record lag if it never appears before
-            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
+        // Only track non-empty partition's record lag if it never appears before
+        consumerRecords
+                .partitions()
+                .forEach(
+                        trackTp -> {
+                            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp);
+                        });
+
         markEmptySplitsAsFinished(recordsBySplits);

         // Unassign the partitions that has finished.
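
A minimal probe of the underlying Kafka behavior (illustration only; the broker
address and topic name are assumptions, and the partition is expected to end
with a committed transaction): after draining such a partition, a read_committed
consumer's position lies beyond the offset of the last data record it will ever
return, which is why the position is the reliable stopping signal.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class PositionProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("transactionalTopic", 0); // assumption
                consumer.assign(Collections.singletonList(tp));
                long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
                long lastDataOffset = -1L;
                while (consumer.position(tp) < endOffset) {
                    for (ConsumerRecord<byte[], byte[]> record :
                            consumer.poll(Duration.ofSeconds(1)).records(tp)) {
                        lastDataOffset = record.offset();
                    }
                }
                // If the partition ends with a commit marker, the final position is greater than
                // lastDataOffset + 1, so a "lastRecord.offset() >= stoppingOffset - 1" check with
                // the end offset as the stopping offset would never become true.
                System.out.printf(
                        "last data offset: %d, position: %d%n", lastDataOffset, consumer.position(tp));
            }
        }
    }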
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 38ef80d5..6c0bd7e5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION;
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.KafkaTestBase.kafkaServer;
import static org.assertj.core.api.Assertions.assertThat;
/** Unite test class for {@link KafkaSource}. */
@@ -369,6 +370,40 @@ public class KafkaSourceITCase {
                         WatermarkStrategy.noWatermarks(),
                         "testConsumingTopicWithEmptyPartitions"));
     }
+
+    @Test
+    public void testConsumingTransactionalMessage() throws Throwable {
+        String transactionalTopic = "transactionalTopic-" + UUID.randomUUID();
+        KafkaSourceTestEnv.createTestTopic(
+                transactionalTopic, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+        List<ProducerRecord<String, Integer>> records =
+                KafkaSourceTestEnv.getRecordsForTopic(transactionalTopic);
+        KafkaSourceTestEnv.produceToKafka(
+                records, kafkaServer.getTransactionalProducerConfig());
+        // After running KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic):
+        // - For each partition, records with offsets before partition number P are deleted.
+        // - Partition 0: offset 0 is earliest
+        // - Partition 5: offset 5 is earliest, 0-4 are deleted.
+        // - Partition 9: offset 9 is earliest, 0-8 are deleted.
+        KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic);
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setTopics(transactionalTopic)
+                        .setGroupId("topic-with-transactional-message-test")
+                        .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        executeAndVerify(
+                env,
+                env.fromSource(
+                        source,
+                        WatermarkStrategy.noWatermarks(),
+                        "testConsumingTransactionalMessage"));
+    }
}
/** Integration test based on connector testing framework. */
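
The test above relies on the records being written inside a committed Kafka
transaction. A rough standalone sketch of that production path (broker address,
topic, and record values are hypothetical), mirroring the
initTransactions/beginTransaction/commitTransaction sequence that the
KafkaTestBase change further down adds around the send loop:

    import java.util.Properties;
    import java.util.UUID;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalProduceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
            try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                for (int i = 0; i < 10; i++) {
                    // Topic name and record contents are illustrative only.
                    producer.send(new ProducerRecord<>("transactionalTopic", "key-" + i, i));
                }
                // Committing appends a control marker after the last data record; the marker
                // occupies an offset but is never handed to consumers.
                producer.commitTransaction();
            }
        }
    }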
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index f5aa7f5f..5ad87ffc 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -116,7 +116,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
"Waiting for offsets topic creation failed.");
}
KafkaSourceTestEnv.produceToKafka(
-                getRecords(), StringSerializer.class, IntegerSerializer.class);
+                getRecords(), StringSerializer.class, IntegerSerializer.class, null);
}
@AfterAll
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
index 5173f9dc..d82425f5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
@@ -249,7 +249,13 @@ public class KafkaSourceTestEnv extends KafkaTestBase {
     public static void produceToKafka(Collection<ProducerRecord<String, Integer>> records)
             throws Throwable {
-        produceToKafka(records, StringSerializer.class, IntegerSerializer.class);
+        produceToKafka(records, StringSerializer.class, IntegerSerializer.class, null);
+    }
+
+    public static void produceToKafka(
+            Collection<ProducerRecord<String, Integer>> records, Properties extraProps)
+            throws Throwable {
+        produceToKafka(records, StringSerializer.class, IntegerSerializer.class, extraProps);
}
public static void setupTopic(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index deafb7d6..4b9acbf1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -46,6 +46,8 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -242,11 +244,15 @@ public abstract class KafkaTestBase extends TestLogger {
             Collection<ProducerRecord<K, V>> records,
             Class<? extends org.apache.kafka.common.serialization.Serializer<K>> keySerializerClass,
             Class<? extends org.apache.kafka.common.serialization.Serializer<V>>
-                    valueSerializerClass)
+                    valueSerializerClass,
+            @Nullable Properties extraProps)
             throws Throwable {
         Properties props = new Properties();
         props.putAll(standardProps);
         props.putAll(kafkaServer.getIdempotentProducerConfig());
+        if (extraProps != null) {
+            props.putAll(extraProps);
+        }
         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
         props.setProperty(
                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
@@ -261,9 +267,16 @@ public abstract class KafkaTestBase extends TestLogger {
}
};
try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
+ if (props.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
+ producer.initTransactions();
+ producer.beginTransaction();
+ }
for (ProducerRecord<K, V> record : records) {
producer.send(record, callback);
}
+ if (props.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
+ producer.commitTransaction();
+ }
}
if (sendingError.get() != null) {
throw sendingError.get();
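
With the extra-properties hook above, test code can opt into transactional
production without a second code path; a usage sketch based on the helpers in
this commit (record contents are illustrative):

    // Idempotent production, unchanged behavior:
    KafkaSourceTestEnv.produceToKafka(records);

    // Transactional production: the transactional.id supplied via the extra properties makes
    // produceToKafka wrap the send loop in initTransactions/beginTransaction/commitTransaction.
    KafkaSourceTestEnv.produceToKafka(records, kafkaServer.getTransactionalProducerConfig());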
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 6687cd52..1494ff1f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.UUID;
/** Abstract class providing a Kafka test environment. */
public abstract class KafkaTestEnvironment {
@@ -115,6 +116,12 @@ public abstract class KafkaTestEnvironment {
return props;
}
+ public Properties getTransactionalProducerConfig() {
+ Properties props = new Properties();
+ props.put("transactional.id", UUID.randomUUID().toString());
+ return props;
+ }
+
// -- consumer / producer instances:
public <T> FlinkKafkaConsumerBase<T> getConsumer(
             List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 15aa722f..8630120b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -54,6 +54,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -352,6 +353,62 @@ public class KafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic2);
}
+    @Test
+    public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+        // ---------- Produce an event time stream into Kafka -------------------
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + " `user_id` INT,\n"
+                                + " `item_id` INT,\n"
+                                + " `behavior` STRING\n"
+                                + ") WITH (\n"
+                                + " 'connector' = '%s',\n"
+                                + " 'topic' = '%s',\n"
+                                + " 'properties.bootstrap.servers' = '%s',\n"
+                                + " 'properties.group.id' = '%s',\n"
+                                + " 'scan.startup.mode' = 'specific-offsets',\n"
+                                + " 'scan.bounded.mode' = 'specific-offsets',\n"
+                                + " 'scan.startup.specific-offsets' = 'partition:0,offset:1',\n"
+                                + " 'scan.bounded.specific-offsets' = 'partition:0,offset:3',\n"
+                                + " %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic,
+                        bootstraps,
+                        groupId,
+                        formatOptions());
+        tEnv.executeSql(createTable);
+        List<Row> values =
+                Arrays.asList(
+                        Row.of(1, 1102, "behavior 1"),
+                        Row.of(2, 1103, "behavior 2"),
+                        Row.of(3, 1104, "behavior 3"));
+        tEnv.fromValues(values).insertInto("kafka").execute().await();
+        // ---------- Delete events from Kafka -------------------
+        Map<Integer, Long> partitionOffsetsToDelete = new HashMap<>();
+        partitionOffsetsToDelete.put(0, 3L);
+        deleteRecords(topic, partitionOffsetsToDelete);
+        // ---------- Consume stream from Kafka -------------------
+        List<Row> results = new ArrayList<>();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(createTable);
+        results.addAll(collectAllRows(tEnv.sqlQuery("SELECT * FROM kafka")));
+        assertThat(results).isEmpty();
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
     @Test
     public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
         // we always use a different topic name for each parameterized topic,
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index cffe2d6c..e9d8bb76 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -170,6 +171,24 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
}
}
+    public void deleteRecords(String topic, Map<Integer, Long> partitionOffsetsToDelete) {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+
+        try (AdminClient admin = AdminClient.create(properties)) {
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+            for (Map.Entry<Integer, Long> entry : partitionOffsetsToDelete.entrySet()) {
+                TopicPartition partition = new TopicPartition(topic, entry.getKey());
+                RecordsToDelete records = RecordsToDelete.beforeOffset(entry.getValue());
+                recordsToDelete.put(partition, records);
+            }
+            admin.deleteRecords(recordsToDelete).all().get();
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    String.format("Fail to delete records on topic [%s].", topic), e);
+        }
+    }
+
     // ------------------------ For Debug Logging Purpose ----------------------------------
     private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
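
For reference, a minimal standalone AdminClient sketch of the semantics the new
deleteRecords helper relies on (broker address, topic, and offset are
hypothetical): RecordsToDelete.beforeOffset(n) removes every record with an
offset lower than n and advances the partition's log start offset to n, which is
how the tests above create partitions whose earliest records are gone.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class DeleteRecordsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            try (AdminClient admin = AdminClient.create(props)) {
                Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
                // Drop offsets 0-2 of partition 0; offset 3 becomes the partition's earliest offset.
                toDelete.put(new TopicPartition("bounded_topic", 0), RecordsToDelete.beforeOffset(3L));
                admin.deleteRecords(toDelete).all().get();
            }
        }
    }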