This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 270d8067d945ca775eefd7dfdb2424ec56e18279 Author: Andrea Tarocchi <[email protected]> AuthorDate: Sun Nov 22 23:27:44 2020 +0100 fixed #715: Create the sink counterpart of the PojoToSchemaAndStructTransform. --- .../SinkPojoToSchemaAndStructTransform.java | 118 ++++++++++ .../SourcePojoToSchemaAndStructTransform.java | 2 +- .../camel/kafkaconnector/CamelSourceTaskTest.java | 2 +- .../kafkaconnector/transforms/PojoWithMap.java | 36 +++ .../SinkPojoToSchemaAndStructTransformTest.java | 129 ++++++++++ .../kafkaconnector/transforms/SlackMessage.java | 259 +++++++++++++++++++++ .../SourcePojoToSchemaAndStructTransformTest.java | 19 +- 7 files changed, 545 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java new file mode 100644 index 0000000..30ef9ca --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.transforms; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.dataformat.avro.AvroFactory; +import com.fasterxml.jackson.dataformat.avro.AvroSchema; +import io.apicurio.registry.utils.converter.avro.AvroData; +import io.apicurio.registry.utils.converter.avro.AvroDataConfig; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +public class SinkPojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> { + public static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY = "camel.transformer.sink.pojo.class"; + + private static final Logger LOG = LoggerFactory.getLogger(SinkPojoToSchemaAndStructTransform.class); + + private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory()); + private static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC = "Full qualified class name of the pojo you want your record value converted to"; + private static final Object CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT = ConfigDef.NO_DEFAULT_VALUE; + private static final ConfigDef CONFIG_DEF = (new ConfigDef()).define(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, ConfigDef.Type.STRING, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC); + + private String pojoClass; + private ObjectReader objectReader; + private AvroData avroData; + + + @Override + public R apply(R r) { + LOG.debug("Incoming record: {}", r); + + if (r.value() != null && r.valueSchema() != null && Schema.Type.STRUCT.equals(r.valueSchema().type())) { + GenericRecord avroGenericRecord = (GenericRecord)avroData.fromConnectData(r.valueSchema(), r.value()); + + LOG.debug("GenericRecord created: {} \nwith schema: {}", avroGenericRecord, avroGenericRecord == null ? "null" : avroGenericRecord.getClass().getName()); + + GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroGenericRecord.getSchema()); + + Object pojo; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + writer.write(avroGenericRecord, encoder); + encoder.flush(); + + byte[] avroData = out.toByteArray(); + out.close(); + pojo = objectReader + .with(new AvroSchema(avroGenericRecord.getSchema())) + .readValue(avroData); + LOG.debug("Pojo of class {} created: {}", pojo.getClass(), pojo); + } catch (IOException e) { + throw new ConnectException("Error in generating POJO from Struct.", e); + } + + LOG.debug("Generate pojo: {}", pojo); + return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), + null, pojo, r.timestamp()); + } else { + LOG.debug("Incoming record with a null value or a value schema != Schema.Type.STRUCT, nothing to be done."); + return r; + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + //NOOP + } + + @Override + public void configure(Map<String, ?> configs) { + SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + this.pojoClass = config.getString(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY); + + this.avroData = new AvroData(new AvroDataConfig(configs)); + + try { + this.objectReader = MAPPER.readerFor(Class.forName(pojoClass)); + } catch (ClassNotFoundException e) { + throw new ConnectException("Unable to initialize SinkPojoToSchemaAndStructTransform ", e); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java index 6f0f850..128de09 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java @@ -55,7 +55,7 @@ public class SourcePojoToSchemaAndStructTransform<R extends ConnectRecord<R>> im LOG.debug("Incoming record: {}", r); if (r.value() != null) { - String recordClassCanonicalName = r.value().getClass().getCanonicalName(); + String recordClassCanonicalName = r.value().getClass().getName(); CacheEntry cacheEntry = avroSchemaWrapperCache.computeIfAbsent(recordClassCanonicalName, new Function<String, CacheEntry>() { @Override public CacheEntry apply(String s) { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index c6fa7eb..49bf878 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -295,7 +295,7 @@ public class CamelSourceTaskTest { assertEquals(1, results.size()); Header bigDecimalHeader = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + "bigdecimal").next(); assertEquals("[B", bigDecimalHeader.value().getClass().getName()); - assertEquals(Decimal.class.getCanonicalName(), bigDecimalHeader.schema().name()); + assertEquals(Decimal.class.getName(), bigDecimalHeader.schema().name()); assertEquals(Schema.Type.BYTES, bigDecimalHeader.schema().type()); sourceTask.stop(); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java new file mode 100644 index 0000000..9511a55 --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.HashMap; +import java.util.Map; + +public class PojoWithMap { + private Map<String, Integer> map = new HashMap<>(); + + public Map<String, Integer> getMap() { + return map; + } + + public void setMap(Map<String, Integer> map) { + this.map = map; + } + + public void addToMap(String key, Integer value) { + map.put(key, value); + } +} diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java new file mode 100644 index 0000000..5f1655a --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import org.apache.camel.kafkaconnector.transforms.SlackMessage.Attachment; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SinkPojoToSchemaAndStructTransformTest { + + @Test + public void testRecordValueConversion() { + SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform(); + sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap()); + + SlackMessage sm = new SlackMessage(); + + Attachment at1 = new Attachment(); + Attachment.Field at1f1 = new Attachment.Field(); + at1f1.setTitle("ciao"); + at1f1.setShortValue(true); + at1.setFields(new ArrayList<Attachment.Field>(Collections.singleton(at1f1))); + at1.setAuthorName("Andrea"); + + Attachment at2 = new Attachment(); + at2.setColor("green"); + + ArrayList<Attachment> attachments = new ArrayList<>(); + attachments.add(at1); + attachments.add(at2); + + sm.setText("text"); + sm.setAttachments(attachments); + + ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply( + new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + Schema.BYTES_SCHEMA, sm)); + + SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform(); + sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, SlackMessage.class.getName())); + + ConnectRecord transformedCr = sinkPojoToSchemaAndStructTransform.apply(cr); + + assertEquals("testTopic", transformedCr.topic()); + assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema()); + assertEquals("testKeyValue", transformedCr.key()); + assertEquals(SlackMessage.class.getName(), transformedCr.value().getClass().getName()); + SlackMessage transformedSM = (SlackMessage)transformedCr.value(); + assertEquals(sm.getText(), transformedSM.getText()); + assertEquals(sm.getAttachments().size(), transformedSM.getAttachments().size()); + } + + @Test + public void testMapValueConversion() { + SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform(); + sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap()); + + PojoWithMap pwm = new PojoWithMap(); + pwm.addToMap("ciao", 9); + + ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply(new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + Schema.BYTES_SCHEMA, pwm)); + + SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform(); + sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, PojoWithMap.class.getName())); + + ConnectRecord transformedCr = sinkPojoToSchemaAndStructTransform.apply(cr); + + assertEquals("testTopic", transformedCr.topic()); + assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema()); + assertEquals("testKeyValue", transformedCr.key()); + + assertEquals(PojoWithMap.class.getName(), transformedCr.value().getClass().getName()); + PojoWithMap transformedPWM = (PojoWithMap)transformedCr.value(); + assertEquals(pwm.getMap().size(), transformedPWM.getMap().size()); + assertEquals(pwm.getMap().keySet(), transformedPWM.getMap().keySet()); + } + + @Test() + public void testNotStructSchemaConversion() { + SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform(); + sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, PojoWithMap.class.getName())); + + Map map = Collections.singletonMap("ciao", 9); + + ConnectRecord cr = new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + null, map); + + sinkPojoToSchemaAndStructTransform.apply(cr); + } + + @Test() + public void testNullValueConversion() { + SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform(); + sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, PojoWithMap.class.getName())); + + ConnectRecord cr = new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + Schema.BYTES_SCHEMA, null); + + ConnectRecord transformedCr = sinkPojoToSchemaAndStructTransform.apply(cr); + assertEquals(cr, transformedCr); + } +} diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java new file mode 100644 index 0000000..24fa125 --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.List; + +//XXX: this class can be removed and tests updated accordingly after Camel updated to 3.7.0 +public class SlackMessage { + + private String text; + private String channel; + private String username; + private String user; + private String iconUrl; + private String iconEmoji; + private List<Attachment> attachments; + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getIconUrl() { + return iconUrl; + } + + public void setIconUrl(String iconUrl) { + this.iconUrl = iconUrl; + } + + public String getIconEmoji() { + return iconEmoji; + } + + public void setIconEmoji(String iconEmoji) { + this.iconEmoji = iconEmoji; + } + + public List<Attachment> getAttachments() { + return attachments; + } + + public void setAttachments(List<Attachment> attachments) { + this.attachments = attachments; + } + + public static class Attachment { + + private String fallback; + private String color; + private String pretext; + private String authorName; + private String authorLink; + private String authorIcon; + private String title; + private String titleLink; + private String text; + private String imageUrl; + private String thumbUrl; + private String footer; + private String footerIcon; + private Long ts; + private List<Attachment.Field> fields; + + public String getFallback() { + return fallback; + } + + public void setFallback(String fallback) { + this.fallback = fallback; + } + + public String getColor() { + return color; + } + + public void setColor(String color) { + this.color = color; + } + + public String getPretext() { + return pretext; + } + + public void setPretext(String pretext) { + this.pretext = pretext; + } + + public String getAuthorName() { + return authorName; + } + + public void setAuthorName(String authorName) { + this.authorName = authorName; + } + + public String getAuthorLink() { + return authorLink; + } + + public void setAuthorLink(String authorLink) { + this.authorLink = authorLink; + } + + public String getAuthorIcon() { + return authorIcon; + } + + public void setAuthorIcon(String authorIcon) { + this.authorIcon = authorIcon; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getTitleLink() { + return titleLink; + } + + public void setTitleLink(String titleLink) { + this.titleLink = titleLink; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public String getImageUrl() { + return imageUrl; + } + + public void setImageUrl(String imageUrl) { + this.imageUrl = imageUrl; + } + + public String getThumbUrl() { + return thumbUrl; + } + + public void setThumbUrl(String thumbUrl) { + this.thumbUrl = thumbUrl; + } + + public String getFooter() { + return footer; + } + + public void setFooter(String footer) { + this.footer = footer; + } + + public String getFooterIcon() { + return footerIcon; + } + + public void setFooterIcon(String footerIcon) { + this.footerIcon = footerIcon; + } + + public Long getTs() { + return ts; + } + + public void setTs(Long ts) { + this.ts = ts; + } + + public List<Attachment.Field> getFields() { + return fields; + } + + public void setFields(List<Attachment.Field> fields) { + this.fields = fields; + } + + public static class Field { + + private String title; + private String value; + private Boolean shortValue; + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public Boolean isShortValue() { + return shortValue; + } + + public void setShortValue(Boolean shortValue) { + this.shortValue = shortValue; + } + } + } + +} + diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java index 529c45f..dfe1ed6 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java @@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.transforms; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -157,7 +156,7 @@ public class SourcePojoToSchemaAndStructTransformTest { assertEquals(1, sourcePojoToSchemaAndStructTransform.getCache().keySet().size()); ConnectRecord transformedCr = sourcePojoToSchemaAndStructTransform.apply(cr); assertEquals(1, sourcePojoToSchemaAndStructTransform.getCache().keySet().size()); - assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName())); + assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getName())); } private void atLeastOneFieldWithGivenValueExists(List structs, String fieldName, String fieldExpectedValue) { @@ -167,20 +166,4 @@ public class SourcePojoToSchemaAndStructTransformTest { struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName)) ); } - - public class PojoWithMap { - private Map<String, Integer> map = new HashMap<>(); - - public Map<String, Integer> getMap() { - return map; - } - - public void setMap(Map<String, Integer> map) { - this.map = map; - } - - public void addToMap(String key, Integer value) { - map.put(key, value); - } - } }
