This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e0dc2d8895 [flink] Migrate paimon-flink-cdc from
KafkaDeserializationSchema to KafkaRecordDeserializationSchema (#7396)
e0dc2d8895 is described below
commit e0dc2d8895a43c52af03b8f411e3296c688c0f86
Author: Umesh Dangat <[email protected]>
AuthorDate: Tue Mar 10 17:24:53 2026 -0700
[flink] Migrate paimon-flink-cdc from KafkaDeserializationSchema to
KafkaRecordDeserializationSchema (#7396)
flink-connector-kafka 4.x removed the legacy KafkaDeserializationSchema
interface as part of the Flink 2.0 release (see
https://issues.apache.org/jira/browse/FLINK-36648 and
https://github.com/apache/flink-connector-kafka/pull/140). This PR updates
paimon-flink-cdc to compile and run correctly against
flink-connector-kafka 4.x by replacing all usages of the removed interface
with KafkaRecordDeserializationSchema, whose deserialize() emits records
through a Collector instead of returning a single value.
KafkaRecordDeserializationSchema is present in both
flink-connector-kafka 3.3.0-1.20 (Flink 1.20) and 4.x (Flink 2.x), so
the migration is backward-compatible. The pom.xml retains
flink.version=1.20.1 and flink.connector.kafka.version=3.3.0-1.20 to
preserve compatibility with the Flink 1.x CI runners (which use JDK 8
and cannot load Flink 2.x class files compiled for Java 11).
This is a partial step toward full Flink 2.x support tracked in #5350.
The scope is limited to paimon-flink-cdc and its Kafka CDC ingestion
path.
---
.../action/cdc/format/AbstractDataFormat.java | 6 ++--
.../action/cdc/format/AbstractJsonDataFormat.java | 4 +--
.../paimon/flink/action/cdc/format/DataFormat.java | 4 +--
.../format/debezium/DebeziumAvroDataFormat.java | 4 +--
.../flink/action/cdc/kafka/KafkaActionUtils.java | 34 +++++++++++-----------
.../KafkaDebeziumAvroDeserializationSchema.java | 20 ++++++-------
.../KafkaDebeziumJsonDeserializationSchema.java | 17 +++++------
.../pipeline/cdc/schema/PaimonMetadataApplier.java | 4 +--
.../debezium/DebeziumBsonRecordParserTest.java | 13 ++++++---
...KafkaDebeziumJsonDeserializationSchemaTest.java | 10 +++++--
10 files changed, 61 insertions(+), 55 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
index 66deba9b80..2d495adfd5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
@@ -24,7 +24,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import java.util.List;
import java.util.function.Function;
@@ -36,7 +36,7 @@ public abstract class AbstractDataFormat implements
DataFormat {
protected abstract RecordParserFactory parser();
/** Deserializer for Kafka Record. */
- protected abstract Function<Configuration,
KafkaDeserializationSchema<CdcSourceRecord>>
+ protected abstract Function<Configuration,
KafkaRecordDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer();
/** Deserializer for Pulsar Record. */
@@ -50,7 +50,7 @@ public abstract class AbstractDataFormat implements
DataFormat {
}
@Override
- public KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
+ public KafkaRecordDeserializationSchema<CdcSourceRecord>
createKafkaDeserializer(
Configuration cdcSourceConfig) {
return kafkaDeserializer().apply(cdcSourceConfig);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
index 52b7b4ce83..6fad39b7fe 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
@@ -24,7 +24,7 @@ import
org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSc
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import java.util.function.Function;
@@ -35,7 +35,7 @@ import java.util.function.Function;
public abstract class AbstractJsonDataFormat extends AbstractDataFormat {
@Override
- protected Function<Configuration,
KafkaDeserializationSchema<CdcSourceRecord>>
+ protected Function<Configuration,
KafkaRecordDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer() {
return KafkaDebeziumJsonDeserializationSchema::new;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index 78c9828ee1..4b9ad2c6ba 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -25,7 +25,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import java.util.List;
@@ -63,7 +63,7 @@ public interface DataFormat {
.withMetadataConverters(metadataConverters);
}
- KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
+ KafkaRecordDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig);
DeserializationSchema<CdcSourceRecord>
createPulsarDeserializer(Configuration cdcSourceConfig);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
index e75a49fb50..9ce736f0ad 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
@@ -26,7 +26,7 @@ import
org.apache.paimon.flink.action.cdc.pulsar.PulsarDebeziumAvroDeserializati
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import java.util.function.Function;
@@ -42,7 +42,7 @@ public class DebeziumAvroDataFormat extends
AbstractDataFormat {
}
@Override
- protected Function<Configuration,
KafkaDeserializationSchema<CdcSourceRecord>>
+ protected Function<Configuration,
KafkaRecordDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer() {
return KafkaDebeziumAvroDeserializationSchema::new;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 6391981d0f..da4d1a5d6e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
@@ -59,7 +58,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -74,7 +72,7 @@ public class KafkaActionUtils {
public static KafkaSource<CdcSourceRecord> buildKafkaSource(
Configuration kafkaConfig,
- KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema)
{
+ KafkaRecordDeserializationSchema<CdcSourceRecord>
deserializationSchema) {
KafkaSourceBuilder<CdcSourceRecord> kafkaSourceBuilder =
KafkaSource.builder();
if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
@@ -89,7 +87,7 @@ public class KafkaActionUtils {
}
kafkaSourceBuilder
-
.setDeserializer(KafkaRecordDeserializationSchema.of(deserializationSchema))
+ .setDeserializer(deserializationSchema)
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
Properties properties = createKafkaProperties(kafkaConfig);
@@ -250,7 +248,7 @@ public class KafkaActionUtils {
public static MessageQueueSchemaUtils.ConsumerWrapper
getKafkaEarliestConsumer(
Configuration kafkaConfig,
- KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema)
{
+ KafkaRecordDeserializationSchema<CdcSourceRecord>
deserializationSchema) {
Properties props = createKafkaProperties(kafkaConfig);
props.put(
@@ -337,12 +335,12 @@ public class KafkaActionUtils {
private final KafkaConsumer<byte[], byte[]> consumer;
private final String topic;
- private final KafkaDeserializationSchema<CdcSourceRecord>
deserializationSchema;
+ private final KafkaRecordDeserializationSchema<CdcSourceRecord>
deserializationSchema;
KafkaConsumerWrapper(
KafkaConsumer<byte[], byte[]> kafkaConsumer,
String topic,
- KafkaDeserializationSchema<CdcSourceRecord>
deserializationSchema) {
+ KafkaRecordDeserializationSchema<CdcSourceRecord>
deserializationSchema) {
this.consumer = kafkaConsumer;
this.topic = topic;
this.deserializationSchema = deserializationSchema;
@@ -352,16 +350,18 @@ public class KafkaActionUtils {
public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeOutMills));
- return
StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
- .map(
- consumerRecord -> {
- try {
- return
deserializationSchema.deserialize(consumerRecord);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- })
- .collect(Collectors.toList());
+ List<CdcSourceRecord> results = new java.util.ArrayList<>();
+
org.apache.flink.api.common.functions.util.ListCollector<CdcSourceRecord>
collector =
+ new
org.apache.flink.api.common.functions.util.ListCollector<>(results);
+ for (org.apache.kafka.clients.consumer.ConsumerRecord<byte[],
byte[]> consumerRecord :
+ consumerRecords.records(topic)) {
+ try {
+ deserializationSchema.deserialize(consumerRecord,
collector);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return results;
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index 1f98c60e8d..6c53dd05d5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -26,7 +26,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
@@ -36,7 +37,7 @@ import static
org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.SCHEMA_
/** A simple deserialization schema for {@link CdcSourceRecord}. */
public class KafkaDebeziumAvroDeserializationSchema
- implements KafkaDeserializationSchema<CdcSourceRecord> {
+ implements KafkaRecordDeserializationSchema<CdcSourceRecord> {
private static final long serialVersionUID = 1L;
@@ -57,10 +58,11 @@ public class KafkaDebeziumAvroDeserializationSchema
}
@Override
- public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message)
throws IOException {
+ public void deserialize(ConsumerRecord<byte[], byte[]> message,
Collector<CdcSourceRecord> out)
+ throws IOException {
if (message.value() == null) {
// skip tombstone messages
- return null;
+ return;
}
if (this.avroDeserializer == null) {
@@ -77,13 +79,9 @@ public class KafkaDebeziumAvroDeserializationSchema
}
GenericRecord value = (GenericRecord)
valueContainerWithVersion.container();
- return new CdcSourceRecord(
- topic, key, value,
KafkaActionUtils.extractKafkaMetadata(message));
- }
-
- @Override
- public boolean isEndOfStream(CdcSourceRecord nextElement) {
- return false;
+ out.collect(
+ new CdcSourceRecord(
+ topic, key, value,
KafkaActionUtils.extractKafkaMetadata(message)));
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
index 1bd7ed25a0..c708dbca2d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
@@ -26,7 +26,8 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import static
org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
/** A simple deserialization schema for {@link CdcSourceRecord}. */
public class KafkaDebeziumJsonDeserializationSchema
- implements KafkaDeserializationSchema<CdcSourceRecord> {
+ implements KafkaRecordDeserializationSchema<CdcSourceRecord> {
private static final long serialVersionUID = 1L;
@@ -58,10 +59,11 @@ public class KafkaDebeziumJsonDeserializationSchema
}
@Override
- public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message)
throws IOException {
+ public void deserialize(ConsumerRecord<byte[], byte[]> message,
Collector<CdcSourceRecord> out)
+ throws IOException {
if (message.value() == null) {
// skip tombstone messages
- return null;
+ return;
}
try {
@@ -79,18 +81,13 @@ public class KafkaDebeziumJsonDeserializationSchema
JsonNode valueNode = objectMapper.readValue(message.value(),
JsonNode.class);
Map<String, Object> kafkaMetadata =
KafkaActionUtils.extractKafkaMetadata(message);
- return new CdcSourceRecord(message.topic(), keyNode, valueNode,
kafkaMetadata);
+ out.collect(new CdcSourceRecord(message.topic(), keyNode,
valueNode, kafkaMetadata));
} catch (Exception e) {
LOG.error("Invalid Json:\n{}", new String(message.value()));
throw e;
}
}
- @Override
- public boolean isEndOfStream(CdcSourceRecord nextElement) {
- return false;
- }
-
@Override
public TypeInformation<CdcSourceRecord> getProducedType() {
return getForClass(CdcSourceRecord.class);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
index 6a5010b772..4798d9a6e7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
@@ -44,12 +44,12 @@ import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventExcept
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
-import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.table.factories.FactoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -106,7 +106,7 @@ public class PaimonMetadataApplier implements
MetadataApplier {
@Override
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
- return Sets.newHashSet(
+ return EnumSet.of(
SchemaChangeEventType.CREATE_TABLE,
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.DROP_COLUMN,
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
index 17277d5d7d..9c8dafc291 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -34,7 +34,8 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -63,7 +64,8 @@ public class DebeziumBsonRecordParserTest {
private static final Map<String, String> keyEvent = new HashMap<>();
- private static KafkaDeserializationSchema<CdcSourceRecord>
kafkaDeserializationSchema = null;
+ private static KafkaRecordDeserializationSchema<CdcSourceRecord>
kafkaDeserializationSchema =
+ null;
private static final Map<String, String> beforeEvent = new HashMap<>();
@@ -259,7 +261,10 @@ public class DebeziumBsonRecordParserTest {
private static CdcSourceRecord deserializeKafkaSchema(String key, String
value)
throws Exception {
- return kafkaDeserializationSchema.deserialize(
- new ConsumerRecord<>("topic", 0, 0, key.getBytes(),
value.getBytes()));
+ List<CdcSourceRecord> results = new ArrayList<>();
+ kafkaDeserializationSchema.deserialize(
+ new ConsumerRecord<>("topic", 0, 0, key.getBytes(),
value.getBytes()),
+ new ListCollector<>(results));
+ return results.isEmpty() ? null : results.get(0);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
index be29047ab3..578ea0ca7e 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
@@ -22,11 +22,14 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
/**
* Unit tests for {@link KafkaDebeziumJsonDeserializationSchema}. Ensures that
deserialization
@@ -43,8 +46,11 @@ public class KafkaDebeziumJsonDeserializationSchemaTest {
byte[] rawKey = "non-json-key".getBytes(StandardCharsets.UTF_8);
byte[] jsonValue =
"{\"after\":{\"id\":1},\"op\":\"c\"}".getBytes(StandardCharsets.UTF_8);
- CdcSourceRecord record =
- schema.deserialize(new ConsumerRecord<>("topic", 0, 0L,
rawKey, jsonValue));
+ List<CdcSourceRecord> results = new ArrayList<>();
+ schema.deserialize(
+ new ConsumerRecord<>("topic", 0, 0L, rawKey, jsonValue),
+ new ListCollector<>(results));
+ CdcSourceRecord record = results.isEmpty() ? null : results.get(0);
Assertions.assertNotNull(record, "Deserialization should succeed and
return a record");
Assertions.assertNull(record.getKey(), "Key should be null when the
Kafka key is not JSON");