This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 65fad77a98b [fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
65fad77a98b is described below
commit 65fad77a98bb8526656adac1d5e07cc4381777bf
Author: Nicolò Boschi <[email protected]>
AuthorDate: Fri Oct 7 21:35:27 2022 +0200
[fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
---
.../java/org/apache/pulsar/io/kinesis/Utils.java | 12 ++--
.../org/apache/pulsar/io/kinesis/UtilsTest.java | 73 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 4 deletions(-)
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index a46e52806be..a3ebfb94be9 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -253,10 +253,14 @@ public class Utils {
org.apache.pulsar.common.schema.KeyValue<GenericObject,
GenericObject> keyValue =
(org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject>) val;
Map<String, Object> jsonKeyValue = new HashMap<>();
- jsonKeyValue.put("key",
toJsonSerializable(keyValueSchema.getKeySchema(),
- keyValue.getKey().getNativeObject()));
- jsonKeyValue.put("value",
toJsonSerializable(keyValueSchema.getValueSchema(),
- keyValue.getValue().getNativeObject()));
+ if (keyValue.getKey() != null) {
+ jsonKeyValue.put("key",
toJsonSerializable(keyValueSchema.getKeySchema(),
+ keyValue.getKey().getNativeObject()));
+ }
+ if (keyValue.getValue() != null) {
+ jsonKeyValue.put("value",
toJsonSerializable(keyValueSchema.getValueSchema(),
+ keyValue.getValue().getNativeObject()));
+ }
return jsonKeyValue;
case AVRO:
return
JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 3b128fcf65b..5458008f5f6 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -445,6 +446,78 @@ public class UtilsTest {
+
".a\":\"1\",\"payload.key.b\":1,\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
}
+ @Test(dataProvider = "schemaType")
+ public void testKeyValueSerializeNoValue(SchemaType schemaType) throws
Exception {
+ RecordSchemaBuilder keySchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+
keySchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> keySchema =
Schema.generic(keySchemaBuilder.build(schemaType));
+
+ RecordSchemaBuilder valueSchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+
valueSchemaBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> valueSchema =
Schema.generic(valueSchemaBuilder.build(schemaType));
+
+ Schema<org.apache.pulsar.common.schema.KeyValue<GenericRecord,
GenericRecord>> keyValueSchema =
+ Schema.KeyValue(keySchema, valueSchema,
KeyValueEncodingType.INLINE);
+ org.apache.pulsar.common.schema.KeyValue<GenericRecord, GenericRecord>
+ keyValue = new
org.apache.pulsar.common.schema.KeyValue<>(null, null);
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return keyValue;
+ }
+ };
+
+ Record<GenericObject> genericObjectRecord = new Record<>() {
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("data-ks1.table1");
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.Schema getSchema() {
+ return keyValueSchema;
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.of("message-key");
+ }
+
+ @Override
+ public GenericObject getValue() {
+ return genericObject;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Optional<Long> getEventTime() {
+ return Optional.of(1648502845803L);
+ }
+ };
+
+ ObjectMapper objectMapper = new
ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ String json = Utils.serializeRecordToJsonExpandingValue(objectMapper,
genericObjectRecord, false);
+
+ assertEquals(json,
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ + "\"payload\":{},"
+ + "\"eventTime\":1648502845803}");
+
+ json = Utils.serializeRecordToJsonExpandingValue(objectMapper,
genericObjectRecord, true);
+
+ assertEquals(json,
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ + "\"payload\":{},"
+ + "\"eventTime\":1648502845803}");
+ }
+
@Test
public void testPrimitiveSerializeRecordToJsonExpandingValue() throws
Exception {
GenericObject genericObject = new GenericObject() {