This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 515a9bfd46ec0f2ffa3bb3d2d9cd646575574565 Author: Andrey Yegorov <[email protected]> AuthorDate: Thu Apr 14 08:55:23 2022 -0700 [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025) (cherry picked from commit d76b5d40da2c9055102b3ecf3e5f6b358ac52732) --- .../pulsar/io/kafka/connect/KafkaConnectSink.java | 18 ++- .../io/kafka/connect/schema/KafkaConnectData.java | 20 +++ .../io/kafka/connect/KafkaConnectSinkTest.java | 136 +++++++++++++++++++-- .../kafka/connect/SchemaedFileStreamSinkTask.java | 47 ++++--- 4 files changed, 193 insertions(+), 28 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 502154065d9..31f7cbf6399 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -51,9 +51,9 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.KeyValueSchema; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData; @@ -255,11 +255,21 @@ public class KafkaConnectSink implements Sink<GenericObject> { && sourceRecord.getSchema().getSchemaInfo() != null && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) { KeyValueSchema kvSchema = (KeyValueSchema) 
sourceRecord.getSchema(); - KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject(); keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema()); valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema()); - key = kv.getKey(); - value = kv.getValue(); + + Object nativeObject = sourceRecord.getValue().getNativeObject(); + + if (nativeObject instanceof KeyValue) { + KeyValue kv = (KeyValue) nativeObject; + key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema); + value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema); + } else if (nativeObject != null) { + throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass()); + } else { + key = null; + value = null; + } } else { if (sourceRecord.getMessage().get().hasBase64EncodedKey()) { key = sourceRecord.getMessage().get().getKeyBytes(); diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java index e39ce086a53..8374dd24bf7 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.pulsar.client.api.schema.GenericRecord; @Slf4j public class KafkaConnectData { @@ -47,6 +48,10 @@ public class KafkaConnectData { } else if (nativeObject instanceof GenericData.Record) { GenericData.Record avroRecord = (GenericData.Record) nativeObject; return avroAsConnectData(avroRecord, kafkaSchema); + } else if (nativeObject instanceof 
GenericRecord) { + // Pulsar's GenericRecord + GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject; + return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema); } return castToKafkaSchema(nativeObject, kafkaSchema); @@ -134,6 +139,21 @@ public class KafkaConnectData { return struct; } + static Object pulsarGenericRecordAsConnectData(GenericRecord genericRecord, Schema kafkaSchema) { + if (kafkaSchema == null) { + if (genericRecord == null) { + return null; + } + throw new DataException("Don't know how to convert " + genericRecord + " to Connect data (schema is null)."); + } + + Struct struct = new Struct(kafkaSchema); + for (Field field : kafkaSchema.fields()) { + struct.put(field, getKafkaConnectData(genericRecord.getField(field.name()), field.schema())); + } + return struct; + } + // with some help of // https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) { diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 14c7dcd7ef8..23d9f1b5ce2 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; @@ -40,10 
+41,11 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; import org.apache.pulsar.client.util.MessageIdUtils; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; -import org.apache.pulsar.io.core.KeyValue; import org.apache.pulsar.io.core.SinkContext; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -58,12 +60,14 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.AbstractMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -331,10 +335,10 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { ObjectMapper om = new ObjectMapper(); Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){}); - assertEquals(result.get("key"), expectedKey); - assertEquals(result.get("value"), expected); - assertEquals(result.get("keySchema"), expectedKeySchema); - assertEquals(result.get("valueSchema"), expectedSchema); + assertEquals(expectedKey, result.get("key")); + assertEquals(expected, result.get("value")); + assertEquals(expectedKeySchema, result.get("keySchema")); + assertEquals(expectedSchema, result.get("valueSchema")); if (schema.getSchemaInfo().getType().isPrimitive()) { // to test cast of primitive values @@ -362,8 +366,18 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { 
private GenericRecord getGenericRecord(Object value, Schema schema) { final GenericRecord rec; - if(value instanceof GenericRecord) { + if (value instanceof GenericRecord) { rec = (GenericRecord) value; + } else if (value instanceof org.apache.avro.generic.GenericRecord) { + org.apache.avro.generic.GenericRecord avroRecord = + (org.apache.avro.generic.GenericRecord) value; + org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get(); + List<Field> fields = avroSchema.getFields() + .stream() + .map(f -> new Field(f.name(), f.pos())) + .collect(Collectors.toList()); + + return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord); } else { rec = MockGenericObjectWrapper.builder() .nativeObject(value) @@ -592,7 +606,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { } @Test - public void KeyValueSchemaTest() throws Exception { + public void schemaKeyValueSchemaTest() throws Exception { KeyValue<Integer, String> kv = new KeyValue<>(11, "value"); SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING"); String val = (String) sinkRecord.value(); @@ -601,6 +615,114 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { Assert.assertEquals(key, 11); } + @Test + public void schemaKeyValueAvroSchemaTest() throws Exception { + AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema + = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class); + + final GenericData.Record key = new GenericData.Record(pulsarAvroSchema.getAvroSchema()); + key.put("field1", 11); + key.put("field2", "key"); + key.put("field3", 101L); + + final GenericData.Record value = new GenericData.Record(pulsarAvroSchema.getAvroSchema()); + value.put("field1", 10); + value.put("field2", "value"); + value.put("field3", 100L); + + Map<String, Object> expectedKey = new LinkedHashMap<>(); + expectedKey.put("field1", 11); + 
expectedKey.put("field2", "key"); + // integer is coming back from ObjectMapper + expectedKey.put("field3", 101); + + Map<String, Object> expectedValue = new LinkedHashMap<>(); + expectedValue.put("field1", 10); + expectedValue.put("field2", "value"); + // integer is coming back from ObjectMapper + expectedValue.put("field3", 100); + + KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema), + getGenericRecord(value, pulsarAvroSchema)); + + SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema), + expectedKey, "STRUCT", expectedValue, "STRUCT"); + + Struct outValue = (Struct) sinkRecord.value(); + Assert.assertEquals((int)outValue.get("field1"), 10); + Assert.assertEquals((String)outValue.get("field2"), "value"); + Assert.assertEquals((long)outValue.get("field3"), 100L); + + Struct outKey = (Struct) sinkRecord.key(); + Assert.assertEquals((int)outKey.get("field1"), 11); + Assert.assertEquals((String)outKey.get("field2"), "key"); + Assert.assertEquals((long)outKey.get("field3"), 101L); + } + + @Test + public void nullKeyValueSchemaTest() throws Exception { + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + Message msg = mock(MessageImpl.class); + // value is null + when(msg.getValue()).thenReturn(null); + when(msg.getKey()).thenReturn("key"); + when(msg.hasKey()).thenReturn(true); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + final AtomicInteger status = new AtomicInteger(0); + Record<GenericObject> record = PulsarRecord.<String>builder() + .topicName("fake-topic") + .message(msg) + .schema(Schema.KeyValue(Schema.INT32, Schema.STRING)) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + sink.write(record); + sink.flush(); + + // expect fail + assertEquals(status.get(), -1); + + 
sink.close(); + } + + @Test + public void wrongKeyValueSchemaTest() throws Exception { + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + Message msg = mock(MessageImpl.class); + // value is of a wrong/unsupported type + when(msg.getValue()).thenReturn(new AbstractMap.SimpleEntry<>(11, "value")); + when(msg.getKey()).thenReturn("key"); + when(msg.hasKey()).thenReturn(true); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + final AtomicInteger status = new AtomicInteger(0); + Record<GenericObject> record = PulsarRecord.<String>builder() + .topicName("fake-topic") + .message(msg) + .schema(Schema.KeyValue(Schema.INT32, Schema.STRING)) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + sink.write(record); + sink.flush(); + + // expect fail + assertEquals(status.get(), -1); + + sink.close(); + } + @Test public void offsetTest() throws Exception { final AtomicLong entryId = new AtomicLong(0L); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java index 9821a58eb22..07b9117d2e4 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java @@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.connect; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -37,6 +38,7 @@ import java.util.Map; * A 
FileStreamSinkTask for testing that writes data other than just a value, i.e.: * key, value, key and value schemas. */ +@Slf4j public class SchemaedFileStreamSinkTask extends FileStreamSinkTask { @Override @@ -49,32 +51,28 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask { ? new String((byte[]) record.value(), StandardCharsets.US_ASCII) : record.value(); + Object key = record.keySchema() == Schema.BYTES_SCHEMA + ? new String((byte[]) record.key(), StandardCharsets.US_ASCII) + : record.key(); + Map<String, Object> recOut = Maps.newHashMap(); recOut.put("keySchema", record.keySchema().type().toString()); recOut.put("valueSchema", record.valueSchema().type().toString()); - recOut.put("key", record.key()); - if (val instanceof Struct) { - Map<String, Object> map = Maps.newHashMap(); - Struct struct = (Struct)val; - - // no recursion needed for tests - for (Field f: struct.schema().fields()) { - map.put(f.name(), struct.get(f)); - } - - recOut.put("value", map); - } else { - recOut.put("value", val); - } + recOut.put("key", toWritableValue(key)); + recOut.put("value", toWritableValue(val)); ObjectMapper om = new ObjectMapper(); try { + String valueAsString = om.writeValueAsString(recOut); + + log.info("FileSink writing {}", valueAsString); + SinkRecord toSink = new SinkRecord(record.topic(), record.kafkaPartition(), - record.keySchema(), - record.key(), Schema.STRING_SCHEMA, - om.writeValueAsString(recOut), + "", // blank key, real one is serialized with recOut + Schema.STRING_SCHEMA, + valueAsString, record.kafkaOffset(), record.timestamp(), record.timestampType()); @@ -87,4 +85,19 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask { super.put(out); } + private Object toWritableValue(Object val) { + if (val instanceof Struct) { + Map<String, Object> map = Maps.newHashMap(); + Struct struct = (Struct) val; + + // no recursion needed for tests + for (Field f: struct.schema().fields()) { + map.put(f.name(), struct.get(f)); + } + 
return map; + } else { + return val; + } + } + }
