This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 5105d48 [FLINK-33176][Connectors/Kinesis] Handle null value in
RowDataKinesisDeserializationSchema
5105d48 is described below
commit 5105d48bf7334503b314d188462c3542f438f1d6
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Mon Oct 2 19:23:25 2023 +0100
[FLINK-33176][Connectors/Kinesis] Handle null value in
RowDataKinesisDeserializationSchema
---
.../table/RowDataKinesisDeserializationSchema.java | 5 +
.../RowDataKinesisDeserializationSchemaTest.java | 108 +++++++++++++++++++++
.../StaticValueDeserializationSchema.java | 52 ++++++++++
3 files changed, 165 insertions(+)
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
index 3ffd051..4d9023d 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
@@ -118,6 +118,11 @@ public final class RowDataKinesisDeserializationSchema
throws IOException {
RowData physicalRow = physicalDeserializer.deserialize(recordValue);
+ // If the message cannot be deserialized by physicalDeserializer, return null to skip the record
+ if (physicalRow == null) {
+ return null;
+ }
+
GenericRowData metadataRow = new
GenericRowData(requestedMetadataFields.size());
for (int i = 0; i < metadataRow.getArity(); i++) {
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java
new file mode 100644
index 0000000..711b535
--- /dev/null
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.Metadata;
+import
org.apache.flink.streaming.connectors.kinesis.testutils.StaticValueDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link RowDataKinesisDeserializationSchema} class. */
+public class RowDataKinesisDeserializationSchemaTest {
+ private static final String FIELD_NAME = "text_field";
+
+ @Test
+ public void testAddMetadataToDeserializedRecord() throws Exception {
+ long timestamp = Instant.now().toEpochMilli();
+ String shardName = "test shard";
+ String sequenceNumber = "sequence number";
+ String deserializedValue = "deserialized value";
+
+ GenericRowData rowData = new GenericRowData(RowKind.INSERT, 1);
+ rowData.setField(0, StringData.fromString(deserializedValue));
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(FIELD_NAME,
DataTypes.STRING()));
+
+ ScanTableSource.ScanContext scanContext =
ScanRuntimeProviderContext.INSTANCE;
+ TypeInformation<RowData> typeInformation =
scanContext.createTypeInformation(dataType);
+
+ RowDataKinesisDeserializationSchema
rowDataKinesisDeserializationSchema =
+ createSchema(rowData, typeInformation);
+
+ RowData row =
+ rowDataKinesisDeserializationSchema.deserialize(
+ deserializedValue.getBytes(StandardCharsets.UTF_8),
+ "partitionKey",
+ sequenceNumber,
+ timestamp,
+ "test_stream",
+ shardName);
+
+ assertThat(row).isNotNull();
+
assertThat(row.getString(0)).isEqualTo(StringData.fromString(deserializedValue));
+ assertThat(row.getTimestamp(1,
0)).isEqualTo(TimestampData.fromEpochMillis(timestamp));
+
assertThat(row.getString(2)).isEqualTo(StringData.fromString(sequenceNumber));
+
assertThat(row.getString(3)).isEqualTo(StringData.fromString(shardName));
+ }
+
+ @Test
+ public void testHandleNullDeserializationResult() throws Exception {
+ ScanTableSource.ScanContext scanContext =
ScanRuntimeProviderContext.INSTANCE;
+ RowDataKinesisDeserializationSchema
rowDataKinesisDeserializationSchema =
+ createSchema(
+ null,
scanContext.createTypeInformation(DataTypes.ROW(DataTypes.STRING())));
+
+ RowData row =
+ rowDataKinesisDeserializationSchema.deserialize(
+ new byte[0],
+ "partitionKey",
+ "sequence number",
+ Instant.now().toEpochMilli(),
+ "test_stream",
+ "test shard");
+
+ assertThat(row).isNull();
+ }
+
+ private RowDataKinesisDeserializationSchema createSchema(
+ RowData deserializedValue, TypeInformation<RowData>
typeInformation) {
+
+ DeserializationSchema<RowData> internalDeserializationSchema =
+ new StaticValueDeserializationSchema<>(deserializedValue,
typeInformation);
+
+ return new RowDataKinesisDeserializationSchema(
+ internalDeserializationSchema, typeInformation,
Arrays.asList(Metadata.values()));
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
new file mode 100644
index 0000000..4349bed
--- /dev/null
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A {@link DeserializationSchema} implementation which returns a predefined value whenever the
+ * deserialize method is called.
+ */
+public class StaticValueDeserializationSchema<T> implements
DeserializationSchema<T> {
+ private final T value;
+ private final TypeInformation<T> typeInformation;
+
+ public StaticValueDeserializationSchema(T value, TypeInformation<T>
typeInformation) {
+ this.value = value;
+ this.typeInformation = typeInformation;
+ }
+
+ @Override
+ public T deserialize(final byte[] bytes) throws IOException {
+ return value;
+ }
+
+ @Override
+ public boolean isEndOfStream(final T s) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return typeInformation;
+ }
+}