nictownsend commented on code in PR #99:
URL: 
https://github.com/apache/flink-connector-kafka/pull/99#discussion_r1618477190


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -55,6 +61,34 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
     private final TypeInformation<RowData> producedTypeInfo;
 
     private final boolean upsertMode;
+    private static Method deserializeWithAdditionalPropertiesMethod = null;
+    protected static final String IS_KEY = "IS_KEY";
+
+    protected static final String HEADERS = "HEADERS";
+
+    static {
+        initializeMethod();
+    }
+
+    protected static void initializeMethod() {

Review Comment:
   Probably worth adding some Javadoc here to explain that this is for backwards 
compatibility (versions of Flink that don't have the method).



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> 
record, Collector<RowData
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            doDeserialize(record, additionalParameters, outputCollector, 
false);
         }
         keyCollector.buffer.clear();
     }
 
+    private void doDeserialize(
+            ConsumerRecord<byte[], byte[]> record,
+            Map<String, Object> additionalParameters,
+            Collector<RowData> collector,
+            boolean isKey)
+            throws IOException {
+        if (deserializeWithAdditionalPropertiesMethod == null) {
+            if (isKey) {
+                keyDeserialization.deserialize(record.key(), collector);
+            } else {
+                valueDeserialization.deserialize(record.value(), collector);
+            }
+
+        } else {
+            additionalParameters.put(IS_KEY, isKey);
+            try {
+                if (isKey) {
+                    deserializeWithAdditionalPropertiesMethod.invoke(
+                            keyDeserialization, record.key(), 
additionalParameters, collector);
+                } else {
+                    deserializeWithAdditionalPropertiesMethod.invoke(
+                            valueDeserialization, record.value(), 
additionalParameters, collector);
+                }
+            } catch (IllegalAccessException e) {

Review Comment:
   Syntax may be cleaner with `|` as you're handling both exceptions in the 
same way
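
   For illustration, a minimal sketch of the multi-catch form. The class and 
helper names (`MultiCatchSketch`, `invokeNoArg`) are made up for this example 
and are not part of the PR; the point is only that one handler can cover 
several exception types that are treated identically.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class MultiCatchSketch {

    // Hypothetical helper: looks up and calls a public no-argument method reflectively.
    static Object invokeNoArg(Object target, String methodName) {
        try {
            Method method = target.getClass().getMethod(methodName);
            return method.invoke(target);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            // One handler for all alternatives, since they are handled the same way.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Calls String.length() reflectively on "abc".
        System.out.println(invokeNoArg("abc", "length"));
    }
}
```

   The alternatives in a multi-catch must not be subclasses of one another; 
these three are siblings under `ReflectiveOperationException`, so catching that 
common parent would be another option.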



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java:
##########
@@ -36,10 +42,36 @@ public class KafkaDeserializationSchemaWrapper<T> 
implements KafkaDeserializatio
 
     private static final long serialVersionUID = 2651665280744549932L;
 
+    private static Method deserializeWithAdditionalPropertiesMethod = null;
+
+    protected static final String IS_KEY = "IS_KEY";
+
+    protected static final String HEADERS = "HEADERS";
+
+    static {

Review Comment:
   Why is 
https://github.com/apache/flink-connector-kafka/pull/99/files?file-filters%5B%5D=.java&show-viewed-files=true#diff-0a1bb10356e97bd2de4e3e8a257cf8c94290dd0aadb133d9288b6d1bc60b5994R73
 different in terms of the `static` block?
   
   You use a static block here, but a static method in 
DynamicKafkaDeserializationSchema.
   
   However, I like the idea of an interface method `initializeMethod` that 
returns `Method<DeserializationSchema>` rather than `void` - the typing gives 
safety that `initializeMethod` returns the Method from a class structure that 
has DeserializationSchema as the parent.
   
   I do appreciate that the typing doesn't guarantee the method returned is one 
with the correct parameters - but I think catching the runtime exception is as 
good as trying to validate the method before using it 
https://www.tutorialspoint.com/javareflect/javareflect_method_getparametertypes.htm
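
   A rough sketch of a lookup that returns the method instead of setting a 
static field. Note that `java.lang.reflect.Method` is not generic, so this 
sketch returns `Optional<Method>`; `OptionalMethodLookupSketch` and `findMethod` 
are hypothetical names, and `String.substring` is used only as a stand-in 
target. Because `getMethod` takes the expected parameter types, a method with 
the wrong signature is reported as absent at lookup time.

```java
import java.lang.reflect.Method;
import java.util.Optional;

class OptionalMethodLookupSketch {

    // Returns the public method if the class offers it with exactly these
    // parameter types, or an empty Optional on versions that lack it.
    static Optional<Method> findMethod(Class<?> owner, String name, Class<?>... parameterTypes) {
        try {
            return Optional.of(owner.getMethod(name, parameterTypes));
        } catch (NoSuchMethodException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // substring(int, int) exists on String; substring(String) does not.
        System.out.println(findMethod(String.class, "substring", int.class, int.class).isPresent());
        System.out.println(findMethod(String.class, "substring", String.class).isPresent());
    }
}
```

   Both the static block and the static method variants could then assign the 
result of one shared helper, which would also remove the inconsistency between 
the two classes.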



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -74,25 +113,63 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
     @Override
     public ProducerRecord<byte[], byte[]> serialize(
             RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+
+        // keeping the metadataHeaders maps for keys and values separate so 
these maps
+        // are output only for the methods; avoiding changing the inputs.
+        // define the input map
+        Map<String, Object> inputMap = new HashMap<>();
+
+        // define the output maps, it is 2 levels deep in case we need to add 
new information in
+        // the future
+        Map<String, Object> outputKeyMap = new HashMap<>();
+        Map<String, Object> outputValueMap = new HashMap<>();
+
+        inputMap.put(TOPIC_NAME, topic);
+
         // shortcut in case no input projection is required
         if (keySerialization == null && !hasMetadata) {
-            final byte[] valueSerialized = 
valueSerialization.serialize(consumedRow);
-            return new ProducerRecord<>(
-                    topic,
-                    extractPartition(
-                            consumedRow,
-                            null,
-                            valueSerialized,
-                            context.getPartitionsForTopic(topic)),
-                    null,
-                    valueSerialized);
+            byte[] valueSerialized = getSerialized(consumedRow, inputMap, 
outputValueMap, false);
+            if (serializeWithAdditionalPropertiesMethod == null) {

Review Comment:
   This logic seems odd - you check for 
`serializeWithAdditionalPropertiesMethod` but then you do not use it in the 
`else` block.
   
   It could be rewritten more cleanly regardless - the logic is only about 
adding headers, so you can put the header generation in an `if` block and then 
create the `ProducerRecord` afterwards in both cases.



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java:
##########
@@ -55,7 +87,25 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) 
throws Exception {
     @Override
     public void deserialize(ConsumerRecord<byte[], byte[]> message, 
Collector<T> out)
             throws Exception {
-        deserializationSchema.deserialize(message.value(), out);
+        Map<String, Object> headersMapFromKafka = new HashMap<>();
+        for (Header header : message.headers()) {
+            headersMapFromKafka.put(header.key(), header.value());
+        }
+        if (deserializeWithAdditionalPropertiesMethod == null) {
+            deserializationSchema.deserialize(message.value(), out);
+        } else {
+            Map<String, Object> additionalParameters = new HashMap<>();
+            additionalParameters.put(IS_KEY, false);
+            additionalParameters.put(HEADERS, headersMapFromKafka);
+            try {
+                deserializeWithAdditionalPropertiesMethod.invoke(

Review Comment:
   This seems inconsistent: in some places you call 
`deserializeWithAdditionalPropertiesMethod.invoke()` with four parameters, but 
here it's just three. 
   
   I _think_ this call means you're invoking the method on `message.value()` 
(which won't have that method)
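
   To show why the argument count matters: the first argument of 
`Method.invoke` is always the receiver object, and the rest are the method's 
own arguments. The sketch below uses a made-up `Repeater` class as a stand-in 
for the schema (none of these names are from the PR) and shows what happens 
when the receiver is left out.

```java
import java.lang.reflect.Method;

class InvokeReceiverSketch {

    // Hypothetical stand-in for a schema that offers the optional method.
    static class Repeater {
        public String repeat(String text, int times) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < times; i++) {
                sb.append(text);
            }
            return sb.toString();
        }
    }

    // Returns the call result, or the simple name of the exception if the call fails.
    static String tryInvoke(Object receiver, Object... args) {
        try {
            Method method = Repeater.class.getMethod("repeat", String.class, int.class);
            return String.valueOf(method.invoke(receiver, args));
        } catch (ReflectiveOperationException | IllegalArgumentException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Receiver first, then the two method arguments.
        System.out.println(tryInvoke(new Repeater(), "ab", 2));
        // Receiver omitted: "ab" is taken as the receiver and the call is rejected.
        System.out.println(tryInvoke("ab", 2));
    }
}
```

   In the second call the string is treated as the receiver, which is not an 
instance of the declaring class, so `invoke` fails with an 
`IllegalArgumentException` rather than calling the method.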



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.IntType;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for DynamicKafkaRecordSerializationSchema. */
+public class DynamicKafkaRecordSerializationSchemaTest {
+
+    @Test
+    public void testserializewithoutMethod() {
+
+        
DynamicKafkaRecordSerializationSchema.nullifySerializeWithAdditionalPropertiesMethod();
+        try {
+            FlinkKafkaPartitioner<RowData> partitioner = null;
+            MockSerializationSchema<RowData> keySerialization = null;
+
+            RowData.FieldGetter[] keyFieldGetters = null;
+
+            keySerialization = new MockSerializationSchema<>(true);
+            RowData.FieldGetter keyFieldGetter = RowData.createFieldGetter(new 
IntType(), 0);
+            keyFieldGetters = new RowData.FieldGetter[] {keyFieldGetter};
+
+            MockSerializationSchema<RowData> valueSerialization =
+                    new MockSerializationSchema<>(false);
+            RowData.FieldGetter valueFieldGetter = 
RowData.createFieldGetter(new IntType(), 0);
+            RowData.FieldGetter[] valueFieldGetters = {valueFieldGetter};
+
+            DynamicKafkaRecordSerializationSchema 
dynamicKafkaRecordSerializationSchema =
+                    new DynamicKafkaRecordSerializationSchema(
+                            "test",
+                            partitioner,
+                            keySerialization,
+                            valueSerialization,
+                            keyFieldGetters,
+                            valueFieldGetters,
+                            false,
+                            new int[] {},
+                            false);
+
+            RowData consumedRow = new GenericRowData(1);
+            KafkaRecordSerializationSchema.KafkaSinkContext context =
+                    new KafkaRecordSerializationSchema.KafkaSinkContext() {
+                        @Override
+                        public int getParallelInstanceId() {
+                            return 0;
+                        }
+
+                        @Override
+                        public int getNumberOfParallelInstances() {
+                            return 0;
+                        }
+
+                        @Override
+                        public int[] getPartitionsForTopic(String topic) {
+                            return new int[0];
+                        }
+                    };
+
+            Long timestamp = null;
+            final ProducerRecord<byte[], byte[]> producerRecord =
+                    dynamicKafkaRecordSerializationSchema.serialize(
+                            consumedRow, context, timestamp);
+            Headers headers = producerRecord.headers();
+            assertThat(headers).isEmpty();
+        } finally {
+            // ensure the method is present after this test.
+            DynamicKafkaRecordSerializationSchema.initializeMethod();

Review Comment:
   Similarly, I don't think it's good practice to use `finally` as a 
substitute for `beforeEach`/`afterEach`.



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Optional;
+
+/** test. */
+public class DynamicKafkaDeserializationSchemaTest {
+
+    @ParameterizedTest
+    @MethodSource("configProvider")
+    public void testdeserialize(TestSpec testSpec) {
+        // ensure we have the method.
+        DynamicKafkaRecordSerializationSchema.initializeMethod();
+        MockDeserializationSchema<RowData> keyDeserialization = null;
+        if (!testSpec.valueOnly) {
+            keyDeserialization = new MockDeserializationSchema<>(true, 
testSpec.providesHeaders);
+        }
+        MockDeserializationSchema<RowData> valueDeserialization =
+                new MockDeserializationSchema<>(false, 
testSpec.providesHeaders);
+
+        int[] keyProjection = {};
+        if (!testSpec.valueOnly) {
+            keyProjection = new int[] {0, 1};
+        }
+        int[] valueProjection = {2, 3};
+        TypeInformation producedTypeInfo = null;
+        DynamicKafkaDeserializationSchema 
dynamicKafkaRecordDeserializationSchema =
+                new DynamicKafkaDeserializationSchema(
+                        4,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        valueProjection,
+                        false,
+                        null,
+                        producedTypeInfo,
+                        false);
+        byte[] key = {};
+        if (!testSpec.valueOnly) {
+            key = new byte[] {0};
+        }
+        byte[] value = {0};
+        ConsumerRecord<byte[], byte[]> message =
+                new ConsumerRecord<>(
+                        "",
+                        0,
+                        1L,
+                        1L,
+                        TimestampType.NO_TIMESTAMP_TYPE,
+                        testSpec.valueOnly ? 0 : 1,
+                        1,
+                        key,
+                        value,
+                        new RecordHeaders(),
+                        Optional.empty());
+        try {
+            dynamicKafkaRecordDeserializationSchema.deserialize(message, null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review Comment:
   You can use `fail(cause)` to fail the JUnit test, rather than wrapping and 
rethrowing: 
https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#fail-java.lang.Throwable-



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> 
record, Collector<RowData
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            doDeserialize(record, additionalParameters, outputCollector, 
false);
         }
         keyCollector.buffer.clear();
     }
 
+    private void doDeserialize(
+            ConsumerRecord<byte[], byte[]> record,
+            Map<String, Object> additionalParameters,
+            Collector<RowData> collector,
+            boolean isKey)
+            throws IOException {
+        if (deserializeWithAdditionalPropertiesMethod == null) {
+            if (isKey) {
+                keyDeserialization.deserialize(record.key(), collector);
+            } else {
+                valueDeserialization.deserialize(record.value(), collector);
+            }
+
+        } else {
+            additionalParameters.put(IS_KEY, isKey);
+            try {
+                if (isKey) {
+                    deserializeWithAdditionalPropertiesMethod.invoke(
+                            keyDeserialization, record.key(), 
additionalParameters, collector);
+                } else {
+                    deserializeWithAdditionalPropertiesMethod.invoke(
+                            valueDeserialization, record.value(), 
additionalParameters, collector);
+                }
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+            } catch (InvocationTargetException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void performDeserialiserialize(

Review Comment:
   ```suggestion
       private void performDeserialize(
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> 
record, Collector<RowData
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            doDeserialize(record, additionalParameters, outputCollector, 
false);
         }
         keyCollector.buffer.clear();
     }
 
+    private void doDeserialize(
+            ConsumerRecord<byte[], byte[]> record,
+            Map<String, Object> additionalParameters,
+            Collector<RowData> collector,
+            boolean isKey)
+            throws IOException {
+        if (deserializeWithAdditionalPropertiesMethod == null) {
+            if (isKey) {
+                keyDeserialization.deserialize(record.key(), collector);

Review Comment:
   `keyDeserialization` can be null (looks like other places in the file are 
explicitly null checking before access)



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> 
record, Collector<RowData
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            doDeserialize(record, additionalParameters, outputCollector, 
false);
         }
         keyCollector.buffer.clear();
     }
 
+    private void doDeserialize(
+            ConsumerRecord<byte[], byte[]> record,
+            Map<String, Object> additionalParameters,
+            Collector<RowData> collector,
+            boolean isKey)
+            throws IOException {
+        if (deserializeWithAdditionalPropertiesMethod == null) {
+            if (isKey) {
+                keyDeserialization.deserialize(record.key(), collector);
+            } else {
+                valueDeserialization.deserialize(record.value(), collector);
+            }
+
+        } else {
+            additionalParameters.put(IS_KEY, isKey);
+            try {
+                if (isKey) {
+                    deserializeWithAdditionalPropertiesMethod.invoke(
+                            keyDeserialization, record.key(), 
additionalParameters, collector);
+                } else {
+                    deserializeWithAdditionalPropertiesMethod.invoke(
+                            valueDeserialization, record.value(), 
additionalParameters, collector);
+                }
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+            } catch (InvocationTargetException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void performDeserialiserialize(

Review Comment:
   Also, I can't find a reference to this `private` method. Should it be 
deleted?



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -74,25 +113,63 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
     @Override
     public ProducerRecord<byte[], byte[]> serialize(
             RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+
+        // keeping the metadataHeaders maps for keys and values separate so 
these maps
+        // are output only for the methods; avoiding changing the inputs.
+        // define the input map
+        Map<String, Object> inputMap = new HashMap<>();
+
+        // define the output maps, it is 2 levels deep in case we need to add 
new information in
+        // the future
+        Map<String, Object> outputKeyMap = new HashMap<>();
+        Map<String, Object> outputValueMap = new HashMap<>();
+
+        inputMap.put(TOPIC_NAME, topic);
+
         // shortcut in case no input projection is required
         if (keySerialization == null && !hasMetadata) {
-            final byte[] valueSerialized = 
valueSerialization.serialize(consumedRow);
-            return new ProducerRecord<>(
-                    topic,
-                    extractPartition(
-                            consumedRow,
-                            null,
-                            valueSerialized,
-                            context.getPartitionsForTopic(topic)),
-                    null,
-                    valueSerialized);
+            byte[] valueSerialized = getSerialized(consumedRow, inputMap, 
outputValueMap, false);
+            if (serializeWithAdditionalPropertiesMethod == null) {
+                return new ProducerRecord<>(
+                        topic,
+                        extractPartition(
+                                consumedRow,
+                                null,
+                                valueSerialized,
+                                context.getPartitionsForTopic(topic)),
+                        null,
+                        valueSerialized);
+            } else {
+                List<Header> headers = new ArrayList<>();
+                Map<String, Object> headersToAddForValue =
+                        (Map<String, Object>) outputValueMap.get(HEADERS);
+                if (!MapUtils.isEmpty(headersToAddForValue)) {
+                    for (String headerKey : headersToAddForValue.keySet()) {
+                        KafkaDynamicSink.KafkaHeader kafkaHeader =
+                                new KafkaDynamicSink.KafkaHeader(
+                                        headerKey, (byte[]) 
headersToAddForValue.get(headerKey));
+                        headers.add(kafkaHeader);
+                    }
+                }
+                return new ProducerRecord<>(
+                        topic,
+                        extractPartition(
+                                consumedRow,
+                                null,
+                                valueSerialized,
+                                context.getPartitionsForTopic(topic)),
+                        null, // timestamp will be current time
+                        null,
+                        valueSerialized,
+                        headers);
+            }

Review Comment:
   ```suggestion
               List<Header> headers = new ArrayList<>();
               if (serializeWithAdditionalPropertiesMethod != null) {
                   Map<String, Object> headersToAddForValue =
                           (Map<String, Object>) outputValueMap.get(HEADERS);
                   if (!MapUtils.isEmpty(headersToAddForValue)) {
                       for (String headerKey : headersToAddForValue.keySet()) {
                           KafkaDynamicSink.KafkaHeader kafkaHeader =
                                   new KafkaDynamicSink.KafkaHeader(
                                           headerKey, (byte[]) headersToAddForValue.get(headerKey));
                           headers.add(kafkaHeader);
                       }
                   }
               }

               // headers stays empty when the method is unavailable
               return new ProducerRecord<>(
                       topic,
                       extractPartition(
                               consumedRow,
                               null,
                               valueSerialized,
                               context.getPartitionsForTopic(topic)),
                       null, // timestamp will be current time
                       null,
                       valueSerialized,
                       headers);
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> 
record, Collector<RowData
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            doDeserialize(record, additionalParameters, outputCollector, 
false);
         }
         keyCollector.buffer.clear();
     }
 
+    private void doDeserialize(
+            ConsumerRecord<byte[], byte[]> record,
+            Map<String, Object> additionalParameters,
+            Collector<RowData> collector,
+            boolean isKey)
+            throws IOException {
+        if (deserializeWithAdditionalPropertiesMethod == null) {
+            if (isKey) {
+                keyDeserialization.deserialize(record.key(), collector);
+            } else {
+                valueDeserialization.deserialize(record.value(), collector);
+            }
+
+        } else {
+            additionalParameters.put(IS_KEY, isKey);

Review Comment:
   Is it safe to mutate the input, or would it be better to copy it into a new 
collection and then add `IS_KEY`?
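
   A minimal sketch of the copy-then-add option. `DefensiveCopySketch` and 
`withIsKey` are hypothetical names, and the `HEADERS` entry in `main` is just 
sample data; only the `IS_KEY` key mirrors the constant in the diff.

```java
import java.util.HashMap;
import java.util.Map;

class DefensiveCopySketch {

    static final String IS_KEY = "IS_KEY";

    // Copies the caller's map and adds IS_KEY to the copy, leaving the input untouched.
    static Map<String, Object> withIsKey(Map<String, Object> additionalParameters, boolean isKey) {
        Map<String, Object> copy = new HashMap<>(additionalParameters);
        copy.put(IS_KEY, isKey);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new HashMap<>();
        input.put("HEADERS", new HashMap<String, Object>());

        Map<String, Object> forKey = withIsKey(input, true);

        System.out.println(input.containsKey(IS_KEY));
        System.out.println(forKey.get(IS_KEY));
    }
}
```

   This is a shallow copy, so nested values such as the headers map are still 
shared. It costs one extra map per call, which may or may not matter on this 
path.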



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Optional;
+
+/** test. */
+public class DynamicKafkaDeserializationSchemaTest {
+
+    @ParameterizedTest
+    @MethodSource("configProvider")
+    public void testdeserialize(TestSpec testSpec) {
+        // ensure we have the method.
+        DynamicKafkaRecordSerializationSchema.initializeMethod();
+        MockDeserializationSchema<RowData> keyDeserialization = null;
+        if (!testSpec.valueOnly) {
+            keyDeserialization = new MockDeserializationSchema<>(true, 
testSpec.providesHeaders);
+        }
+        MockDeserializationSchema<RowData> valueDeserialization =
+                new MockDeserializationSchema<>(false, 
testSpec.providesHeaders);
+
+        int[] keyProjection = {};
+        if (!testSpec.valueOnly) {
+            keyProjection = new int[] {0, 1};
+        }
+        int[] valueProjection = {2, 3};
+        TypeInformation producedTypeInfo = null;
+        DynamicKafkaDeserializationSchema 
dynamicKafkaRecordDeserializationSchema =
+                new DynamicKafkaDeserializationSchema(
+                        4,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        valueProjection,
+                        false,
+                        null,
+                        producedTypeInfo,
+                        false);
+        byte[] key = {};
+        if (!testSpec.valueOnly) {
+            key = new byte[] {0};
+        }
+        byte[] value = {0};
+        ConsumerRecord<byte[], byte[]> message =
+                new ConsumerRecord<>(
+                        "",
+                        0,
+                        1L,
+                        1L,
+                        TimestampType.NO_TIMESTAMP_TYPE,
+                        testSpec.valueOnly ? 0 : 1,
+                        1,
+                        key,
+                        value,
+                        new RecordHeaders(),
+                        Optional.empty());
+        try {
+            dynamicKafkaRecordDeserializationSchema.deserialize(message, null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static Collection<TestSpec> configProvider() {
+        return 
ImmutableList.<TestSpec>builder().addAll(getValidTestSpecs()).build();
+    }
+
+    @Test
+    public void testdeserializeWithoutMethod() {
+
+        // ensure we have the method.
+        
DynamicKafkaRecordSerializationSchema.nullifySerializeWithAdditionalPropertiesMethod();
+        try {
+            FlinkKafkaPartitioner<RowData> partitioner = null;
+            MockDeserializationSchema<RowData> keyDeserialization =
+                    new MockDeserializationSchema<>(true, false);
+            MockDeserializationSchema<RowData> valueDeserialization =
+                    new MockDeserializationSchema<>(false, false);
+
+            RowData.FieldGetter[] keyFieldGetters = null;
+
+            int[] keyProjection = {0, 1};
+            int[] valueProjection = {2, 3};
+            TypeInformation producedTypeInfo = null;
+            DynamicKafkaDeserializationSchema 
dynamicKafkaRecordDeserializationSchema =
+                    new DynamicKafkaDeserializationSchema(
+                            4,
+                            keyDeserialization,
+                            keyProjection,
+                            valueDeserialization,
+                            valueProjection,
+                            false,
+                            null,
+                            producedTypeInfo,
+                            false);
+            byte[] key = {0};
+            byte[] value = {0};
+            ConsumerRecord<byte[], byte[]> message =
+                    new ConsumerRecord<>(
+                            "",
+                            0,
+                            1L,
+                            1L,
+                            TimestampType.NO_TIMESTAMP_TYPE,
+                            1,
+                            1,
+                            key,
+                            value,
+                            new RecordHeaders(),
+                            Optional.empty());
+            try {
+                dynamicKafkaRecordDeserializationSchema.deserialize(message, 
null);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } finally {
+            DynamicKafkaRecordSerializationSchema.initializeMethod();

Review Comment:
   Why are you using `finally` here?
   
   This looks like a side effect on a static object for the test - would it not 
be better under a `beforeEach()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
