This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2b16111b332468929314592494832118734b7c61 Author: Andrey Yegorov <[email protected]> AuthorDate: Mon May 16 07:05:05 2022 -0700 fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598) (cherry picked from commit f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a) --- .../io/kafka/connect/schema/KafkaConnectData.java | 77 ++++++++++- .../io/kafka/connect/KafkaConnectSinkTest.java | 141 +++++++++++++++++---- .../connect/PulsarSchemaToKafkaSchemaTest.java | 33 +++++ 3 files changed, 218 insertions(+), 33 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java index b1a370ddde4..e39ce086a53 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java @@ -27,22 +27,20 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +@Slf4j public class KafkaConnectData { public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) { if (kafkaSchema == null) { return nativeObject; } - if (nativeObject == null) { - return defaultOrThrow(kafkaSchema); - } - if (nativeObject instanceof JsonNode) { JsonNode node = (JsonNode) nativeObject; return jsonAsConnectData(node, kafkaSchema); @@ -51,6 +49,73 @@ public class KafkaConnectData { return avroAsConnectData(avroRecord, kafkaSchema); } + return castToKafkaSchema(nativeObject, kafkaSchema); + } + + public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) { + if (nativeObject == null) { + return defaultOrThrow(kafkaSchema); + } + + if (nativeObject instanceof Number) { + // This is needed in case + // jackson decided to fit value into some other type internally + // (e.g. Double instead of Float). + // Kafka's ConnectSchema expects exact type + // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71 + Number num = (Number) nativeObject; + switch (kafkaSchema.type()) { + case INT8: + if (!(nativeObject instanceof Byte)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass()); + } + return num.byteValue(); + } + break; + case INT16: + if (!(nativeObject instanceof Short)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Short", nativeObject.getClass()); + } + return num.shortValue(); + } + break; + case INT32: + if (!(nativeObject instanceof Integer)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass()); + } + return num.intValue(); + } + break; + case INT64: + if (!(nativeObject instanceof Long)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Long", nativeObject.getClass()); + } + return num.longValue(); + } + break; + case FLOAT32: + if (!(nativeObject instanceof Float)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Float", nativeObject.getClass()); + } + return num.floatValue(); + } + break; + case FLOAT64: + if (!(nativeObject instanceof Double)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Double", nativeObject.getClass()); + } + return num.doubleValue(); + } + break; + } + } + return nativeObject; } @@ -86,9 +151,9 @@ public class KafkaConnectData { case BOOLEAN: return jsonNode.booleanValue(); case NUMBER: - jsonNode.doubleValue(); + return jsonNode.doubleValue(); case STRING: - jsonNode.textValue(); + return jsonNode.textValue(); default: throw new DataException("Don't know how to convert " + jsonNode + " to Connect data (schema is null)."); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 1fba098a228..14c7dcd7ef8 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -336,8 +336,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { assertEquals(result.get("keySchema"), expectedKeySchema); assertEquals(result.get("valueSchema"), expectedSchema); - SinkRecord sinkRecord = sink.toSinkRecord(record); - return sinkRecord; + if (schema.getSchemaInfo().getType().isPrimitive()) { + // to test cast of primitive values + Message msgOut = mock(MessageImpl.class); + when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), schema)); + when(msgOut.getKey()).thenReturn(result.get("key").toString()); + when(msgOut.hasKey()).thenReturn(true); + when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + Record<GenericObject> recordOut = PulsarRecord.<String>builder() + .topicName("fake-topic") + .message(msgOut) + .schema(schema) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + SinkRecord sinkRecord = sink.toSinkRecord(recordOut); + return sinkRecord; + } else { + SinkRecord sinkRecord = sink.toSinkRecord(record); + return sinkRecord; + } } private GenericRecord getGenericRecord(Object value, Schema schema) { @@ -353,71 +373,135 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { return rec; } + + @Test + public void genericRecordCastTest() throws Exception { + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema + = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class); + + final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema()); + // schema type INT32 + obj.put("field1", (byte)10); + // schema type STRING + obj.put("field2", "test"); + // schema type INT64 + obj.put("field3", (short)100); + + final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema); + Message msg = mock(MessageImpl.class); + when(msg.getValue()).thenReturn(rec); + when(msg.getKey()).thenReturn("key"); + when(msg.hasKey()).thenReturn(true); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + final AtomicInteger status = new AtomicInteger(0); + Record<GenericObject> record = PulsarRecord.<String>builder() + .topicName("fake-topic") + .message(msg) + .schema(pulsarAvroSchema) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + SinkRecord sinkRecord = sink.toSinkRecord(record); + + Struct out = (Struct) sinkRecord.value(); + Assert.assertEquals(out.get("field1").getClass(), Integer.class); + Assert.assertEquals(out.get("field2").getClass(), String.class); + Assert.assertEquals(out.get("field3").getClass(), Long.class); + + Assert.assertEquals(out.get("field1"), 10); + Assert.assertEquals(out.get("field2"), "test"); + Assert.assertEquals(out.get("field3"), 100L); + + sink.close(); + } + @Test public void bytesRecordSchemaTest() throws Exception { byte[] in = "val".getBytes(StandardCharsets.US_ASCII); SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES"); - byte[] out = (byte[]) sinkRecord.value(); - Assert.assertEquals(out, in); + // test/mock writes it as string + Assert.assertEquals(sinkRecord.value(), "val"); } @Test public void stringRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING"); - String out = (String) sinkRecord.value(); - Assert.assertEquals(out, "val"); + Assert.assertEquals(sinkRecord.value().getClass(), String.class); + Assert.assertEquals(sinkRecord.value(), "val"); } @Test public void booleanRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN"); - boolean out = (boolean) sinkRecord.value(); - Assert.assertEquals(out, true); + Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class); + Assert.assertEquals(sinkRecord.value(), true); } @Test public void byteRecordSchemaTest() throws Exception { // int 1 is coming back from ObjectMapper SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8"); - byte out = (byte) sinkRecord.value(); - Assert.assertEquals(out, 1); + Assert.assertEquals(sinkRecord.value().getClass(), Byte.class); + Assert.assertEquals(sinkRecord.value(), (byte)1); } @Test public void shortRecordSchemaTest() throws Exception { // int 1 is coming back from ObjectMapper SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16"); - short out = (short) sinkRecord.value(); - Assert.assertEquals(out, 1); + Assert.assertEquals(sinkRecord.value().getClass(), Short.class); + Assert.assertEquals(sinkRecord.value(), (short)1); } @Test public void integerRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32"); - int out = (int) sinkRecord.value(); - Assert.assertEquals(out, Integer.MAX_VALUE); + Assert.assertEquals(sinkRecord.value().getClass(), Integer.class); + Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE); } @Test public void longRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64"); - long out = (long) sinkRecord.value(); - Assert.assertEquals(out, Long.MAX_VALUE); + Assert.assertEquals(sinkRecord.value().getClass(), Long.class); + Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE); + } + + @Test + public void longRecordSchemaTestCast() throws Exception { + // int 1 is coming from ObjectMapper, expect Long (as in schema) from sinkRecord + SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64"); + Assert.assertEquals(sinkRecord.value().getClass(), Long.class); + Assert.assertEquals(sinkRecord.value(), 1L); } @Test public void floatRecordSchemaTest() throws Exception { - // 1.0d is coming back from ObjectMapper + // 1.0d is coming back from ObjectMapper, expect Float (as in schema) from sinkRecord SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32"); - float out = (float) sinkRecord.value(); - Assert.assertEquals(out, 1.0d); + Assert.assertEquals(sinkRecord.value().getClass(), Float.class); + Assert.assertEquals(sinkRecord.value(), 1.0f); } @Test public void doubleRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64"); - double out = (double) sinkRecord.value(); - Assert.assertEquals(out, Double.MAX_VALUE); + Assert.assertEquals(sinkRecord.value().getClass(), Double.class); + Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE); + } + + @Test + public void doubleRecordSchemaTestCast() throws Exception { + SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, "FLOAT64"); + Assert.assertEquals(sinkRecord.value().getClass(), Double.class); + Assert.assertEquals(sinkRecord.value(), 1.0d); } @Test @@ -444,9 +528,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT"); Struct out = (Struct) sinkRecord.value(); - Assert.assertEquals((int)out.get("field1"), 10); - Assert.assertEquals((String)out.get("field2"), "test"); - Assert.assertEquals((long)out.get("field3"), 100L); + Assert.assertEquals(out.get("field1").getClass(), Integer.class); + Assert.assertEquals(out.get("field1"), 10); + Assert.assertEquals(out.get("field2").getClass(), String.class); + Assert.assertEquals(out.get("field2"), "test"); + Assert.assertEquals(out.get("field3").getClass(), Long.class); + Assert.assertEquals(out.get("field3"), 100L); } @Test @@ -468,9 +555,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT"); Struct out = (Struct) sinkRecord.value(); - Assert.assertEquals((int)out.get("field1"), 10); - Assert.assertEquals((String)out.get("field2"), "test"); - Assert.assertEquals((long)out.get("field3"), 100L); + Assert.assertEquals(out.get("field1"), 10); + Assert.assertEquals(out.get("field2"), "test"); + Assert.assertEquals(out.get("field3"), 100L); } @Test diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java index 9075dd9c3d3..60caa2bbe81 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java @@ -29,9 +29,11 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData; import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema; import org.testng.annotations.Test; +import java.math.BigInteger; import java.util.List; import static org.testng.Assert.assertEquals; @@ -167,6 +169,37 @@ public class PulsarSchemaToKafkaSchemaTest { } } + @Test + public void castToKafkaSchemaTest() { + assertEquals(Byte.class, + KafkaConnectData.castToKafkaSchema(100L, + org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass()); + + assertEquals(Short.class, + KafkaConnectData.castToKafkaSchema(100.0d, + org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass()); + + assertEquals(Integer.class, + KafkaConnectData.castToKafkaSchema((byte)5, + org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass()); + + assertEquals(Long.class, + KafkaConnectData.castToKafkaSchema((short)5, + org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass()); + + assertEquals(Float.class, + KafkaConnectData.castToKafkaSchema(1.0d, + org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass()); + + assertEquals(Double.class, + KafkaConnectData.castToKafkaSchema(1.5f, + org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass()); + + assertEquals(Double.class, + KafkaConnectData.castToKafkaSchema(new BigInteger("100"), + org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass()); + } + @Test public void dateSchemaTest() { org.apache.kafka.connect.data.Schema kafkaSchema =
