This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03f8b806ec1 [fix][io] KCA sink: handle null values with KeyValue<Avro,Avro> schema (#19861)
03f8b806ec1 is described below

commit 03f8b806ec177cdf6a5a82193a3fc82557000989
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Mar 21 03:14:33 2023 +0100

    [fix][io] KCA sink: handle null values with KeyValue<Avro,Avro> schema (#19861)
    
    Co-authored-by: Andrey Yegorov <[email protected]>
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  4 +-
 .../io/kafka/connect/schema/KafkaConnectData.java  |  9 +++
 .../connect/schema/PulsarSchemaToKafkaSchema.java  | 80 +++++++++++++++++++++-
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 54 +++++++++++++++
 4 files changed, 144 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index efbad2ef47a..06f66f60380 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -437,8 +437,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
             if (nativeObject instanceof KeyValue) {
                 KeyValue kv = (KeyValue) nativeObject;
-                key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
-                value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
+                key = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getKey(), keySchema);
+                value = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getValue(), valueSchema);
             } else if (nativeObject != null) {
                 throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
             } else {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 757241d4110..a308ef01ddc 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -54,6 +54,14 @@ public class KafkaConnectData {
         return out;
     }
 
+
+    public static Object getKafkaConnectDataFromSchema(Object nativeObject, Schema kafkaSchema) {
+        if (kafkaSchema != null && nativeObject == null) {
+            return null;
+        }
+        return getKafkaConnectData(nativeObject, kafkaSchema);
+    }
+
     @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
@@ -380,6 +388,7 @@ public class KafkaConnectData {
         if (kafkaSchema.isOptional()) {
             return null;
         }
+
         throw new DataException("Invalid null value for required " + kafkaSchema.type() + " field");
     }
 }
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index 2eb6573374c..faf28585e8a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -26,11 +26,14 @@ import com.google.common.util.concurrent.ExecutionError;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.confluent.connect.avro.AvroData;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Time;
@@ -41,6 +44,76 @@ import org.apache.pulsar.common.schema.SchemaType;
 
 @Slf4j
 public class PulsarSchemaToKafkaSchema {
+
+    private static class OptionalForcingSchema implements Schema {
+
+        Schema sourceSchema;
+
+        public OptionalForcingSchema(Schema sourceSchema) {
+            this.sourceSchema = sourceSchema;
+        }
+
+        @Override
+        public Type type() {
+            return sourceSchema.type();
+        }
+
+        @Override
+        public boolean isOptional() {
+            return true;
+        }
+
+        @Override
+        public Object defaultValue() {
+            return sourceSchema.defaultValue();
+        }
+
+        @Override
+        public String name() {
+            return sourceSchema.name();
+        }
+
+        @Override
+        public Integer version() {
+            return sourceSchema.version();
+        }
+
+        @Override
+        public String doc() {
+            return sourceSchema.doc();
+        }
+
+        @Override
+        public Map<String, String> parameters() {
+            return sourceSchema.parameters();
+        }
+
+        @Override
+        public Schema keySchema() {
+            return sourceSchema.keySchema();
+        }
+
+        @Override
+        public Schema valueSchema() {
+            return sourceSchema.valueSchema();
+        }
+
+        @Override
+        public List<Field> fields() {
+            return sourceSchema.fields();
+        }
+
+        @Override
+        public Field field(String s) {
+            return sourceSchema.field(s);
+        }
+
+        @Override
+        public Schema schema() {
+            return sourceSchema.schema();
+        }
+    }
+
     private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
     private static final ImmutableSet<String> kafkaLogicalSchemas;
     private static final AvroData avroData = new AvroData(1000);
@@ -80,6 +153,11 @@ public class PulsarSchemaToKafkaSchema {
         return parser.parse(schemaJson);
     }
 
+    public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+        Schema s = getKafkaConnectSchema(pulsarSchema);
+        return new OptionalForcingSchema(s);
+    }
+
     public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
         if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
             throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
@@ -122,7 +200,7 @@ public class PulsarSchemaToKafkaSchema {
                 if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                     KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
                     return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
-                                             getKafkaConnectSchema(kvSchema.getValueSchema()))
+                                    getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
                                 .build();
                 }
                 org.apache.avro.Schema avroSchema = parseAvroSchema(
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 567562d338b..e9d454ed2fd 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -51,6 +51,9 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -60,6 +63,7 @@ import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -734,6 +738,56 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         Assert.assertEquals(key, 11);
     }
 
+    @Test
+    public void schemaKeyValueSchemaNullValueTest() throws Exception {
+        RecordSchemaBuilder builder = SchemaBuilder
+                .record("test");
+        builder.field("test").type(SchemaType.STRING);
+        GenericSchema<GenericRecord> schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO));
+        KeyValue<Integer, String> kv = new KeyValue<>(11, null);
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), 11,
+                "INT32", null, "STRUCT");
+        Assert.assertNull(sinkRecord.value());
+        int key = (int) sinkRecord.key();
+        Assert.assertEquals(key, 11);
+    }
+
+    @Test
+    public void schemaKeyValueSchemaNullValueNoUnwrapTest() throws Exception {
+        props.put("unwrapKeyValueIfAvailable", "false");
+        JSONSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>builder()
+                        .withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class)
+                        .withAlwaysAllowNull(true)
+                        .build());
+        KeyValue<Integer, String> kv = new KeyValue<>(11, null);
+        Map<String, Object> expected = new HashMap();
+        expected.put("11", null);
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, jsonSchema), "key",
+                "STRING", expected, "MAP");
+        Assert.assertNull(((Map)sinkRecord.value()).get(11));
+        String key =(String)sinkRecord.key();
+        Assert.assertEquals(key, "key");
+    }
+
+    @Test
+    public void schemaKeyValueSchemaNullValueNoUnwrapTestAvro() throws Exception {
+        props.put("unwrapKeyValueIfAvailable", "false");
+        RecordSchemaBuilder builder = SchemaBuilder
+                .record("test");
+        builder.property("op", "test");
+        builder.field("test").type(SchemaType.STRING);
+        GenericSchema<GenericRecord> schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO));
+        KeyValue<Integer, String> kv = new KeyValue<>(11, null);
+        Map<String, Object> expected = new HashMap();
+        expected.put("11", null);
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), "key",
+                "STRING", expected, "MAP");
+        Assert.assertNull(((Map)sinkRecord.value()).get(11));
+        String key =(String)sinkRecord.key();
+        Assert.assertEquals(key, "key");
+    }
+
     @Test
     public void kafkaLogicalTypesTimestampTest() {
         Schema schema = new TestSchema(SchemaInfoImpl.builder()

Reply via email to