This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03f8b806ec1 [fix][io] KCA sink: handle null values with
KeyValue<Avro,Avro> schema (#19861)
03f8b806ec1 is described below
commit 03f8b806ec177cdf6a5a82193a3fc82557000989
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Mar 21 03:14:33 2023 +0100
[fix][io] KCA sink: handle null values with KeyValue<Avro,Avro> schema
(#19861)
Co-authored-by: Andrey Yegorov <[email protected]>
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 4 +-
.../io/kafka/connect/schema/KafkaConnectData.java | 9 +++
.../connect/schema/PulsarSchemaToKafkaSchema.java | 80 +++++++++++++++++++++-
.../io/kafka/connect/KafkaConnectSinkTest.java | 54 +++++++++++++++
4 files changed, 144 insertions(+), 3 deletions(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index efbad2ef47a..06f66f60380 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -437,8 +437,8 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
if (nativeObject instanceof KeyValue) {
KeyValue kv = (KeyValue) nativeObject;
- key = KafkaConnectData.getKafkaConnectData(kv.getKey(),
keySchema);
- value = KafkaConnectData.getKafkaConnectData(kv.getValue(),
valueSchema);
+ key =
KafkaConnectData.getKafkaConnectDataFromSchema(kv.getKey(), keySchema);
+ value =
KafkaConnectData.getKafkaConnectDataFromSchema(kv.getValue(), valueSchema);
} else if (nativeObject != null) {
throw new IllegalStateException("Cannot extract KeyValue data
from " + nativeObject.getClass());
} else {
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 757241d4110..a308ef01ddc 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -54,6 +54,14 @@ public class KafkaConnectData {
return out;
}
+
+    /**
+     * Converts a Pulsar-side value into Kafka Connect data for {@code kafkaSchema}, passing a
+     * {@code null} value straight through whenever a schema is supplied.
+     *
+     * <p>{@link #getKafkaConnectData(Object, Schema)} may throw a {@code DataException} for a
+     * {@code null} value against a non-optional schema. This variant returns {@code null} before
+     * that check, so a KeyValue with a {@code null} key or value can still be turned into a
+     * SinkRecord when KeyValue unwrapping is used.
+     *
+     * @param nativeObject the value to convert; may be {@code null}
+     * @param kafkaSchema  the target Kafka Connect schema; may be {@code null}
+     * @return {@code null} when {@code nativeObject} is {@code null} and {@code kafkaSchema} is
+     *         non-null, otherwise the result of {@code getKafkaConnectData}
+     */
+    public static Object getKafkaConnectDataFromSchema(Object nativeObject, Schema kafkaSchema) {
+        final boolean nullValueWithKnownSchema = nativeObject == null && kafkaSchema != null;
+        return nullValueWithKnownSchema ? null : getKafkaConnectData(nativeObject, kafkaSchema);
+    }
+
@SuppressWarnings("unchecked")
public static Object getKafkaConnectData(Object nativeObject, Schema
kafkaSchema) {
if (kafkaSchema == null) {
@@ -380,6 +388,7 @@ public class KafkaConnectData {
if (kafkaSchema.isOptional()) {
return null;
}
+
throw new DataException("Invalid null value for required " +
kafkaSchema.type() + " field");
}
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index 2eb6573374c..faf28585e8a 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -26,11 +26,14 @@ import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.connect.avro.AvroData;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Time;
@@ -41,6 +44,76 @@ import org.apache.pulsar.common.schema.SchemaType;
@Slf4j
public class PulsarSchemaToKafkaSchema {
+
+    /**
+     * A {@link Schema} decorator that reports itself as optional and delegates every other accessor
+     * to the wrapped schema.
+     *
+     * <p>It is used for the value side of KEY_VALUE map schemas. A {@code null} value is then
+     * accepted where the converted value schema would otherwise be required.
+     */
+    private static final class OptionalForcingSchema implements Schema {
+
+        private final Schema sourceSchema;
+
+        public OptionalForcingSchema(Schema sourceSchema) {
+            // Fail fast instead of deferring an NPE to the first accessor call.
+            if (sourceSchema == null) {
+                throw new IllegalArgumentException("sourceSchema must not be null");
+            }
+            this.sourceSchema = sourceSchema;
+        }
+
+        @Override
+        public Type type() {
+            return sourceSchema.type();
+        }
+
+        /** Always {@code true}: this is the single property the wrapper overrides. */
+        @Override
+        public boolean isOptional() {
+            return true;
+        }
+
+        @Override
+        public Object defaultValue() {
+            return sourceSchema.defaultValue();
+        }
+
+        @Override
+        public String name() {
+            return sourceSchema.name();
+        }
+
+        @Override
+        public Integer version() {
+            return sourceSchema.version();
+        }
+
+        @Override
+        public String doc() {
+            return sourceSchema.doc();
+        }
+
+        @Override
+        public Map<String, String> parameters() {
+            return sourceSchema.parameters();
+        }
+
+        @Override
+        public Schema keySchema() {
+            return sourceSchema.keySchema();
+        }
+
+        @Override
+        public Schema valueSchema() {
+            return sourceSchema.valueSchema();
+        }
+
+        @Override
+        public List<Field> fields() {
+            return sourceSchema.fields();
+        }
+
+        @Override
+        public Field field(String fieldName) {
+            return sourceSchema.field(fieldName);
+        }
+
+        /**
+         * Returns this wrapper as the concrete schema. Delegating to {@code sourceSchema.schema()}
+         * would hand callers the wrapped, non-optional schema and silently drop the forced
+         * optionality.
+         */
+        @Override
+        public Schema schema() {
+            return this;
+        }
+
+        // Value-based equality, so that two schemas built around equal wrappers
+        // (e.g. repeated map(key, optional(value)) builds) compare equal.
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof OptionalForcingSchema)) {
+                return false;
+            }
+            return sourceSchema.equals(((OptionalForcingSchema) o).sourceSchema);
+        }
+
+        @Override
+        public int hashCode() {
+            return sourceSchema.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return "OptionalForcingSchema{" + sourceSchema + "}";
+        }
+    }
+
private static final ImmutableMap<SchemaType, Schema>
pulsarSchemaTypeToKafkaSchema;
private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
@@ -80,6 +153,11 @@ public class PulsarSchemaToKafkaSchema {
return parser.parse(schemaJson);
}
+    /**
+     * Like {@link #getKafkaConnectSchema(org.apache.pulsar.client.api.Schema)}, but the returned
+     * schema always reports {@link Schema#isOptional()} as {@code true}, so {@code null} values are
+     * accepted for it. It is used for the value side of KEY_VALUE schemas.
+     *
+     * <p>A converted schema that is already optional is returned unchanged. Wrapping it would add
+     * nothing, and, since the wrapper is a different class, it presumably would no longer compare
+     * equal to the plain schema.
+     *
+     * @param pulsarSchema the Pulsar schema to convert
+     * @return an optional Kafka Connect schema for {@code pulsarSchema}; anything
+     *         {@code getKafkaConnectSchema} throws for unsupported schemas propagates unchanged
+     */
+    public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+        Schema kafkaSchema = getKafkaConnectSchema(pulsarSchema);
+        if (kafkaSchema.isOptional()) {
+            return kafkaSchema;
+        }
+        return new OptionalForcingSchema(kafkaSchema);
+    }
+
public static Schema
getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is
required.", null);
@@ -122,7 +200,7 @@ public class PulsarSchemaToKafkaSchema {
if (pulsarSchema.getSchemaInfo().getType() ==
SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
return
SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
-
getKafkaConnectSchema(kvSchema.getValueSchema()))
+
getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
.build();
}
org.apache.avro.Schema avroSchema = parseAvroSchema(
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 567562d338b..e9d454ed2fd 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -51,6 +51,9 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -60,6 +63,7 @@ import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -734,6 +738,56 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
Assert.assertEquals(key, 11);
}
+    /**
+     * A KeyValue<INT32, Avro record> message whose value is {@code null} must yield a SinkRecord
+     * with a {@code null} value while keeping the key 11.
+     * NOTE(review): relies on the default unwrapKeyValueIfAvailable setting (the NoUnwrap variants
+     * set it to "false" explicitly); presumably the default unwraps — confirm.
+     */
+    @Test
+    public void schemaKeyValueSchemaNullValueTest() throws Exception {
+        RecordSchemaBuilder recordBuilder = SchemaBuilder.record("test");
+        recordBuilder.field("test").type(SchemaType.STRING);
+        GenericSchema<GenericRecord> avroValueSchema =
+                GenericAvroSchema.of(recordBuilder.build(SchemaType.AVRO));
+
+        KeyValue<Integer, GenericRecord> nullValueKv = new KeyValue<>(11, null);
+        SinkRecord record = recordSchemaTest(nullValueKv,
+                Schema.KeyValue(Schema.INT32, avroValueSchema), 11, "INT32", null, "STRUCT");
+
+        Assert.assertNull(record.value());
+        Assert.assertEquals((int) record.key(), 11);
+    }
+
+    /**
+     * With unwrapKeyValueIfAvailable=false, a KeyValue<INT32, JSON> message whose value is
+     * {@code null} must be delivered as a MAP-typed value holding the single entry 11 -> null.
+     */
+    @Test
+    public void schemaKeyValueSchemaNullValueNoUnwrapTest() throws Exception {
+        props.put("unwrapKeyValueIfAvailable", "false");
+        JSONSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>builder()
+                        .withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class)
+                        .withAlwaysAllowNull(true)
+                        .build());
+        KeyValue<Integer, PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> kv =
+                new KeyValue<>(11, null);
+        // NOTE(review): the expected map uses the string key "11" while the SinkRecord value is
+        // checked with the integer key 11 below; presumably recordSchemaTest compares a
+        // JSON-serialized form of the record — confirm.
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("11", null);
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, jsonSchema), "key",
+                "STRING", expected, "MAP");
+        Map<?, ?> value = (Map<?, ?>) sinkRecord.value();
+        // get(11) alone would also be null if the entry were missing, so check presence explicitly.
+        Assert.assertTrue(value.containsKey(11));
+        Assert.assertNull(value.get(11));
+        String key = (String) sinkRecord.key();
+        Assert.assertEquals(key, "key");
+    }
+
+    /**
+     * Avro counterpart of schemaKeyValueSchemaNullValueNoUnwrapTest: with
+     * unwrapKeyValueIfAvailable=false, a KeyValue<INT32, Avro record> message whose value is
+     * {@code null} must be delivered as a MAP-typed value holding the single entry 11 -> null.
+     */
+    @Test
+    public void schemaKeyValueSchemaNullValueNoUnwrapTestAvro() throws Exception {
+        props.put("unwrapKeyValueIfAvailable", "false");
+        RecordSchemaBuilder builder = SchemaBuilder.record("test");
+        builder.property("op", "test");
+        builder.field("test").type(SchemaType.STRING);
+        GenericSchema<GenericRecord> schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO));
+        KeyValue<Integer, GenericRecord> kv = new KeyValue<>(11, null);
+        // NOTE(review): the expected map uses the string key "11" while the SinkRecord value is
+        // checked with the integer key 11 below; presumably recordSchemaTest compares a
+        // JSON-serialized form of the record — confirm.
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("11", null);
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), "key",
+                "STRING", expected, "MAP");
+        Map<?, ?> value = (Map<?, ?>) sinkRecord.value();
+        // get(11) alone would also be null if the entry were missing, so check presence explicitly.
+        Assert.assertTrue(value.containsKey(11));
+        Assert.assertNull(value.get(11));
+        String key = (String) sinkRecord.key();
+        Assert.assertEquals(key, "key");
+    }
+
@Test
public void kafkaLogicalTypesTimestampTest() {
Schema schema = new TestSchema(SchemaInfoImpl.builder()