This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e0dc2d8895 [flink] Migrate paimon-flink-cdc from 
KafkaDeserializationSchema to KafkaRecordDeserializationSchema (#7396)
e0dc2d8895 is described below

commit e0dc2d8895a43c52af03b8f411e3296c688c0f86
Author: Umesh Dangat <[email protected]>
AuthorDate: Tue Mar 10 17:24:53 2026 -0700

    [flink] Migrate paimon-flink-cdc from KafkaDeserializationSchema to 
KafkaRecordDeserializationSchema (#7396)
    
    flink-connector-kafka 4.x
    (https://issues.apache.org/jira/browse/FLINK-36648,
    https://github.com/apache/flink-connector-kafka/pull/140) removed the
    legacy KafkaDeserializationSchema interface as part of the Flink 2.0
    release. This PR updates paimon-flink-cdc to compile and run correctly
    against flink-connector-kafka 4.x by replacing all usages of the removed
    interface with KafkaRecordDeserializationSchema, which uses a
    Collector-based deserialize() instead of the single-return-value
    pattern.
    
    KafkaRecordDeserializationSchema is present in both
    flink-connector-kafka 3.3.0-1.20 (Flink 1.20) and 4.x (Flink 2.x), so
    the migration is backward-compatible. The pom.xml retains
    flink.version=1.20.1 and flink.connector.kafka.version=3.3.0-1.20 to
    preserve compatibility with the Flink 1.x CI runners (which use JDK 8
    and cannot load Flink 2.x class files compiled for Java 11).
    
    This is a partial step toward full Flink 2.x support tracked in #5350.
    The scope is limited to paimon-flink-cdc and its Kafka CDC ingestion
    path.
---
 .../action/cdc/format/AbstractDataFormat.java      |  6 ++--
 .../action/cdc/format/AbstractJsonDataFormat.java  |  4 +--
 .../paimon/flink/action/cdc/format/DataFormat.java |  4 +--
 .../format/debezium/DebeziumAvroDataFormat.java    |  4 +--
 .../flink/action/cdc/kafka/KafkaActionUtils.java   | 34 +++++++++++-----------
 .../KafkaDebeziumAvroDeserializationSchema.java    | 20 ++++++-------
 .../KafkaDebeziumJsonDeserializationSchema.java    | 17 +++++------
 .../pipeline/cdc/schema/PaimonMetadataApplier.java |  4 +--
 .../debezium/DebeziumBsonRecordParserTest.java     | 13 ++++++---
 ...KafkaDebeziumJsonDeserializationSchemaTest.java | 10 +++++--
 10 files changed, 61 insertions(+), 55 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
index 66deba9b80..2d495adfd5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java
@@ -24,7 +24,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
 import java.util.List;
 import java.util.function.Function;
@@ -36,7 +36,7 @@ public abstract class AbstractDataFormat implements 
DataFormat {
     protected abstract RecordParserFactory parser();
 
     /** Deserializer for Kafka Record. */
-    protected abstract Function<Configuration, 
KafkaDeserializationSchema<CdcSourceRecord>>
+    protected abstract Function<Configuration, 
KafkaRecordDeserializationSchema<CdcSourceRecord>>
             kafkaDeserializer();
 
     /** Deserializer for Pulsar Record. */
@@ -50,7 +50,7 @@ public abstract class AbstractDataFormat implements 
DataFormat {
     }
 
     @Override
-    public KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
+    public KafkaRecordDeserializationSchema<CdcSourceRecord> 
createKafkaDeserializer(
             Configuration cdcSourceConfig) {
         return kafkaDeserializer().apply(cdcSourceConfig);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
index 52b7b4ce83..6fad39b7fe 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonDataFormat.java
@@ -24,7 +24,7 @@ import 
org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSc
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
 import java.util.function.Function;
 
@@ -35,7 +35,7 @@ import java.util.function.Function;
 public abstract class AbstractJsonDataFormat extends AbstractDataFormat {
 
     @Override
-    protected Function<Configuration, 
KafkaDeserializationSchema<CdcSourceRecord>>
+    protected Function<Configuration, 
KafkaRecordDeserializationSchema<CdcSourceRecord>>
             kafkaDeserializer() {
         return KafkaDebeziumJsonDeserializationSchema::new;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index 78c9828ee1..4b9ad2c6ba 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -25,7 +25,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
 import java.util.List;
 
@@ -63,7 +63,7 @@ public interface DataFormat {
                 .withMetadataConverters(metadataConverters);
     }
 
-    KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
+    KafkaRecordDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
             Configuration cdcSourceConfig);
 
     DeserializationSchema<CdcSourceRecord> 
createPulsarDeserializer(Configuration cdcSourceConfig);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
index e75a49fb50..9ce736f0ad 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java
@@ -26,7 +26,7 @@ import 
org.apache.paimon.flink.action.cdc.pulsar.PulsarDebeziumAvroDeserializati
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
 import java.util.function.Function;
 
@@ -42,7 +42,7 @@ public class DebeziumAvroDataFormat extends 
AbstractDataFormat {
     }
 
     @Override
-    protected Function<Configuration, 
KafkaDeserializationSchema<CdcSourceRecord>>
+    protected Function<Configuration, 
KafkaRecordDeserializationSchema<CdcSourceRecord>>
             kafkaDeserializer() {
         return KafkaDebeziumAvroDeserializationSchema::new;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 6391981d0f..da4d1a5d6e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
@@ -59,7 +58,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -74,7 +72,7 @@ public class KafkaActionUtils {
 
     public static KafkaSource<CdcSourceRecord> buildKafkaSource(
             Configuration kafkaConfig,
-            KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema) 
{
+            KafkaRecordDeserializationSchema<CdcSourceRecord> 
deserializationSchema) {
         KafkaSourceBuilder<CdcSourceRecord> kafkaSourceBuilder = 
KafkaSource.builder();
 
         if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
@@ -89,7 +87,7 @@ public class KafkaActionUtils {
         }
 
         kafkaSourceBuilder
-                
.setDeserializer(KafkaRecordDeserializationSchema.of(deserializationSchema))
+                .setDeserializer(deserializationSchema)
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
 
         Properties properties = createKafkaProperties(kafkaConfig);
@@ -250,7 +248,7 @@ public class KafkaActionUtils {
 
     public static MessageQueueSchemaUtils.ConsumerWrapper 
getKafkaEarliestConsumer(
             Configuration kafkaConfig,
-            KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema) 
{
+            KafkaRecordDeserializationSchema<CdcSourceRecord> 
deserializationSchema) {
         Properties props = createKafkaProperties(kafkaConfig);
 
         props.put(
@@ -337,12 +335,12 @@ public class KafkaActionUtils {
 
         private final KafkaConsumer<byte[], byte[]> consumer;
         private final String topic;
-        private final KafkaDeserializationSchema<CdcSourceRecord> 
deserializationSchema;
+        private final KafkaRecordDeserializationSchema<CdcSourceRecord> 
deserializationSchema;
 
         KafkaConsumerWrapper(
                 KafkaConsumer<byte[], byte[]> kafkaConsumer,
                 String topic,
-                KafkaDeserializationSchema<CdcSourceRecord> 
deserializationSchema) {
+                KafkaRecordDeserializationSchema<CdcSourceRecord> 
deserializationSchema) {
             this.consumer = kafkaConsumer;
             this.topic = topic;
             this.deserializationSchema = deserializationSchema;
@@ -352,16 +350,18 @@ public class KafkaActionUtils {
         public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
             ConsumerRecords<byte[], byte[]> consumerRecords =
                     consumer.poll(Duration.ofMillis(pollTimeOutMills));
-            return 
StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
-                    .map(
-                            consumerRecord -> {
-                                try {
-                                    return 
deserializationSchema.deserialize(consumerRecord);
-                                } catch (Exception e) {
-                                    throw new RuntimeException(e);
-                                }
-                            })
-                    .collect(Collectors.toList());
+            List<CdcSourceRecord> results = new java.util.ArrayList<>();
+            
org.apache.flink.api.common.functions.util.ListCollector<CdcSourceRecord> 
collector =
+                    new 
org.apache.flink.api.common.functions.util.ListCollector<>(results);
+            for (org.apache.kafka.clients.consumer.ConsumerRecord<byte[], 
byte[]> consumerRecord :
+                    consumerRecords.records(topic)) {
+                try {
+                    deserializationSchema.deserialize(consumerRecord, 
collector);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            return results;
         }
 
         @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index 1f98c60e8d..6c53dd05d5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -26,7 +26,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.io.IOException;
@@ -36,7 +37,7 @@ import static 
org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.SCHEMA_
 
 /** A simple deserialization schema for {@link CdcSourceRecord}. */
 public class KafkaDebeziumAvroDeserializationSchema
-        implements KafkaDeserializationSchema<CdcSourceRecord> {
+        implements KafkaRecordDeserializationSchema<CdcSourceRecord> {
 
     private static final long serialVersionUID = 1L;
 
@@ -57,10 +58,11 @@ public class KafkaDebeziumAvroDeserializationSchema
     }
 
     @Override
-    public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) 
throws IOException {
+    public void deserialize(ConsumerRecord<byte[], byte[]> message, 
Collector<CdcSourceRecord> out)
+            throws IOException {
         if (message.value() == null) {
             // skip tombstone messages
-            return null;
+            return;
         }
 
         if (this.avroDeserializer == null) {
@@ -77,13 +79,9 @@ public class KafkaDebeziumAvroDeserializationSchema
         }
         GenericRecord value = (GenericRecord) 
valueContainerWithVersion.container();
 
-        return new CdcSourceRecord(
-                topic, key, value, 
KafkaActionUtils.extractKafkaMetadata(message));
-    }
-
-    @Override
-    public boolean isEndOfStream(CdcSourceRecord nextElement) {
-        return false;
+        out.collect(
+                new CdcSourceRecord(
+                        topic, key, value, 
KafkaActionUtils.extractKafkaMetadata(message)));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
index 1bd7ed25a0..c708dbca2d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
@@ -26,7 +26,8 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import static 
org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
 
 /** A simple deserialization schema for {@link CdcSourceRecord}. */
 public class KafkaDebeziumJsonDeserializationSchema
-        implements KafkaDeserializationSchema<CdcSourceRecord> {
+        implements KafkaRecordDeserializationSchema<CdcSourceRecord> {
 
     private static final long serialVersionUID = 1L;
 
@@ -58,10 +59,11 @@ public class KafkaDebeziumJsonDeserializationSchema
     }
 
     @Override
-    public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) 
throws IOException {
+    public void deserialize(ConsumerRecord<byte[], byte[]> message, 
Collector<CdcSourceRecord> out)
+            throws IOException {
         if (message.value() == null) {
             // skip tombstone messages
-            return null;
+            return;
         }
 
         try {
@@ -79,18 +81,13 @@ public class KafkaDebeziumJsonDeserializationSchema
             JsonNode valueNode = objectMapper.readValue(message.value(), 
JsonNode.class);
 
             Map<String, Object> kafkaMetadata = 
KafkaActionUtils.extractKafkaMetadata(message);
-            return new CdcSourceRecord(message.topic(), keyNode, valueNode, 
kafkaMetadata);
+            out.collect(new CdcSourceRecord(message.topic(), keyNode, 
valueNode, kafkaMetadata));
         } catch (Exception e) {
             LOG.error("Invalid Json:\n{}", new String(message.value()));
             throw e;
         }
     }
 
-    @Override
-    public boolean isEndOfStream(CdcSourceRecord nextElement) {
-        return false;
-    }
-
     @Override
     public TypeInformation<CdcSourceRecord> getProducedType() {
         return getForClass(CdcSourceRecord.class);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
index 6a5010b772..4798d9a6e7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java
@@ -44,12 +44,12 @@ import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventExcept
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
-import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +106,7 @@ public class PaimonMetadataApplier implements 
MetadataApplier {
 
     @Override
     public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
-        return Sets.newHashSet(
+        return EnumSet.of(
                 SchemaChangeEventType.CREATE_TABLE,
                 SchemaChangeEventType.ADD_COLUMN,
                 SchemaChangeEventType.DROP_COLUMN,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
index 17277d5d7d..9c8dafc291 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -34,7 +34,8 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
 
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -63,7 +64,8 @@ public class DebeziumBsonRecordParserTest {
 
     private static final Map<String, String> keyEvent = new HashMap<>();
 
-    private static KafkaDeserializationSchema<CdcSourceRecord> 
kafkaDeserializationSchema = null;
+    private static KafkaRecordDeserializationSchema<CdcSourceRecord> 
kafkaDeserializationSchema =
+            null;
 
     private static final Map<String, String> beforeEvent = new HashMap<>();
 
@@ -259,7 +261,10 @@ public class DebeziumBsonRecordParserTest {
 
     private static CdcSourceRecord deserializeKafkaSchema(String key, String 
value)
             throws Exception {
-        return kafkaDeserializationSchema.deserialize(
-                new ConsumerRecord<>("topic", 0, 0, key.getBytes(), 
value.getBytes()));
+        List<CdcSourceRecord> results = new ArrayList<>();
+        kafkaDeserializationSchema.deserialize(
+                new ConsumerRecord<>("topic", 0, 0, key.getBytes(), 
value.getBytes()),
+                new ListCollector<>(results));
+        return results.isEmpty() ? null : results.get(0);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
index be29047ab3..578ea0ca7e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchemaTest.java
@@ -22,11 +22,14 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
+import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Unit tests for {@link KafkaDebeziumJsonDeserializationSchema}. Ensures that 
deserialization
@@ -43,8 +46,11 @@ public class KafkaDebeziumJsonDeserializationSchemaTest {
         byte[] rawKey = "non-json-key".getBytes(StandardCharsets.UTF_8);
         byte[] jsonValue = 
"{\"after\":{\"id\":1},\"op\":\"c\"}".getBytes(StandardCharsets.UTF_8);
 
-        CdcSourceRecord record =
-                schema.deserialize(new ConsumerRecord<>("topic", 0, 0L, 
rawKey, jsonValue));
+        List<CdcSourceRecord> results = new ArrayList<>();
+        schema.deserialize(
+                new ConsumerRecord<>("topic", 0, 0L, rawKey, jsonValue),
+                new ListCollector<>(results));
+        CdcSourceRecord record = results.isEmpty() ? null : results.get(0);
 
         Assertions.assertNotNull(record, "Deserialization should succeed and 
return a record");
         Assertions.assertNull(record.getKey(), "Key should be null when the 
Kafka key is not JSON");

Reply via email to