nictownsend commented on code in PR #99:
URL: https://github.com/apache/flink-connector-kafka/pull/99#discussion_r1618477190
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -55,6 +61,34 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private final TypeInformation<RowData> producedTypeInfo;
private final boolean upsertMode;
+ private static Method deserializeWithAdditionalPropertiesMethod = null;
+ protected static final String IS_KEY = "IS_KEY";
+
+ protected static final String HEADERS = "HEADERS";
+
+ static {
+ initializeMethod();
+ }
+
+ protected static void initializeMethod() {
Review Comment:
Probably worth some Javadoc here to explain that this is for backwards compatibility (versions of Flink that don't have the method).
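Something along these lines, perhaps (a sketch only - wording is up to you):

```java
/**
 * Looks up, via reflection, the {@code deserialize} overload that accepts additional
 * properties. Reflection is used for backwards compatibility: older Flink versions
 * don't have that overload, in which case the field stays {@code null} and the
 * schema falls back to the plain {@code deserialize(byte[], Collector)} call.
 */
protected static void initializeMethod() { ... }
```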
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(record.value(), outputCollector);
+ doDeserialize(record, additionalParameters, outputCollector, false);
}
keyCollector.buffer.clear();
}
+ private void doDeserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Map<String, Object> additionalParameters,
+ Collector<RowData> collector,
+ boolean isKey)
+ throws IOException {
+ if (deserializeWithAdditionalPropertiesMethod == null) {
+ if (isKey) {
+ keyDeserialization.deserialize(record.key(), collector);
+ } else {
+ valueDeserialization.deserialize(record.value(), collector);
+ }
+
+ } else {
+ additionalParameters.put(IS_KEY, isKey);
+ try {
+ if (isKey) {
+ deserializeWithAdditionalPropertiesMethod.invoke(
+ keyDeserialization, record.key(), additionalParameters, collector);
+ } else {
+ deserializeWithAdditionalPropertiesMethod.invoke(
+ valueDeserialization, record.value(), additionalParameters, collector);
+ }
+ } catch (IllegalAccessException e) {
Review Comment:
The syntax may be cleaner with a multi-catch (`|`), as you're handling both exceptions in the same way.
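For example (a sketch, keeping the existing wrap-in-RuntimeException behaviour):

```java
try {
    if (isKey) {
        deserializeWithAdditionalPropertiesMethod.invoke(
                keyDeserialization, record.key(), additionalParameters, collector);
    } else {
        deserializeWithAdditionalPropertiesMethod.invoke(
                valueDeserialization, record.value(), additionalParameters, collector);
    }
} catch (IllegalAccessException | InvocationTargetException e) {
    // both reflective failures are rethrown the same way
    throw new RuntimeException(e);
}
```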
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java:
##########
@@ -36,10 +42,36 @@ public class KafkaDeserializationSchemaWrapper<T> implements KafkaDeserializatio
private static final long serialVersionUID = 2651665280744549932L;
+ private static Method deserializeWithAdditionalPropertiesMethod = null;
+
+ protected static final String IS_KEY = "IS_KEY";
+
+ protected static final String HEADERS = "HEADERS";
+
+ static {
Review Comment:
Why is https://github.com/apache/flink-connector-kafka/pull/99/files?file-filters%5B%5D=.java&show-viewed-files=true#diff-0a1bb10356e97bd2de4e3e8a257cf8c94290dd0aadb133d9288b6d1bc60b5994R73 different in terms of the `static` block? You use a static block here, but a static method in DynamicKafkaDeserializationSchema.
However, I like the idea of an interface method `initializeMethod` that returns `Method<DeserializationSchema>` rather than `void` - the typing gives some safety that `initializeMethod` returns a Method from a class hierarchy that has DeserializationSchema as the parent.
I do appreciate that the typing doesn't guarantee the returned method has the correct parameters - but I think catching the runtime exception is as good as trying to validate the method before using it (https://www.tutorialspoint.com/javareflect/javareflect_method_getparametertypes.htm).
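Roughly what I have in mind, as a sketch only - `java.lang.reflect.Method` isn't actually generic, and the reflected signature below is purely illustrative of whatever overload the PR targets:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;

import java.lang.reflect.Method;
import java.util.Map;

/** Hypothetical interface; names are illustrative. */
interface DeserializeMethodLookup {

    /** Returns the additional-properties overload, or null on older Flink versions. */
    default Method initializeMethod() {
        try {
            // assumed overload: deserialize(byte[], Map, Collector)
            return DeserializationSchema.class.getMethod(
                    "deserialize", byte[].class, Map.class, Collector.class);
        } catch (NoSuchMethodException e) {
            return null; // caller falls back to deserialize(byte[], Collector)
        }
    }
}
```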
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -74,25 +113,63 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
@Override
public ProducerRecord<byte[], byte[]> serialize(
RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+
+ // keeping the metadataHeaders maps for keys and values separate so these maps
+ // are output only for the methods; avoiding changing the inputs.
+ // define the input map
+ Map<String, Object> inputMap = new HashMap<>();
+
+ // define the output maps, it is 2 levels deep in case we need to add new information in
+ // the future
+ Map<String, Object> outputKeyMap = new HashMap<>();
+ Map<String, Object> outputValueMap = new HashMap<>();
+
+ inputMap.put(TOPIC_NAME, topic);
+
// shortcut in case no input projection is required
if (keySerialization == null && !hasMetadata) {
- final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
- return new ProducerRecord<>(
- topic,
- extractPartition(
- consumedRow,
- null,
- valueSerialized,
- context.getPartitionsForTopic(topic)),
- null,
- valueSerialized);
+ byte[] valueSerialized = getSerialized(consumedRow, inputMap, outputValueMap, false);
+ if (serializeWithAdditionalPropertiesMethod == null) {
Review Comment:
This logic seems odd - you check for `serializeWithAdditionalPropertiesMethod`, but then you don't use it in the `else` block.
It could be re-written to be cleaner regardless - the logic is only about adding headers, so you could put the header generation in an `if` block and then create the `ProducerRecord` afterwards in both cases.
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java:
##########
@@ -55,7 +87,25 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
throws Exception {
- deserializationSchema.deserialize(message.value(), out);
+ Map<String, Object> headersMapFromKafka = new HashMap<>();
+ for (Header header : message.headers()) {
+ headersMapFromKafka.put(header.key(), header.value());
+ }
+ if (deserializeWithAdditionalPropertiesMethod == null) {
+ deserializationSchema.deserialize(message.value(), out);
+ } else {
+ Map<String, Object> additionalParameters = new HashMap<>();
+ additionalParameters.put(IS_KEY, false);
+ additionalParameters.put(HEADERS, headersMapFromKafka);
+ try {
+ deserializeWithAdditionalPropertiesMethod.invoke(
Review Comment:
This seems inconsistent - in some places you call `deserializeWithAdditionalPropertiesMethod.invoke()` with four parameters, but here it's just three.
I _think_ this call means you're invoking the method with `message.value()` as the target (which won't have that method).
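If the intent is the same four-argument pattern as elsewhere, I'd expect something like (sketch):

```java
deserializeWithAdditionalPropertiesMethod.invoke(
        deserializationSchema, message.value(), additionalParameters, out);
```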
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.IntType;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for DynamicKafkaRecordSerializationSchema. */
+public class DynamicKafkaRecordSerializationSchemaTest {
+
+ @Test
+ public void testserializewithoutMethod() {
+
+ DynamicKafkaRecordSerializationSchema.nullifySerializeWithAdditionalPropertiesMethod();
+ try {
+ FlinkKafkaPartitioner<RowData> partitioner = null;
+ MockSerializationSchema<RowData> keySerialization = null;
+
+ RowData.FieldGetter[] keyFieldGetters = null;
+
+ keySerialization = new MockSerializationSchema<>(true);
+ RowData.FieldGetter keyFieldGetter = RowData.createFieldGetter(new IntType(), 0);
+ keyFieldGetters = new RowData.FieldGetter[] {keyFieldGetter};
+
+ MockSerializationSchema<RowData> valueSerialization =
+ new MockSerializationSchema<>(false);
+ RowData.FieldGetter valueFieldGetter = RowData.createFieldGetter(new IntType(), 0);
+ RowData.FieldGetter[] valueFieldGetters = {valueFieldGetter};
+
+ DynamicKafkaRecordSerializationSchema dynamicKafkaRecordSerializationSchema =
+ new DynamicKafkaRecordSerializationSchema(
+ "test",
+ partitioner,
+ keySerialization,
+ valueSerialization,
+ keyFieldGetters,
+ valueFieldGetters,
+ false,
+ new int[] {},
+ false);
+
+ RowData consumedRow = new GenericRowData(1);
+ KafkaRecordSerializationSchema.KafkaSinkContext context =
+ new KafkaRecordSerializationSchema.KafkaSinkContext() {
+ @Override
+ public int getParallelInstanceId() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfParallelInstances() {
+ return 0;
+ }
+
+ @Override
+ public int[] getPartitionsForTopic(String topic) {
+ return new int[0];
+ }
+ };
+
+ Long timestamp = null;
+ final ProducerRecord<byte[], byte[]> producerRecord =
+ dynamicKafkaRecordSerializationSchema.serialize(consumedRow, context, timestamp);
+ Headers headers = producerRecord.headers();
+ assertThat(headers).isEmpty();
+ } finally {
+ // ensure the method is present after this test.
+ DynamicKafkaRecordSerializationSchema.initializeMethod();
Review Comment:
Similarly, I don't think it's good practice to use `finally` as a `beforeEach`/`afterEach`.
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Optional;
+
+/** test. */
+public class DynamicKafkaDeserializationSchemaTest {
+
+ @ParameterizedTest
+ @MethodSource("configProvider")
+ public void testdeserialize(TestSpec testSpec) {
+ // ensure we have the method.
+ DynamicKafkaRecordSerializationSchema.initializeMethod();
+ MockDeserializationSchema<RowData> keyDeserialization = null;
+ if (!testSpec.valueOnly) {
+ keyDeserialization = new MockDeserializationSchema<>(true, testSpec.providesHeaders);
+ }
+ MockDeserializationSchema<RowData> valueDeserialization =
+ new MockDeserializationSchema<>(false, testSpec.providesHeaders);
+
+ int[] keyProjection = {};
+ if (!testSpec.valueOnly) {
+ keyProjection = new int[] {0, 1};
+ }
+ int[] valueProjection = {2, 3};
+ TypeInformation producedTypeInfo = null;
+ DynamicKafkaDeserializationSchema dynamicKafkaRecordDeserializationSchema =
+ new DynamicKafkaDeserializationSchema(
+ 4,
+ keyDeserialization,
+ keyProjection,
+ valueDeserialization,
+ valueProjection,
+ false,
+ null,
+ producedTypeInfo,
+ false);
+ byte[] key = {};
+ if (!testSpec.valueOnly) {
+ key = new byte[] {0};
+ }
+ byte[] value = {0};
+ ConsumerRecord<byte[], byte[]> message =
+ new ConsumerRecord<>(
+ "",
+ 0,
+ 1L,
+ 1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ testSpec.valueOnly ? 0 : 1,
+ 1,
+ key,
+ value,
+ new RecordHeaders(),
+ Optional.empty());
+ try {
+ dynamicKafkaRecordDeserializationSchema.deserialize(message, null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
Review Comment:
You can use `fail(cause)` (https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#fail-java.lang.Throwable-) to fail the JUnit test, rather than wrapping and rethrowing.
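e.g. (sketch):

```java
import static org.junit.jupiter.api.Assertions.fail;

try {
    dynamicKafkaRecordDeserializationSchema.deserialize(message, null);
} catch (Exception e) {
    fail(e); // fails the test with the original cause instead of wrapping it
}
```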
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(record.value(), outputCollector);
+ doDeserialize(record, additionalParameters, outputCollector, false);
}
keyCollector.buffer.clear();
}
+ private void doDeserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Map<String, Object> additionalParameters,
+ Collector<RowData> collector,
+ boolean isKey)
+ throws IOException {
+ if (deserializeWithAdditionalPropertiesMethod == null) {
+ if (isKey) {
+ keyDeserialization.deserialize(record.key(), collector);
+ } else {
+ valueDeserialization.deserialize(record.value(), collector);
+ }
+
+ } else {
+ additionalParameters.put(IS_KEY, isKey);
+ try {
+ if (isKey) {
+ deserializeWithAdditionalPropertiesMethod.invoke(
+ keyDeserialization, record.key(), additionalParameters, collector);
+ } else {
+ deserializeWithAdditionalPropertiesMethod.invoke(
+ valueDeserialization, record.value(), additionalParameters, collector);
+ }
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void performDeserialiserialize(
Review Comment:
```suggestion
private void performDeserialize(
```
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(record.value(), outputCollector);
+ doDeserialize(record, additionalParameters, outputCollector, false);
}
keyCollector.buffer.clear();
}
+ private void doDeserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Map<String, Object> additionalParameters,
+ Collector<RowData> collector,
+ boolean isKey)
+ throws IOException {
+ if (deserializeWithAdditionalPropertiesMethod == null) {
+ if (isKey) {
+ keyDeserialization.deserialize(record.key(), collector);
Review Comment:
`keyDeserialization` can be null (looks like other places in the file are
explicitly null checking before access)
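e.g. something like this (a sketch - whether to silently skip or throw when there is no key deserializer is up to you):

```java
if (isKey) {
    if (keyDeserialization != null) {
        keyDeserialization.deserialize(record.key(), collector);
    }
} else {
    valueDeserialization.deserialize(record.value(), collector);
}
```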
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(record.value(), outputCollector);
+ doDeserialize(record, additionalParameters, outputCollector, false);
}
keyCollector.buffer.clear();
}
+ private void doDeserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Map<String, Object> additionalParameters,
+ Collector<RowData> collector,
+ boolean isKey)
+ throws IOException {
+ if (deserializeWithAdditionalPropertiesMethod == null) {
+ if (isKey) {
+ keyDeserialization.deserialize(record.key(), collector);
+ } else {
+ valueDeserialization.deserialize(record.value(), collector);
+ }
+
+ } else {
+ additionalParameters.put(IS_KEY, isKey);
+ try {
+ if (isKey) {
+ deserializeWithAdditionalPropertiesMethod.invoke(
+ keyDeserialization, record.key(), additionalParameters, collector);
+ } else {
+ deserializeWithAdditionalPropertiesMethod.invoke(
+ valueDeserialization, record.value(), additionalParameters, collector);
+ }
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void performDeserialiserialize(
Review Comment:
Also, I can't find a reference to this `private` method, should it be
deleted?
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -74,25 +113,63 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
@Override
public ProducerRecord<byte[], byte[]> serialize(
RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+
+ // keeping the metadataHeaders maps for keys and values separate so these maps
+ // are output only for the methods; avoiding changing the inputs.
+ // define the input map
+ Map<String, Object> inputMap = new HashMap<>();
+
+ // define the output maps, it is 2 levels deep in case we need to add new information in
+ // the future
+ Map<String, Object> outputKeyMap = new HashMap<>();
+ Map<String, Object> outputValueMap = new HashMap<>();
+
+ inputMap.put(TOPIC_NAME, topic);
+
// shortcut in case no input projection is required
if (keySerialization == null && !hasMetadata) {
- final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
- return new ProducerRecord<>(
- topic,
- extractPartition(
- consumedRow,
- null,
- valueSerialized,
- context.getPartitionsForTopic(topic)),
- null,
- valueSerialized);
+ byte[] valueSerialized = getSerialized(consumedRow, inputMap, outputValueMap, false);
+ if (serializeWithAdditionalPropertiesMethod == null) {
+ return new ProducerRecord<>(
+ topic,
+ extractPartition(
+ consumedRow,
+ null,
+ valueSerialized,
+ context.getPartitionsForTopic(topic)),
+ null,
+ valueSerialized);
+ } else {
+ List<Header> headers = new ArrayList<>();
+ Map<String, Object> headersToAddForValue =
+ (Map<String, Object>) outputValueMap.get(HEADERS);
+ if (!MapUtils.isEmpty(headersToAddForValue)) {
+ for (String headerKey : headersToAddForValue.keySet()) {
+ KafkaDynamicSink.KafkaHeader kafkaHeader =
+ new KafkaDynamicSink.KafkaHeader(
+ headerKey, (byte[]) headersToAddForValue.get(headerKey));
+ headers.add(kafkaHeader);
+ }
+ }
+ return new ProducerRecord<>(
+ topic,
+ extractPartition(
+ consumedRow,
+ null,
+ valueSerialized,
+ context.getPartitionsForTopic(topic)),
+ null, // timestamp will be current time
+ null,
+ valueSerialized,
+ headers);
+ }
Review Comment:
```suggestion
List<Header> headers = new ArrayList<>();
if (serializeWithAdditionalPropertiesMethod != null) {
Map<String, Object> headersToAddForValue =
(Map<String, Object>) outputValueMap.get(HEADERS);
if (!MapUtils.isEmpty(headersToAddForValue)) {
for (String headerKey : headersToAddForValue.keySet()) {
KafkaDynamicSink.KafkaHeader kafkaHeader =
new KafkaDynamicSink.KafkaHeader(
headerKey, (byte[]) headersToAddForValue.get(headerKey));
headers.add(kafkaHeader);
}
}
return new ProducerRecord<>(
topic,
extractPartition(
consumedRow,
null,
valueSerialized,
context.getPartitionsForTopic(topic)),
null, // timestamp will be current time
null,
valueSerialized,
headers);
}
```
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(record.value(), outputCollector);
+ doDeserialize(record, additionalParameters, outputCollector, false);
}
keyCollector.buffer.clear();
}
+ private void doDeserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Map<String, Object> additionalParameters,
+ Collector<RowData> collector,
+ boolean isKey)
+ throws IOException {
+ if (deserializeWithAdditionalPropertiesMethod == null) {
+ if (isKey) {
+ keyDeserialization.deserialize(record.key(), collector);
+ } else {
+ valueDeserialization.deserialize(record.value(), collector);
+ }
+
+ } else {
+ additionalParameters.put(IS_KEY, isKey);
Review Comment:
Safe to mutate the input, or better to copy into a new collection and then
add `IS_KEY`?
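e.g. (sketch):

```java
// defensive copy so the caller's map is never mutated
Map<String, Object> params = new HashMap<>(additionalParameters);
params.put(IS_KEY, isKey);
// ... then pass params to the invoke() calls below
```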
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Optional;
+
+/** test. */
+public class DynamicKafkaDeserializationSchemaTest {
+
+ @ParameterizedTest
+ @MethodSource("configProvider")
+ public void testdeserialize(TestSpec testSpec) {
+ // ensure we have the method.
+ DynamicKafkaRecordSerializationSchema.initializeMethod();
+ MockDeserializationSchema<RowData> keyDeserialization = null;
+ if (!testSpec.valueOnly) {
+ keyDeserialization = new MockDeserializationSchema<>(true, testSpec.providesHeaders);
+ }
+ MockDeserializationSchema<RowData> valueDeserialization =
+ new MockDeserializationSchema<>(false, testSpec.providesHeaders);
+
+ int[] keyProjection = {};
+ if (!testSpec.valueOnly) {
+ keyProjection = new int[] {0, 1};
+ }
+ int[] valueProjection = {2, 3};
+ TypeInformation producedTypeInfo = null;
+ DynamicKafkaDeserializationSchema dynamicKafkaRecordDeserializationSchema =
+ new DynamicKafkaDeserializationSchema(
+ 4,
+ keyDeserialization,
+ keyProjection,
+ valueDeserialization,
+ valueProjection,
+ false,
+ null,
+ producedTypeInfo,
+ false);
+ byte[] key = {};
+ if (!testSpec.valueOnly) {
+ key = new byte[] {0};
+ }
+ byte[] value = {0};
+ ConsumerRecord<byte[], byte[]> message =
+ new ConsumerRecord<>(
+ "",
+ 0,
+ 1L,
+ 1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ testSpec.valueOnly ? 0 : 1,
+ 1,
+ key,
+ value,
+ new RecordHeaders(),
+ Optional.empty());
+ try {
+ dynamicKafkaRecordDeserializationSchema.deserialize(message, null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static Collection<TestSpec> configProvider() {
+ return ImmutableList.<TestSpec>builder().addAll(getValidTestSpecs()).build();
+ }
+
+ @Test
+ public void testdeserializeWithoutMethod() {
+
+ // ensure we have the method.
+ DynamicKafkaRecordSerializationSchema.nullifySerializeWithAdditionalPropertiesMethod();
+ try {
+ FlinkKafkaPartitioner<RowData> partitioner = null;
+ MockDeserializationSchema<RowData> keyDeserialization =
+ new MockDeserializationSchema<>(true, false);
+ MockDeserializationSchema<RowData> valueDeserialization =
+ new MockDeserializationSchema<>(false, false);
+
+ RowData.FieldGetter[] keyFieldGetters = null;
+
+ int[] keyProjection = {0, 1};
+ int[] valueProjection = {2, 3};
+ TypeInformation producedTypeInfo = null;
+ DynamicKafkaDeserializationSchema dynamicKafkaRecordDeserializationSchema =
+ new DynamicKafkaDeserializationSchema(
+ 4,
+ keyDeserialization,
+ keyProjection,
+ valueDeserialization,
+ valueProjection,
+ false,
+ null,
+ producedTypeInfo,
+ false);
+ byte[] key = {0};
+ byte[] value = {0};
+ ConsumerRecord<byte[], byte[]> message =
+ new ConsumerRecord<>(
+ "",
+ 0,
+ 1L,
+ 1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ 1,
+ 1,
+ key,
+ value,
+ new RecordHeaders(),
+ Optional.empty());
+ try {
+ dynamicKafkaRecordDeserializationSchema.deserialize(message, null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } finally {
+ DynamicKafkaRecordSerializationSchema.initializeMethod();
Review Comment:
Why are you using `finally` here?
This looks like a side effect on a static object for the test - would it not
be better under a `beforeEach()`?
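e.g. (a sketch using a JUnit 5 lifecycle method; the nullify call could likewise move into a `@BeforeEach` for the tests that need it):

```java
import org.junit.jupiter.api.AfterEach;

@AfterEach
void restoreMethodHandle() {
    // replaces the per-test finally blocks
    DynamicKafkaRecordSerializationSchema.initializeMethod();
}
```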
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]