This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new fe68a8e872f  KCA: handle kafka's logical schemas (#16485)
fe68a8e872f is described below

commit fe68a8e872ff39369d5d401c8fc68da866c9b2dd
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Wed Aug 31 23:40:03 2022 -0700

    KCA: handle kafka's logical schemas (#16485)
---
 .../io/kafka/connect/schema/KafkaConnectData.java  |  58 ++++++
 .../connect/schema/PulsarSchemaToKafkaSchema.java  |  82 ++++++--
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 219 ++++++++++++++++-----
 3 files changed, 290 insertions(+), 69 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 671495c6df6..557cfbb9dd8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -30,9 +30,13 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 
@@ -122,6 +126,34 @@ public class KafkaConnectData {
     }
 
     public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+        // special case for a few classes defined in org.apache.kafka.connect.data
+        // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
+        if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
+            if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.util.Date) {
+                    return nativeObject;
+                }
+                return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue());
+            } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.util.Date) {
+                    return nativeObject;
+                }
+                return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
+            } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.util.Date) {
+                    return nativeObject;
+                }
+                return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
+            } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.math.BigDecimal) {
+                    return nativeObject;
+                }
+                return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject);
+            }
+            throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
+                    + " for value " + nativeObject);
+        }
+
         if (nativeObject instanceof Number) {
             // This is needed in case
             // jackson decided to fit value into some other type internally
@@ -242,6 +274,32 @@ public class KafkaConnectData {
             return defaultOrThrow(kafkaSchema);
         }
 
+        // special case for a few classes defined in org.apache.kafka.connect.data
+        // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
+        // time/date as String not supported as the format to parse is not clear
+        // (add it as a config param?)
+        if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
+            if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                return Timestamp.toLogical(kafkaSchema, jsonNode.longValue());
+            } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                return Date.toLogical(kafkaSchema, jsonNode.intValue());
+            } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                return Time.toLogical(kafkaSchema, jsonNode.intValue());
+            } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (jsonNode.isNumber()) {
+                    return jsonNode.decimalValue();
+                }
+                try {
+                    return Decimal.toLogical(kafkaSchema, jsonNode.binaryValue());
+                } catch (IOException e) {
+                    throw new IllegalStateException("Could not convert Kafka Logical Schema " + kafkaSchema.name()
+                            + " for jsonNode " + jsonNode + " into Decimal");
+                }
+            }
+            throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
+                    + " for jsonNode " + jsonNode);
+        }
+
         switch (kafkaSchema.type()) {
             case INT8:
                 Preconditions.checkArgument(jsonNode.isNumber());
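
For orientation, here is a minimal, self-contained sketch (not part of the commit;
the class name and literal values are illustrative only) of the conversions the new
castToKafkaSchema branch performs with Kafka Connect's logical-type helpers, all of
which are public API used directly by the patch:

    import java.math.BigDecimal;
    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Timestamp;

    public class LogicalCastSketch {
        public static void main(String[] args) {
            // An INT64 value arriving for a Timestamp-typed field is turned back
            // into the java.util.Date that ConnectSchema.validateValue expects.
            long epochMillis = 1661990403000L;
            java.util.Date asDate = Timestamp.toLogical(Timestamp.SCHEMA, epochMillis);
            System.out.println(asDate);

            // A Decimal arrives as the unscaled value's big-endian bytes; the scale
            // is carried by the schema itself (Decimal.schema(scale)), not the payload.
            Schema decimalSchema = Decimal.schema(2);
            byte[] unscaled = new BigDecimal("123.45").unscaledValue().toByteArray();
            System.out.println(Decimal.toLogical(decimalSchema, unscaled)); // 123.45
        }
    }
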
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index 72d68610bdb..d1834e2f9dd 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ExecutionError;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.charset.StandardCharsets;
@@ -29,8 +30,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
@@ -38,6 +43,7 @@ import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 
 @Slf4j
 public class PulsarSchemaToKafkaSchema {
     private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+    private static final ImmutableSet<String> kafkaLogicalSchemas;
     private static final AvroData avroData = new AvroData(1000);
     private static final Cache<byte[], Schema> schemaCache = CacheBuilder.newBuilder().maximumSize(10000)
@@ -56,6 +62,16 @@ public class PulsarSchemaToKafkaSchema {
                 .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
                 .put(SchemaType.DATE, Date.SCHEMA)
                 .build();
+        kafkaLogicalSchemas = ImmutableSet.<String>builder()
+                .add(Timestamp.LOGICAL_NAME)
+                .add(Date.LOGICAL_NAME)
+                .add(Time.LOGICAL_NAME)
+                .add(Decimal.LOGICAL_NAME)
+                .build();
+    }
+
+    public static boolean matchesToKafkaLogicalSchema(Schema kafkaSchema) {
+        return kafkaLogicalSchemas.contains(kafkaSchema.name());
     }
 
     // Parse json to shaded schema
@@ -67,30 +83,58 @@ public class PulsarSchemaToKafkaSchema {
     }
 
     public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
-        if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null) {
-            if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
-                return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
-            }
+        if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
+            throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+        }
 
-            try {
-                return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
-                    if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-                        KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
-                        return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
-                                getKafkaConnectSchema(kvSchema.getValueSchema()))
-                                .build();
+        String logicalSchemaName = pulsarSchema.getSchemaInfo().getName();
+        if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
+            if (Timestamp.LOGICAL_NAME.equals(logicalSchemaName)) {
+                return Timestamp.SCHEMA;
+            } else if (Date.LOGICAL_NAME.equals(logicalSchemaName)) {
+                return Date.SCHEMA;
+            } else if (Time.LOGICAL_NAME.equals(logicalSchemaName)) {
+                return Time.SCHEMA;
+            } else if (Decimal.LOGICAL_NAME.equals(logicalSchemaName)) {
+                String scaleString = null;
+                final int scale;
+                if (pulsarSchema.getSchemaInfo().getProperties() != null) {
+                    scaleString = pulsarSchema.getSchemaInfo().getProperties().get("scale");
+                }
+                if (scaleString == null) {
+                    throw new DataException("Invalid Decimal schema: scale parameter not found.");
+                } else {
+                    try {
+                        scale = Integer.parseInt(scaleString);
+                    } catch (NumberFormatException nfe) {
+                        throw new DataException("Invalid scale parameter found in Decimal schema: ", nfe);
                     }
-                    org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
-                            parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
-                                    StandardCharsets.UTF_8));
-                    return avroData.toConnectSchema(avroSchema);
-                });
-            } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
-                throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+                }
+                return Decimal.schema(scale);
             }
+            throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
+        }
+
+        if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
+            return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
         }
-        throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+
+        try {
+            return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
+                if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+                    KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
+                    return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
+                            getKafkaConnectSchema(kvSchema.getValueSchema()))
+                            .build();
+                }
+                org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
+                        parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
+                                StandardCharsets.UTF_8));
+                return avroData.toConnectSchema(avroSchema);
+            });
+        } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+            throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+        }
     }
 
     private static IllegalStateException logAndThrowOnUnsupportedSchema(
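
The Decimal mapping above is the only one that needs a schema parameter: the adapter
reads the scale from the Pulsar SchemaInfo properties and rejects a missing or
malformed value with DataException. A hedged sketch of that logic in isolation (the
standalone class and helper name decimalSchemaFrom are illustrative; Map.of assumes
Java 9+):

    import java.util.Map;
    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.errors.DataException;

    public class DecimalScaleSketch {
        // Mirrors the new logic: read "scale" from the SchemaInfo properties,
        // then build the parameterized Connect schema with Decimal.schema(scale).
        static Schema decimalSchemaFrom(Map<String, String> schemaInfoProperties) {
            String scaleString = schemaInfoProperties == null ? null : schemaInfoProperties.get("scale");
            if (scaleString == null) {
                throw new DataException("Invalid Decimal schema: scale parameter not found.");
            }
            try {
                return Decimal.schema(Integer.parseInt(scaleString));
            } catch (NumberFormatException nfe) {
                throw new DataException("Invalid scale parameter found in Decimal schema: ", nfe);
            }
        }

        public static void main(String[] args) {
            Schema s = decimalSchemaFrom(Map.of("scale", "10"));
            // Prints the logical name and the scale parameter stored on the schema.
            System.out.println(s.name() + ", scale=" + s.parameters().get(Decimal.SCALE_FIELD));
        }
    }
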
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 7661e5fc98d..4f3996c8fde 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -35,7 +36,11 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -52,9 +57,12 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.io.core.SinkContext;
@@ -71,10 +79,13 @@ import org.testng.collections.Maps;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
 import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -83,6 +94,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -104,10 +116,35 @@ import static org.testng.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 @Slf4j
-public class KafkaConnectSinkTest extends ProducerConsumerBase {
+public class KafkaConnectSinkTest extends ProducerConsumerBase {
+
+    public class TestSchema implements Schema<byte[]>, Serializable {
+
+        private SchemaInfo schemaInfo;
+
+        public TestSchema(SchemaInfo schemaInfo) {
+            this.schemaInfo = schemaInfo;
+        }
+
+        @Override
+        public byte[] encode(byte[] data) {
+            return data;
+        }
+
+        @Override
+        public SchemaInfo getSchemaInfo() {
+            return schemaInfo;
+        }
+
+        @Override
+        public Schema<byte[]> clone() {
+            return null;
+        }
+    }
 
     public class ResultCaptor<T> implements Answer {
         private T result = null;
+
         public T getResult() {
             return result;
         }
@@ -119,7 +156,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         }
     }
 
-    private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
+    private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
 
     private Path file;
     private Map<String, Object> props;
@@ -319,11 +356,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
     }
 
     private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
-        return recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
+        return recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
     }
 
     private SinkRecord recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
-                                        Object expected, String expectedSchema) throws Exception {
+                                        Object expected, String expectedSchema) throws Exception {
         props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
 
         KafkaConnectSink sink = new KafkaConnectSink();
@@ -354,7 +391,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
         List<String> lines = Files.readAllLines(file, StandardCharsets.US_ASCII);
         ObjectMapper om = new ObjectMapper();
-        Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});
+        Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>() {
+        });
 
         assertEquals(expectedKey, result.get("key"));
         assertEquals(expected, result.get("value"));
@@ -398,12 +436,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                     .map(f -> new Field(f.name(), f.pos()))
                     .collect(Collectors.toList());
 
-            return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
+            return new GenericAvroRecord(new byte[]{1}, avroSchema, fields, avroRecord);
         } else {
             rec = MockGenericObjectWrapper.builder()
                     .nativeObject(value)
                     .schemaType(schema != null ? schema.getSchemaInfo().getType() : null)
-                    .schemaVersion(new byte[]{ 1 }).build();
+                    .schemaVersion(new byte[]{1}).build();
         }
         return rec;
     }
@@ -421,11 +459,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
         final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
         // schema type INT32
-        obj.put("field1", (byte)10);
+        obj.put("field1", (byte) 10);
         // schema type STRING
         obj.put("field2", "test");
         // schema type INT64
-        obj.put("field3", (short)100);
+        obj.put("field3", (short) 100);
 
         final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
         Message msg = mock(MessageImpl.class);
@@ -482,17 +520,17 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+        SinkRecord sinkRecord = recordSchemaTest((byte) 1, Schema.INT8, 1, "INT8");
         Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
-        Assert.assertEquals(sinkRecord.value(), (byte)1);
+        Assert.assertEquals(sinkRecord.value(), (byte) 1);
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+        SinkRecord sinkRecord = recordSchemaTest((short) 1, Schema.INT16, 1, "INT16");
         Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
-        Assert.assertEquals(sinkRecord.value(), (short)1);
+        Assert.assertEquals(sinkRecord.value(), (short) 1);
     }
 
     @Test
@@ -650,6 +688,77 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         Assert.assertEquals(key, 11);
     }
 
+    @Test
+    public void kafkaLogicalTypesTimestampTest() {
+        Schema schema = new TestSchema(new SchemaInfoImpl()
+                .setName(Timestamp.LOGICAL_NAME)
+                .setType(SchemaType.INT64)
+                .setSchema(new byte[0]));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(schema);
+
+        java.util.Date date = getDateFromString("12/30/1999 11:12:13");
+        Object connectData = KafkaConnectData
+                .getKafkaConnectData(Timestamp.fromLogical(kafkaSchema, date), kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    @Test
+    public void kafkaLogicalTypesTimeTest() {
+        Schema schema = new TestSchema(new SchemaInfoImpl()
+                .setName(Time.LOGICAL_NAME)
+                .setType(SchemaType.INT32)
+                .setSchema(new byte[0]));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(schema);
+
+        java.util.Date date = getDateFromString("01/01/1970 11:12:13");
+        Object connectData = KafkaConnectData
+                .getKafkaConnectData(Time.fromLogical(kafkaSchema, date), kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    @Test
+    public void kafkaLogicalTypesDateTest() {
+        Schema schema = new TestSchema(new SchemaInfoImpl()
+                .setName(Date.LOGICAL_NAME)
+                .setType(SchemaType.INT32)
+                .setSchema(new byte[0]));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(schema);
+
+        java.util.Date date = getDateFromString("12/31/2022 00:00:00");
+        Object connectData = KafkaConnectData
+                .getKafkaConnectData(Date.fromLogical(kafkaSchema, date), kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    @Test
+    public void kafkaLogicalTypesDecimalTest() {
+        Map<String, String> props = new HashMap<>();
+        props.put("scale", "10");
+        Schema schema = new TestSchema(new SchemaInfoImpl()
+                .setName(Decimal.LOGICAL_NAME)
+                .setType(SchemaType.BYTES)
+                .setProperties(props)
+                .setSchema(new byte[0]));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(schema);
+
+        Object connectData = KafkaConnectData
+                .getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
     @Test
     public void connectDataComplexAvroSchemaGenericRecordTest() {
         AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
@@ -744,28 +853,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
     @Test
     public void connectDataPrimitiveArraysTest() throws Exception {
-        testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+        testPojoAsAvroAndJsonConversionToConnectData(new String[]{"test", "test2"});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
-        testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new char[]{'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new Character[]{'a', 'b', 'c'});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new byte[]{Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Byte[]{Byte.MIN_VALUE, Byte.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new short[]{Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Short[]{Short.MIN_VALUE, Short.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new int[]{Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Integer[]{Integer.MIN_VALUE, Integer.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new long[]{Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Long[]{Long.MIN_VALUE, Long.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new float[]{Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Float[]{Float.MIN_VALUE, Float.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new double[]{Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Double[]{Double.MIN_VALUE, Double.MAX_VALUE});
     }
 
     private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
@@ -853,20 +962,20 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         expectedValue.put("doubleField", 0.0d);
 
         KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
-                getGenericRecord(value, pulsarAvroSchema));
+                getGenericRecord(value, pulsarAvroSchema));
         SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
                 expectedKey, "STRUCT", expectedValue, "STRUCT");
 
         Struct outValue = (Struct) sinkRecord.value();
-        Assert.assertEquals((int)outValue.get("field1"), 10);
-        Assert.assertEquals((String)outValue.get("field2"), "value");
-        Assert.assertEquals((long)outValue.get("field3"), 100L);
+        Assert.assertEquals((int) outValue.get("field1"), 10);
+        Assert.assertEquals((String) outValue.get("field2"), "value");
+        Assert.assertEquals((long) outValue.get("field3"), 100L);
 
         Struct outKey = (Struct) sinkRecord.key();
-        Assert.assertEquals((int)outKey.get("field1"), 11);
-        Assert.assertEquals((String)outKey.get("field2"), "key");
-        Assert.assertEquals((long)outKey.get("field3"), 101L);
+        Assert.assertEquals((int) outKey.get("field1"), 11);
+        Assert.assertEquals((String) outKey.get("field2"), "key");
+        Assert.assertEquals((long) outKey.get("field3"), 101L);
     }
 
     @Test
@@ -1186,7 +1295,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
             when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
             when(msg.hasIndex()).thenReturn(false);
 
-            final int partition = (int)(i % numPartitions);
+            final int partition = (int) (i % numPartitions);
             final AtomicInteger status = new AtomicInteger(0);
             Record<GenericObject> record = PulsarRecord.<String>builder()
                     .topicName(topicName)
@@ -1257,14 +1366,14 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                 .setCharField('c')
                 .setStringField("some text")
 
-                .setByteArr(new byte[] {1 ,2})
-                .setShortArr(new short[] {3, 4})
-                .setIntArr(new int[] {5, 6})
-                .setLongArr(new long[] {7, 8})
-                .setFloatArr(new float[] {9.0f, 10.0f})
-                .setDoubleArr(new double[] {11.0d, 12.0d})
+                .setByteArr(new byte[]{1, 2})
+                .setShortArr(new short[]{3, 4})
+                .setIntArr(new int[]{5, 6})
+                .setLongArr(new long[]{7, 8})
+                .setFloatArr(new float[]{9.0f, 10.0f})
+                .setDoubleArr(new double[]{11.0d, 12.0d})
                 .setCharArr(new char[]{'a', 'b'})
-                .setStringArr(new String[] {"abc", "def"});
+                .setStringArr(new String[]{"abc", "def"});
     }
 
     private static GenericData.Record getStructRecord() {
@@ -1306,13 +1415,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         rec.put("doubleField", 6.1d);
         rec.put("charField", 'c');
         rec.put("stringField", "some string");
-        rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
-        rec.put("shortArr", new short[] {(short) 3, (short) 4});
-        rec.put("intArr", new int[] {5, 6});
-        rec.put("longArr", new long[] {7L, 8L});
-        rec.put("floatArr", new float[] {9.0f, 10.0f});
-        rec.put("doubleArr", new double[] {11.0d, 12.0d});
-        rec.put("charArr", new char[] {'a', 'b', 'c'});
+        rec.put("byteArr", new byte[]{(byte) 1, (byte) 2});
+        rec.put("shortArr", new short[]{(short) 3, (short) 4});
+        rec.put("intArr", new int[]{5, 6});
+        rec.put("longArr", new long[]{7L, 8L});
+        rec.put("floatArr", new float[]{9.0f, 10.0f});
+        rec.put("doubleArr", new double[]{11.0d, 12.0d});
+        rec.put("charArr", new char[]{'a', 'b', 'c'});
 
         Map<String, GenericData.Record> map = new HashMap<>();
         map.put("key1", getStructRecord());
@@ -1322,4 +1431,14 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
         return rec;
     }
+
+    @SneakyThrows
+    private java.util.Date getDateFromString(String dateInString) {
+        SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");
+        formatter.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        java.util.Date parsedDate = formatter.parse(dateInString);
+        return parsedDate;
+    }
+
 }
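
As a closing illustration of what the new tests assert (the class name and values
here are made up, not part of the commit): a logical value serialized with
fromLogical survives the round trip back through toLogical and passes
ConnectSchema.validateValue, which is exactly the path the sink exercises:

    import java.util.Date;
    import org.apache.kafka.connect.data.ConnectSchema;
    import org.apache.kafka.connect.data.Timestamp;

    public class RoundTripSketch {
        public static void main(String[] args) {
            Date original = new Date();
            // Wire form: epoch millis, as a Pulsar INT64-backed Timestamp would arrive.
            long wire = Timestamp.fromLogical(Timestamp.SCHEMA, original);
            // Back to the logical Java type, then validate against the Connect schema;
            // validateValue throws DataException on a type mismatch.
            Date roundTripped = Timestamp.toLogical(Timestamp.SCHEMA, wire);
            ConnectSchema.validateValue(Timestamp.SCHEMA, roundTripped);
            System.out.println(original.equals(roundTripped)); // true
        }
    }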