This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fe570032a0aa8fe6f79fc753bd7a8266765c1461
Author: Andrey Yegorov <[email protected]>
AuthorDate: Wed Jun 8 23:49:58 2022 -0700

    Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)

    (cherry picked from commit 9e7aa0f43add7be841cf6b1791ec34c2ced43f1f)
---
 .../io/kafka/connect/schema/KafkaConnectData.java  | 148 +++++++---
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 299 +++++++++++++++++++++
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 109 +++++++-
 3 files changed, 522 insertions(+), 34 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 8374dd24bf7..671495c6df6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,31 +38,90 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 @Slf4j
 public class KafkaConnectData {
+
+    private static List<Object> arrayToList(Object nativeObject, Schema kafkaValueSchema) {
+        Preconditions.checkArgument(nativeObject.getClass().isArray());
+        int length = Array.getLength(nativeObject);
+        List<Object> out = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            // this handles primitive values too
+            Object elem = Array.get(nativeObject, i);
+            out.add(getKafkaConnectData(elem, kafkaValueSchema));
+        }
+        return out;
+    }
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }

+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
         if (nativeObject instanceof JsonNode) {
             JsonNode node = (JsonNode) nativeObject;
             return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
         }

-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof List) {
+                    List arr = (List) nativeObject;
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    return arrayToList(nativeObject, kafkaSchema.valueSchema());
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue) {
+                    org.apache.pulsar.common.schema.KeyValue kv =
+                            (org.apache.pulsar.common.schema.KeyValue) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka MAP");
+            case STRUCT:
+                if (nativeObject instanceof GenericData.Record) {
+                    GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+                    return avroAsConnectData(avroRecord, kafkaSchema);
+                } else if (nativeObject instanceof GenericRecord) {
+                    GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+                    // Pulsar's GenericRecord
+                    if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+                            || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+                        return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+                    }
+                    return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + "into kafka STRUCT");
+            default:
+                Preconditions.checkArgument(kafkaSchema.type().isPrimitive(),
+                        "Expected primitive schema but got " + kafkaSchema.type());
+                return castToKafkaSchema(nativeObject, kafkaSchema);
+        }
     }

     public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
-        if (nativeObject == null) {
-            return defaultOrThrow(kafkaSchema);
-        }
-
         if (nativeObject instanceof Number) {
             // This is needed in case
             // jackson decided to fit value into some other type internally
@@ -121,6 +181,19 @@ public class KafkaConnectData {
             }
         }

+        if (nativeObject instanceof Character) {
+            Character ch = (Character) nativeObject;
+            if (kafkaSchema.type() == Schema.Type.STRING) {
+                return ch.toString();
+            }
+            return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
+        }
+
+        if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
+            // e.g. org.apache.avro.util.Utf8
+            return nativeObject.toString();
+        }
+
         return nativeObject;
     }

@@ -161,23 +234,8 @@ public class KafkaConnectData {
         if (jsonNode == null || jsonNode.isNull()) {
             return null;
         }
-        switch (jsonNode.getNodeType()) {
-            case BINARY:
-                try {
-                    return jsonNode.binaryValue();
-                } catch (IOException e) {
-                    throw new DataException("Cannot get binary value for " + jsonNode);
-                }
-            case BOOLEAN:
-                return jsonNode.booleanValue();
-            case NUMBER:
-                return jsonNode.doubleValue();
-            case STRING:
-                return jsonNode.textValue();
-            default:
-                throw new DataException("Don't know how to convert " + jsonNode
-                        + " to Connect data (schema is null).");
-        }
+        throw new DataException("Don't know how to convert " + jsonNode
+                + " to Connect data (schema is null).");
     }

     if (jsonNode == null || jsonNode.isNull()) {
@@ -186,39 +244,65 @@
         switch (kafkaSchema.type()) {
             case INT8:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return (byte) jsonNode.shortValue();
             case INT16:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.shortValue();
             case INT32:
+                if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
+                    // char encoded as String instead of Integer
+                    return Character.getNumericValue(jsonNode.textValue().charAt(0));
+                }
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.intValue();
             case INT64:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.longValue();
             case FLOAT32:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.floatValue();
             case FLOAT64:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.doubleValue();
             case BOOLEAN:
+                Preconditions.checkArgument(jsonNode.isBoolean());
                 return jsonNode.booleanValue();
             case STRING:
+                Preconditions.checkArgument(jsonNode.isTextual());
                 return jsonNode.textValue();
             case BYTES:
+                Preconditions.checkArgument(jsonNode.isBinary());
                 try {
                     return jsonNode.binaryValue();
                 } catch (IOException e) {
                     throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema);
                 }
             case ARRAY:
-                List<Object> list = new ArrayList<>();
+                if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
+                    // char[] encoded as String in json
+                    List<Object> list = new ArrayList<>();
+                    for (char ch: jsonNode.textValue().toCharArray()) {
+                        list.add(Character.getNumericValue(ch));
+                    }
+                    return list;
+                }
+
                 Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
-                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+                List<Object> list = new ArrayList<>();
+                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
                     list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
                 }
                 return list;
             case MAP:
+                Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
+                Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
+                        "kafka schema for json map is expected to be STRING");
                 Map<String, Object> map = new HashMap<>();
                 for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
                     Map.Entry<String, JsonNode> elem = it.next();
-                    map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+                    map.put(elem.getKey(),
+                            jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
                 }
                 return map;
             case STRUCT:
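
For readers skimming the patch: the core change above is that KafkaConnectData.getKafkaConnectData() now dispatches on the Kafka Connect schema type (ARRAY, MAP, STRUCT, or primitive) instead of assuming a STRUCT, which is what previously surfaced as the "Not a struct schema: Schema{ARRAY}" DataException named in the commit title. A minimal sketch of how the new ARRAY branch is expected to behave follows; the SchemaBuilder-based schema and the sample values are illustrative assumptions, not part of the patch:

    // Illustrative sketch (not from the patch): convert a primitive int[] under an ARRAY(INT32) schema.
    org.apache.kafka.connect.data.Schema arraySchema =
            org.apache.kafka.connect.data.SchemaBuilder.array(
                    org.apache.kafka.connect.data.Schema.INT32_SCHEMA).build();
    Object converted = org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData
            .getKafkaConnectData(new int[]{1, 2, 3}, arraySchema);
    // The ARRAY branch routes primitive arrays through arrayToList(), which walks the array with
    // java.lang.reflect.Array and converts each element against kafkaSchema.valueSchema(), so
    // 'converted' should be a java.util.List of Integers that
    // org.apache.kafka.connect.data.ConnectSchema.validateValue(arraySchema, converted) accepts.

Lists take the same path (each element converted against the value schema), and the jsonAsConnectData() changes above additionally map single characters and char[] values that arrive as JSON strings onto INT32.
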
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 23d9f1b5ce2..2c0ea31ac4c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,8 +22,17 @@ package org.apache.pulsar.io.kafka.connect;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -47,6 +56,8 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -56,11 +67,14 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;

+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.AbstractMap;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -538,6 +552,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
+        expected.put("byteField", 0);
+        expected.put("shortField", 0);
+        expected.put("intField", 0);
+        expected.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expected.put("floatField", 0.0d);
+        expected.put("doubleField", 0.0d);

         SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");

@@ -565,6 +586,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
+        expected.put("byteField", 0);
+        expected.put("shortField", 0);
+        expected.put("intField", 0);
+        expected.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expected.put("floatField", 0.0d);
+        expected.put("doubleField", 0.0d);

         SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");

@@ -615,6 +643,167 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         Assert.assertEquals(key, 11);
     }

+    @Test
+    public void connectDataComplexAvroSchemaGenericRecordTest() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+        final GenericData.Record key = getComplexStructRecord();
+        final GenericData.Record value = getComplexStructRecord();
+        KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+                getGenericRecord(value, pulsarAvroSchema));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
+
+        Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    @Test
+    public void connectDataPojoArrTest() throws Exception {
+        PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojo =
+                new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                };
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo);
+    }
+
+    @Test
+    public void connectDataPojoListTest() throws Exception {
+        List<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+                Lists.newArrayList(
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                );
+
+        /*
+            Need this because of (AFAICT)
+            https://issues.apache.org/jira/browse/AVRO-1183
+            https://github.com/apache/pulsar/issues/4851
+            to generate proper schema
+         */
+        PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojoForSchema =
+                new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                };
+
+        AvroSchema pulsarAvroSchema = AvroSchema.of(pojoForSchema.getClass());
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+    }
+
+    @Test
+    public void connectDataPojoMapTest() throws Exception {
+        Map<String, PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+                Maps.newHashMap();
+        pojo.put("key1", getPojoComplexStruct());
+        pojo.put("key2", getPojoComplexStruct());
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo);
+    }
+
+    @Test
+    public void connectDataPrimitivesTest() throws Exception {
+        testPojoAsAvroAndJsonConversionToConnectData("test");
+
+        testPojoAsAvroAndJsonConversionToConnectData('a');
+
+        testPojoAsAvroAndJsonConversionToConnectData(Byte.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Byte.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Short.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Short.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Integer.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Integer.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Long.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Long.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Float.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Float.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Double.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Double.MAX_VALUE);
+    }
+
+    @Test
+    public void connectDataPrimitiveArraysTest() throws Exception {
+        testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+    }
+
+    private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
+        AvroSchema pulsarAvroSchema = AvroSchema.of(pojo.getClass());
+        testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+    }
+
+    private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+        Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(pulsarAvroSchema);
+
+        Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+        Object jsonNode = pojoAsJsonNode(pojo);
+        connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    private JsonNode pojoAsJsonNode(Object pojo) {
+        ObjectMapper om = new ObjectMapper();
+        JsonNode json = om.valueToTree(pojo);
+        return json;
+    }
+
+    private Object pojoAsAvroRecord(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+        DatumWriter writer = new ReflectDatumWriter<>();
+
+        writer.setSchema(pulsarAvroSchema.getAvroSchema());
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Encoder enc = new EncoderFactory().directBinaryEncoder(out, null);
+        writer.write(pojo, enc);
+        enc.flush();
+        byte[] data = out.toByteArray();
+
+        DatumReader<GenericRecord> reader = new GenericDatumReader<>(pulsarAvroSchema.getAvroSchema());
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+        Object value = reader.read(null, decoder);
+        return value;
+    }
+
     @Test
     public void schemaKeyValueAvroSchemaTest() throws Exception {
         AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
                 = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
@@ -635,12 +824,26 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         expectedKey.put("field2", "key");
         // integer is coming back from ObjectMapper
         expectedKey.put("field3", 101);
+        expectedKey.put("byteField", 0);
+        expectedKey.put("shortField", 0);
+        expectedKey.put("intField", 0);
+        expectedKey.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expectedKey.put("floatField", 0.0d);
+        expectedKey.put("doubleField", 0.0d);

         Map<String, Object> expectedValue = new LinkedHashMap<>();
         expectedValue.put("field1", 10);
         expectedValue.put("field2", "value");
         // integer is coming back from ObjectMapper
         expectedValue.put("field3", 100);
+        expectedValue.put("byteField", 0);
+        expectedValue.put("shortField", 0);
+        expectedValue.put("intField", 0);
+        expectedValue.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expectedValue.put("floatField", 0.0d);
+        expectedValue.put("doubleField", 0.0d);

         KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
                 getGenericRecord(value, pulsarAvroSchema));
@@ -781,4 +984,100 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         sink.close();
     }

+    private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
+        return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
+                .setField1(1)
+                .setField2("field2")
+                .setField3(100L)
+                .setByteField((byte) 1)
+                .setShortField((short) 2)
+                .setIntField(3)
+                .setLongField(4)
+                .setFloatField(5.0f)
+                .setDoubleField(6.0d);
+    }
+
+    private static PulsarSchemaToKafkaSchemaTest.ComplexStruct getPojoComplexStruct() {
+        return new PulsarSchemaToKafkaSchemaTest.ComplexStruct()
+                .setStringList(Lists.newArrayList("str11", "str22"))
+                .setStructArr(new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations[]{getPojoStructWithAnnotations()})
+                .setStructList(Lists.newArrayList(getPojoStructWithAnnotations()))
+                .setStruct(getPojoStructWithAnnotations())
+                .setStructMap(Map.of("key1", getPojoStructWithAnnotations(),
+                        "key2", getPojoStructWithAnnotations()))
+
+                .setByteField((byte) 1)
+                .setShortField((short) 2)
+                .setIntField(3)
+                .setLongField(4)
+                .setFloatField(5.0f)
+                .setDoubleField(6.0d)
+                .setCharField('c')
+                .setStringField("some text")
+
+                .setByteArr(new byte[] {1 ,2})
+                .setShortArr(new short[] {3, 4})
+                .setIntArr(new int[] {5, 6})
+                .setLongArr(new long[] {7, 8})
+                .setFloatArr(new float[] {9.0f, 10.0f})
+                .setDoubleArr(new double[] {11.0d, 12.0d})
+                .setCharArr(new char[]{'a', 'b'})
+                .setStringArr(new String[] {"abc", "def"});
+    }
+
+    private static GenericData.Record getStructRecord() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+        rec.put("field1", 11);
+        rec.put("field2", "str99");
+        rec.put("field3", 101L);
+        rec.put("byteField", (byte) 1);
+        rec.put("shortField", (short) 2);
+        rec.put("intField", 3);
+        rec.put("longField", 4L);
+        rec.put("floatField", 5.0f);
+        rec.put("doubleField", 6.0d);
+
+        return rec;
+    }
+
+    private static GenericData.Record getComplexStructRecord() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+        final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+        rec.put("stringArr", new String[]{"str1", "str2"});
+        rec.put("stringList", Lists.newArrayList("str11", "str22"));
+        rec.put("structArr", new GenericData.Record[]{getStructRecord(), getStructRecord()});
+        rec.put("structList", Lists.newArrayList(getStructRecord(), getStructRecord()));
+
+        rec.put("struct", getStructRecord());
+        rec.put("byteField", (byte) 1);
+        rec.put("shortField", (short) 2);
+        rec.put("intField", 3);
+        rec.put("longField", 4L);
+        rec.put("floatField", 5.1f);
+        rec.put("doubleField", 6.1d);
+        rec.put("charField", 'c');
+        rec.put("stringField", "some string");
+        rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
+        rec.put("shortArr", new short[] {(short) 3, (short) 4});
+        rec.put("intArr", new int[] {5, 6});
+        rec.put("longArr", new long[] {7L, 8L});
+        rec.put("floatArr", new float[] {9.0f, 10.0f});
+        rec.put("doubleArr", new double[] {11.0d, 12.0d});
+        rec.put("charArr", new char[] {'a', 'b', 'c'});
+
+        Map<String, GenericData.Record> map = new HashMap<>();
+        map.put("key1", getStructRecord());
+        map.put("key2", getStructRecord());
+
+        rec.put("structMap", map);
+
+        return rec;
+    }
 }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..ecf0633f588 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.connect;

 import com.google.common.collect.Lists;
 import lombok.Data;
+import lombok.experimental.Accessors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;

 import java.math.BigInteger;
 import java.util.List;
+import java.util.Map;

 import static org.testng.Assert.assertEquals;

@@ -44,15 +46,90 @@ import static org.testng.Assert.assertEquals;
 @Slf4j
 public class PulsarSchemaToKafkaSchemaTest {

-    static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+    static final List<String> STRUCT_FIELDS = Lists.newArrayList(
+            "field1",
+            "field2",
+            "field3",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField"
+    );
+    static final List<String> COMPLEX_STRUCT_FIELDS = Lists.newArrayList(
+            "stringArr",
+            "stringList",
+            "structArr",
+            "structList",
+            "structMap",
+            "struct",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField",
+            "charField",
+            "stringField",
+            "byteArr",
+            "shortArr",
+            "intArr",
+            "longArr",
+            "floatArr",
+            "doubleArr",
+            "charArr"
+    );

     @Data
+    @Accessors(chain = true)
     static class StructWithAnnotations {
         int field1;
         @Nullable
         String field2;
-        @AvroDefault("\"1000\"")
+        @AvroDefault("1000")
         Long field3;
+
+        @AvroDefault("0")
+        byte byteField;
+        @AvroDefault("0")
+        short shortField;
+        @AvroDefault("0")
+        int intField;
+        @AvroDefault("0")
+        long longField;
+        @AvroDefault("0")
+        float floatField;
+        @AvroDefault("0")
+        double doubleField;
+    }
+
+    @Data
+    @Accessors(chain = true)
+    static class ComplexStruct {
+        List<String> stringList;
+        StructWithAnnotations[] structArr;
+        List<StructWithAnnotations> structList;
+        Map<String, StructWithAnnotations> structMap;
+        StructWithAnnotations struct;
+
+        byte byteField;
+        short shortField;
+        int intField;
+        long longField;
+        float floatField;
+        double doubleField;
+        char charField;
+        String stringField;
+
+        byte[] byteArr;
+        short[] shortArr;
+        int[] intArr;
+        long[] longArr;
+        float[] floatArr;
+        double[] doubleArr;
+        char[] charArr;
+        String[] stringArr;
     }

     @Test
@@ -153,6 +230,18 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }

+    @Test
+    public void avroComplexSchemaTest() {
+        AvroSchema<ComplexStruct> pulsarAvroSchema = AvroSchema.of(ComplexStruct.class);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
     @Test
     public void jsonSchemaTest() {
         JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
@@ -169,6 +258,22 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }

+    @Test
+    public void jsonComplexSchemaTest() {
+        JSONSchema<ComplexStruct> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<ComplexStruct>builder()
+                        .withPojo(ComplexStruct.class)
+                        .withAlwaysAllowNull(false)
+                        .build());
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
     @Test
     public void castToKafkaSchemaTest() {
         assertEquals(Byte.class,
