This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7aa0f43ad Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
9e7aa0f43ad is described below
commit 9e7aa0f43add7be841cf6b1791ec34c2ced43f1f
Author: Andrey Yegorov <[email protected]>
AuthorDate: Wed Jun 8 23:49:58 2022 -0700
Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
---
.../io/kafka/connect/schema/KafkaConnectData.java | 148 +++++++---
.../io/kafka/connect/KafkaConnectSinkTest.java | 299 +++++++++++++++++++++
.../connect/PulsarSchemaToKafkaSchemaTest.java | 109 +++++++-
3 files changed, 522 insertions(+), 34 deletions(-)
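Context for the change (the sketch below is illustrative, not part of the patch): the DataException in the subject is what Kafka Connect's Struct constructor throws when it is handed a non-STRUCT schema. Before this commit, Avro and Pulsar GenericRecord values were always routed through the struct conversion path and other container types fell through to castToKafkaSchema(), so a value whose Kafka Connect schema is ARRAY or MAP could not be converted. A minimal sketch of the new entry point, assuming the KafkaConnectData and Kafka Connect schema APIs used in the diff below; the class name and concrete values are made up for illustration:

import java.util.List;

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;

public class ArrayConversionSketch {
    public static void main(String[] args) {
        // Kafka Connect schema describing an array of INT32 values.
        Schema kafkaSchema = SchemaBuilder.array(Schema.INT32_SCHEMA).build();

        // With this patch the ARRAY branch (and the reflective arrayToList helper)
        // turns a primitive int[] into a List whose elements match valueSchema().
        Object connectData = KafkaConnectData.getKafkaConnectData(new int[]{1, 2, 3}, kafkaSchema);

        // Passes Connect schema validation and prints "true".
        ConnectSchema.validateValue(kafkaSchema, connectData);
        System.out.println(connectData instanceof List);
    }
}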
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 8374dd24bf7..671495c6df6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,31 +38,90 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
@Slf4j
public class KafkaConnectData {
+
+ private static List<Object> arrayToList(Object nativeObject, Schema kafkaValueSchema) {
+ Preconditions.checkArgument(nativeObject.getClass().isArray());
+ int length = Array.getLength(nativeObject);
+ List<Object> out = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ // this handles primitive values too
+ Object elem = Array.get(nativeObject, i);
+ out.add(getKafkaConnectData(elem, kafkaValueSchema));
+ }
+ return out;
+ }
+
+ @SuppressWarnings("unchecked")
public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
if (kafkaSchema == null) {
return nativeObject;
}
+ if (nativeObject == null) {
+ return defaultOrThrow(kafkaSchema);
+ }
+
if (nativeObject instanceof JsonNode) {
JsonNode node = (JsonNode) nativeObject;
return jsonAsConnectData(node, kafkaSchema);
- } else if (nativeObject instanceof GenericData.Record) {
- GenericData.Record avroRecord = (GenericData.Record) nativeObject;
- return avroAsConnectData(avroRecord, kafkaSchema);
- } else if (nativeObject instanceof GenericRecord) {
- // Pulsar's GenericRecord
- GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
- return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
}
- return castToKafkaSchema(nativeObject, kafkaSchema);
+ switch (kafkaSchema.type()) {
+ case ARRAY:
+ if (nativeObject instanceof List) {
+ List arr = (List) nativeObject;
+ return arr.stream()
+ .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+ .toList();
+ } else if (nativeObject.getClass().isArray()) {
+ return arrayToList(nativeObject, kafkaSchema.valueSchema());
+ }
+ throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+ + " into kafka ARRAY");
+ case MAP:
+ if (nativeObject instanceof Map) {
+ Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+ Map<Object, Object> responseMap = new HashMap<>(map.size());
+ for (Map.Entry<Object, Object> kv : map.entrySet()) {
+ Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+ Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+ responseMap.put(key, val);
+ }
+ return responseMap;
+ } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue) {
+ org.apache.pulsar.common.schema.KeyValue kv =
+ (org.apache.pulsar.common.schema.KeyValue) nativeObject;
+ Map<Object, Object> responseMap = new HashMap<>();
+ Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+ Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+ responseMap.put(key, val);
+ return responseMap;
+ }
+ throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+ + " into kafka MAP");
+ case STRUCT:
+ if (nativeObject instanceof GenericData.Record) {
+ GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+ return avroAsConnectData(avroRecord, kafkaSchema);
+ } else if (nativeObject instanceof GenericRecord) {
+ GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+ // Pulsar's GenericRecord
+ if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+ || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+ return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+ }
+ return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+ }
+ throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+ + "into kafka STRUCT");
+ default:
+ Preconditions.checkArgument(kafkaSchema.type().isPrimitive(),
+ "Expected primitive schema but got " + kafkaSchema.type());
+ return castToKafkaSchema(nativeObject, kafkaSchema);
+ }
}
public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
- if (nativeObject == null) {
- return defaultOrThrow(kafkaSchema);
- }
-
if (nativeObject instanceof Number) {
// This is needed in case
// jackson decided to fit value into some other type internally
@@ -121,6 +181,19 @@ public class KafkaConnectData {
}
}
+ if (nativeObject instanceof Character) {
+ Character ch = (Character) nativeObject;
+ if (kafkaSchema.type() == Schema.Type.STRING) {
+ return ch.toString();
+ }
+ return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
+ }
+
+ if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
+ // e.g. org.apache.avro.util.Utf8
+ return nativeObject.toString();
+ }
+
return nativeObject;
}
@@ -161,23 +234,8 @@ public class KafkaConnectData {
if (jsonNode == null || jsonNode.isNull()) {
return null;
}
- switch (jsonNode.getNodeType()) {
- case BINARY:
- try {
- return jsonNode.binaryValue();
- } catch (IOException e) {
- throw new DataException("Cannot get binary value for "
+ jsonNode);
- }
- case BOOLEAN:
- return jsonNode.booleanValue();
- case NUMBER:
- return jsonNode.doubleValue();
- case STRING:
- return jsonNode.textValue();
- default:
- throw new DataException("Don't know how to convert " +
jsonNode
- + " to Connect data (schema is null).");
- }
+ throw new DataException("Don't know how to convert " + jsonNode
+ + " to Connect data (schema is null).");
}
if (jsonNode == null || jsonNode.isNull()) {
@@ -186,39 +244,65 @@ public class KafkaConnectData {
switch (kafkaSchema.type()) {
case INT8:
+ Preconditions.checkArgument(jsonNode.isNumber());
return (byte) jsonNode.shortValue();
case INT16:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.shortValue();
case INT32:
+ if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
+ // char encoded as String instead of Integer
+ return Character.getNumericValue(jsonNode.textValue().charAt(0));
+ }
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.intValue();
case INT64:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.longValue();
case FLOAT32:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.floatValue();
case FLOAT64:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.doubleValue();
case BOOLEAN:
+ Preconditions.checkArgument(jsonNode.isBoolean());
return jsonNode.booleanValue();
case STRING:
+ Preconditions.checkArgument(jsonNode.isTextual());
return jsonNode.textValue();
case BYTES:
+ Preconditions.checkArgument(jsonNode.isBinary());
try {
return jsonNode.binaryValue();
} catch (IOException e) {
throw new DataException("Cannot get binary value for " +
jsonNode + " with schema " + kafkaSchema);
}
case ARRAY:
- List<Object> list = new ArrayList<>();
+ if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
+ // char[] encoded as String in json
+ List<Object> list = new ArrayList<>();
+ for (char ch: jsonNode.textValue().toCharArray()) {
+ list.add(Character.getNumericValue(ch));
+ }
+ return list;
+ }
+
Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
- for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+ List<Object> list = new ArrayList<>();
+ for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
}
return list;
case MAP:
+ Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
+ Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
+ "kafka schema for json map is expected to be STRING");
Map<String, Object> map = new HashMap<>();
for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
Map.Entry<String, JsonNode> elem = it.next();
- map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+ map.put(elem.getKey(),
+ jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
}
return map;
case STRUCT:
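As an aside on the MAP branch added above: a Pulsar KeyValue is folded into a single-entry java.util.Map, with the key and value converted against keySchema() and valueSchema() respectively. A minimal sketch under the same assumptions as the earlier example (class name and values are illustrative only):

import java.util.Map;

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;

public class KeyValueToMapSketch {
    public static void main(String[] args) {
        // Kafka Connect MAP schema with STRING keys and INT64 values.
        Schema kafkaSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT64_SCHEMA).build();

        // The new MAP branch converts a Pulsar KeyValue into a one-entry Map.
        KeyValue<String, Long> kv = new KeyValue<>("count", 42L);
        Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);

        // Validates against the MAP schema and prints "42".
        ConnectSchema.validateValue(kafkaSchema, connectData);
        System.out.println(((Map<?, ?>) connectData).get("count"));
    }
}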
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 23d9f1b5ce2..2c0ea31ac4c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,8 +22,17 @@ package org.apache.pulsar.io.kafka.connect;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -47,6 +56,8 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -56,11 +67,14 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -538,6 +552,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expected.put("field2", "test");
// integer is coming back from ObjectMapper
expected.put("field3", 100);
+ expected.put("byteField", 0);
+ expected.put("shortField", 0);
+ expected.put("intField", 0);
+ expected.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expected.put("floatField", 0.0d);
+ expected.put("doubleField", 0.0d);
SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
@@ -565,6 +586,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expected.put("field2", "test");
// integer is coming back from ObjectMapper
expected.put("field3", 100);
+ expected.put("byteField", 0);
+ expected.put("shortField", 0);
+ expected.put("intField", 0);
+ expected.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expected.put("floatField", 0.0d);
+ expected.put("doubleField", 0.0d);
SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
@@ -615,6 +643,167 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
Assert.assertEquals(key, 11);
}
+ @Test
+ public void connectDataComplexAvroSchemaGenericRecordTest() {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+ final GenericData.Record key = getComplexStructRecord();
+ final GenericData.Record value = getComplexStructRecord();
+ KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+ getGenericRecord(value, pulsarAvroSchema));
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
+
+ Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ @Test
+ public void connectDataPojoArrTest() throws Exception {
+ PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojo =
+ new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+ getPojoComplexStruct(),
+ getPojoComplexStruct(),
+ getPojoComplexStruct()
+ };
+
+ testPojoAsAvroAndJsonConversionToConnectData(pojo);
+ }
+
+ @Test
+ public void connectDataPojoListTest() throws Exception {
+ List<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+ Lists.newArrayList(
+ getPojoComplexStruct(),
+ getPojoComplexStruct(),
+ getPojoComplexStruct()
+ );
+
+ /*
+ Need this because of (AFAICT)
+ https://issues.apache.org/jira/browse/AVRO-1183
+ https://github.com/apache/pulsar/issues/4851
+ to generate proper schema
+ */
+ PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojoForSchema =
+ new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+ getPojoComplexStruct(),
+ getPojoComplexStruct(),
+ getPojoComplexStruct()
+ };
+
+ AvroSchema pulsarAvroSchema = AvroSchema.of(pojoForSchema.getClass());
+
+ testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+ }
+
+ @Test
+ public void connectDataPojoMapTest() throws Exception {
+ Map<String, PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+ Maps.newHashMap();
+ pojo.put("key1", getPojoComplexStruct());
+ pojo.put("key2", getPojoComplexStruct());
+
+ testPojoAsAvroAndJsonConversionToConnectData(pojo);
+ }
+
+ @Test
+ public void connectDataPrimitivesTest() throws Exception {
+ testPojoAsAvroAndJsonConversionToConnectData("test");
+
+ testPojoAsAvroAndJsonConversionToConnectData('a');
+
+ testPojoAsAvroAndJsonConversionToConnectData(Byte.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Byte.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Short.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Short.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Integer.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Integer.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Long.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Long.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Float.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Float.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Double.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Double.MAX_VALUE);
+ }
+
+ @Test
+ public void connectDataPrimitiveArraysTest() throws Exception {
+ testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
+ testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+ }
+
+ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
+ AvroSchema pulsarAvroSchema = AvroSchema.of(pojo.getClass());
+ testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+ }
+
+ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+ Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(pulsarAvroSchema);
+
+ Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+ Object jsonNode = pojoAsJsonNode(pojo);
+ connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ private JsonNode pojoAsJsonNode(Object pojo) {
+ ObjectMapper om = new ObjectMapper();
+ JsonNode json = om.valueToTree(pojo);
+ return json;
+ }
+
+ private Object pojoAsAvroRecord(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+ DatumWriter writer = new ReflectDatumWriter<>();
+
+ writer.setSchema(pulsarAvroSchema.getAvroSchema());
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Encoder enc = new EncoderFactory().directBinaryEncoder(out, null);
+ writer.write(pojo, enc);
+ enc.flush();
+ byte[] data = out.toByteArray();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(pulsarAvroSchema.getAvroSchema());
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+ Object value = reader.read(null, decoder);
+ return value;
+ }
+
@Test
public void schemaKeyValueAvroSchemaTest() throws Exception {
AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
@@ -635,12 +824,26 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expectedKey.put("field2", "key");
// integer is coming back from ObjectMapper
expectedKey.put("field3", 101);
+ expectedKey.put("byteField", 0);
+ expectedKey.put("shortField", 0);
+ expectedKey.put("intField", 0);
+ expectedKey.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expectedKey.put("floatField", 0.0d);
+ expectedKey.put("doubleField", 0.0d);
Map<String, Object> expectedValue = new LinkedHashMap<>();
expectedValue.put("field1", 10);
expectedValue.put("field2", "value");
// integer is coming back from ObjectMapper
expectedValue.put("field3", 100);
+ expectedValue.put("byteField", 0);
+ expectedValue.put("shortField", 0);
+ expectedValue.put("intField", 0);
+ expectedValue.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expectedValue.put("floatField", 0.0d);
+ expectedValue.put("doubleField", 0.0d);
KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
getGenericRecord(value, pulsarAvroSchema));
@@ -781,4 +984,100 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
sink.close();
}
+ private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
+ return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
+ .setField1(1)
+ .setField2("field2")
+ .setField3(100L)
+ .setByteField((byte) 1)
+ .setShortField((short) 2)
+ .setIntField(3)
+ .setLongField(4)
+ .setFloatField(5.0f)
+ .setDoubleField(6.0d);
+ }
+
+ private static PulsarSchemaToKafkaSchemaTest.ComplexStruct getPojoComplexStruct() {
+ return new PulsarSchemaToKafkaSchemaTest.ComplexStruct()
+ .setStringList(Lists.newArrayList("str11", "str22"))
+ .setStructArr(new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations[]{getPojoStructWithAnnotations()})
+ .setStructList(Lists.newArrayList(getPojoStructWithAnnotations()))
+ .setStruct(getPojoStructWithAnnotations())
+ .setStructMap(Map.of("key1", getPojoStructWithAnnotations(),
+ "key2", getPojoStructWithAnnotations()))
+
+ .setByteField((byte) 1)
+ .setShortField((short) 2)
+ .setIntField(3)
+ .setLongField(4)
+ .setFloatField(5.0f)
+ .setDoubleField(6.0d)
+ .setCharField('c')
+ .setStringField("some text")
+
+ .setByteArr(new byte[] {1 ,2})
+ .setShortArr(new short[] {3, 4})
+ .setIntArr(new int[] {5, 6})
+ .setLongArr(new long[] {7, 8})
+ .setFloatArr(new float[] {9.0f, 10.0f})
+ .setDoubleArr(new double[] {11.0d, 12.0d})
+ .setCharArr(new char[]{'a', 'b'})
+ .setStringArr(new String[] {"abc", "def"});
+ }
+
+ private static GenericData.Record getStructRecord() {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+ final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+ rec.put("field1", 11);
+ rec.put("field2", "str99");
+ rec.put("field3", 101L);
+ rec.put("byteField", (byte) 1);
+ rec.put("shortField", (short) 2);
+ rec.put("intField", 3);
+ rec.put("longField", 4L);
+ rec.put("floatField", 5.0f);
+ rec.put("doubleField", 6.0d);
+
+ return rec;
+ }
+
+ private static GenericData.Record getComplexStructRecord() {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+ final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+ rec.put("stringArr", new String[]{"str1", "str2"});
+ rec.put("stringList", Lists.newArrayList("str11", "str22"));
+ rec.put("structArr", new GenericData.Record[]{getStructRecord(),
getStructRecord()});
+ rec.put("structList", Lists.newArrayList(getStructRecord(),
getStructRecord()));
+
+ rec.put("struct", getStructRecord());
+ rec.put("byteField", (byte) 1);
+ rec.put("shortField", (short) 2);
+ rec.put("intField", 3);
+ rec.put("longField", 4L);
+ rec.put("floatField", 5.1f);
+ rec.put("doubleField", 6.1d);
+ rec.put("charField", 'c');
+ rec.put("stringField", "some string");
+ rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
+ rec.put("shortArr", new short[] {(short) 3, (short) 4});
+ rec.put("intArr", new int[] {5, 6});
+ rec.put("longArr", new long[] {7L, 8L});
+ rec.put("floatArr", new float[] {9.0f, 10.0f});
+ rec.put("doubleArr", new double[] {11.0d, 12.0d});
+ rec.put("charArr", new char[] {'a', 'b', 'c'});
+
+ Map<String, GenericData.Record> map = new HashMap<>();
+ map.put("key1", getStructRecord());
+ map.put("key2", getStructRecord());
+
+ rec.put("structMap", map);
+
+ return rec;
+ }
}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..ecf0633f588 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.connect;
import com.google.common.collect.Lists;
import lombok.Data;
+import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
import java.math.BigInteger;
import java.util.List;
+import java.util.Map;
import static org.testng.Assert.assertEquals;
@@ -44,15 +46,90 @@ import static org.testng.Assert.assertEquals;
@Slf4j
public class PulsarSchemaToKafkaSchemaTest {
- static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+ static final List<String> STRUCT_FIELDS = Lists.newArrayList(
+ "field1",
+ "field2",
+ "field3",
+ "byteField",
+ "shortField",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField"
+ );
+ static final List<String> COMPLEX_STRUCT_FIELDS = Lists.newArrayList(
+ "stringArr",
+ "stringList",
+ "structArr",
+ "structList",
+ "structMap",
+ "struct",
+ "byteField",
+ "shortField",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField",
+ "charField",
+ "stringField",
+ "byteArr",
+ "shortArr",
+ "intArr",
+ "longArr",
+ "floatArr",
+ "doubleArr",
+ "charArr"
+ );
@Data
+ @Accessors(chain = true)
static class StructWithAnnotations {
int field1;
@Nullable
String field2;
- @AvroDefault("\"1000\"")
+ @AvroDefault("1000")
Long field3;
+
+ @AvroDefault("0")
+ byte byteField;
+ @AvroDefault("0")
+ short shortField;
+ @AvroDefault("0")
+ int intField;
+ @AvroDefault("0")
+ long longField;
+ @AvroDefault("0")
+ float floatField;
+ @AvroDefault("0")
+ double doubleField;
+ }
+
+ @Data
+ @Accessors(chain = true)
+ static class ComplexStruct {
+ List<String> stringList;
+ StructWithAnnotations[] structArr;
+ List<StructWithAnnotations> structList;
+ Map<String, StructWithAnnotations> structMap;
+ StructWithAnnotations struct;
+
+ byte byteField;
+ short shortField;
+ int intField;
+ long longField;
+ float floatField;
+ double doubleField;
+ char charField;
+ String stringField;
+
+ byte[] byteArr;
+ short[] shortArr;
+ int[] intArr;
+ long[] longArr;
+ float[] floatArr;
+ double[] doubleArr;
+ char[] charArr;
+ String[] stringArr;
}
@Test
@@ -153,6 +230,18 @@ public class PulsarSchemaToKafkaSchemaTest {
}
}
+ @Test
+ public void avroComplexSchemaTest() {
+ AvroSchema<ComplexStruct> pulsarAvroSchema = AvroSchema.of(ComplexStruct.class);
+ org.apache.kafka.connect.data.Schema kafkaSchema =
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+ assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+ assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+ for (String name: COMPLEX_STRUCT_FIELDS) {
+ assertEquals(kafkaSchema.field(name).name(), name);
+ }
+ }
+
@Test
public void jsonSchemaTest() {
JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
@@ -169,6 +258,22 @@ public class PulsarSchemaToKafkaSchemaTest {
}
}
+ @Test
+ public void jsonComplexSchemaTest() {
+ JSONSchema<ComplexStruct> jsonSchema = JSONSchema
+ .of(SchemaDefinition.<ComplexStruct>builder()
+ .withPojo(ComplexStruct.class)
+ .withAlwaysAllowNull(false)
+ .build());
+ org.apache.kafka.connect.data.Schema kafkaSchema =
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+ assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+ assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+ for (String name: COMPLEX_STRUCT_FIELDS) {
+ assertEquals(kafkaSchema.field(name).name(), name);
+ }
+ }
+
@Test
public void castToKafkaSchemaTest() {
assertEquals(Byte.class,