This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 5105d48  [FLINK-33176][Connectors/Kinesis] Handle null value in 
RowDataKinesisDeserializationSchema
5105d48 is described below

commit 5105d48bf7334503b314d188462c3542f438f1d6
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Mon Oct 2 19:23:25 2023 +0100

    [FLINK-33176][Connectors/Kinesis] Handle null value in 
RowDataKinesisDeserializationSchema
---
 .../table/RowDataKinesisDeserializationSchema.java |   5 +
 .../RowDataKinesisDeserializationSchemaTest.java   | 108 +++++++++++++++++++++
 .../StaticValueDeserializationSchema.java          |  52 ++++++++++
 3 files changed, 165 insertions(+)

diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
index 3ffd051..4d9023d 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java
@@ -118,6 +118,11 @@ public final class RowDataKinesisDeserializationSchema
             throws IOException {
 
         RowData physicalRow = physicalDeserializer.deserialize(recordValue);
+        // If message can not be deserialized by physicalDeserializer - return 
null to skip record
+        if (physicalRow == null) {
+            return null;
+        }
+
         GenericRowData metadataRow = new 
GenericRowData(requestedMetadataFields.size());
 
         for (int i = 0; i < metadataRow.getArity(); i++) {
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java
 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java
new file mode 100644
index 0000000..711b535
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.Metadata;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.StaticValueDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowDataKinesisDeserializationSchema}. */
+public class RowDataKinesisDeserializationSchemaTest {
+    private static final String FIELD_NAME = "text_field";
+
+    /**
+     * A non-null row produced by the physical deserializer must come back non-null, with the
+     * physical value followed by the requested metadata values.
+     */
+    @Test
+    public void testAddMetadataToDeserializedRecord() throws Exception {
+        String payload = "deserialized value";
+        String sequence = "sequence number";
+        String shard = "test shard";
+        long arrivalMillis = Instant.now().toEpochMilli();
+
+        DataType physicalType = DataTypes.ROW(DataTypes.FIELD(FIELD_NAME, DataTypes.STRING()));
+        ScanTableSource.ScanContext context = ScanRuntimeProviderContext.INSTANCE;
+        TypeInformation<RowData> physicalTypeInfo = context.createTypeInformation(physicalType);
+
+        // Row that the stubbed physical deserializer hands back for any input.
+        GenericRowData physicalRow = new GenericRowData(RowKind.INSERT, 1);
+        physicalRow.setField(0, StringData.fromString(payload));
+
+        RowDataKinesisDeserializationSchema schema = createSchema(physicalRow, physicalTypeInfo);
+        RowData result =
+                schema.deserialize(
+                        payload.getBytes(StandardCharsets.UTF_8),
+                        "partitionKey",
+                        sequence,
+                        arrivalMillis,
+                        "test_stream",
+                        shard);
+
+        // Expect the physical value at position 0, then timestamp, sequence number and shard.
+        TimestampData expectedArrival = TimestampData.fromEpochMillis(arrivalMillis);
+        assertThat(result).isNotNull();
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString(payload));
+        assertThat(result.getTimestamp(1, 0)).isEqualTo(expectedArrival);
+        assertThat(result.getString(2)).isEqualTo(StringData.fromString(sequence));
+        assertThat(result.getString(3)).isEqualTo(StringData.fromString(shard));
+    }
+
+    /**
+     * When the physical deserializer returns {@code null}, the schema must return {@code null}
+     * as well.
+     */
+    @Test
+    public void testHandleNullDeserializationResult() throws Exception {
+        ScanTableSource.ScanContext context = ScanRuntimeProviderContext.INSTANCE;
+        TypeInformation<RowData> rowTypeInfo =
+                context.createTypeInformation(DataTypes.ROW(DataTypes.STRING()));
+        RowDataKinesisDeserializationSchema schema = createSchema(null, rowTypeInfo);
+
+        RowData result =
+                schema.deserialize(
+                        new byte[0],
+                        "partitionKey",
+                        "sequence number",
+                        Instant.now().toEpochMilli(),
+                        "test_stream",
+                        "test shard");
+
+        assertThat(result).isNull();
+    }
+
+    /**
+     * Builds the schema under test around a stub physical deserializer that always returns
+     * {@code deserializedValue}, requesting every value of {@link Metadata}.
+     */
+    private RowDataKinesisDeserializationSchema createSchema(
+            RowData deserializedValue, TypeInformation<RowData> typeInformation) {
+
+        DeserializationSchema<RowData> physicalStub =
+                new StaticValueDeserializationSchema<>(deserializedValue, typeInformation);
+
+        return new RowDataKinesisDeserializationSchema(
+                physicalStub, typeInformation, Arrays.asList(Metadata.values()));
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
new file mode 100644
index 0000000..4349bed
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * Test-only {@link DeserializationSchema} stub: every call to {@code deserialize} ignores the
+ * incoming bytes and returns the value that was passed to the constructor.
+ *
+ * @param <T> type of the value handed back by this schema
+ */
+public class StaticValueDeserializationSchema<T> implements DeserializationSchema<T> {
+    private final TypeInformation<T> producedType;
+    private final T fixedResult;
+
+    /**
+     * @param value result returned by every {@link #deserialize(byte[])} call; it is stored
+     *     without any check, so passing {@code null} makes the schema return {@code null}
+     * @param typeInformation type information reported by {@link #getProducedType()}
+     */
+    public StaticValueDeserializationSchema(T value, TypeInformation<T> typeInformation) {
+        this.producedType = typeInformation;
+        this.fixedResult = value;
+    }
+
+    /** Returns the type information supplied at construction time. */
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return producedType;
+    }
+
+    /** Always {@code false}: this stub never signals end of stream. */
+    @Override
+    public boolean isEndOfStream(final T s) {
+        return false;
+    }
+
+    /** Returns the fixed value regardless of the content of {@code bytes}. */
+    @Override
+    public T deserialize(final byte[] bytes) throws IOException {
+        return fixedResult;
+    }
+}

Reply via email to