This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 270d8067d945ca775eefd7dfdb2424ec56e18279
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Sun Nov 22 23:27:44 2020 +0100

    fixed #715: Create the sink counterpart of the 
PojoToSchemaAndStructTransform.
---
 .../SinkPojoToSchemaAndStructTransform.java        | 118 ++++++++++
 .../SourcePojoToSchemaAndStructTransform.java      |   2 +-
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |   2 +-
 .../kafkaconnector/transforms/PojoWithMap.java     |  36 +++
 .../SinkPojoToSchemaAndStructTransformTest.java    | 129 ++++++++++
 .../kafkaconnector/transforms/SlackMessage.java    | 259 +++++++++++++++++++++
 .../SourcePojoToSchemaAndStructTransformTest.java  |  19 +-
 7 files changed, 545 insertions(+), 20 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
new file mode 100644
index 0000000..30ef9ca
--- /dev/null
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.avro.AvroFactory;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import io.apicurio.registry.utils.converter.avro.AvroData;
+import io.apicurio.registry.utils.converter.avro.AvroDataConfig;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class SinkPojoToSchemaAndStructTransform<R extends ConnectRecord<R>> 
implements Transformation<R> {
+    public static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY = 
"camel.transformer.sink.pojo.class";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SinkPojoToSchemaAndStructTransform.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper(new 
AvroFactory());
+    private static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC = "Full 
qualified class name of the pojo you want your record value converted to";
+    private static final Object CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT = 
ConfigDef.NO_DEFAULT_VALUE;
+    private static final ConfigDef CONFIG_DEF = (new 
ConfigDef()).define(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, 
ConfigDef.Type.STRING, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT, 
ConfigDef.Importance.HIGH, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC);
+
+    private String pojoClass;
+    private ObjectReader objectReader;
+    private AvroData avroData;
+
+
+    @Override
+    public R apply(R r) {
+        LOG.debug("Incoming record: {}", r);
+
+        if (r.value() != null && r.valueSchema() != null && 
Schema.Type.STRUCT.equals(r.valueSchema().type())) {
+            GenericRecord avroGenericRecord = 
(GenericRecord)avroData.fromConnectData(r.valueSchema(), r.value());
+
+            LOG.debug("GenericRecord created: {} \nwith schema: {}", 
avroGenericRecord, avroGenericRecord == null ? "null" : 
avroGenericRecord.getClass().getName());
+
+            GenericDatumWriter<GenericRecord> writer = new 
GenericDatumWriter<GenericRecord>(avroGenericRecord.getSchema());
+
+            Object pojo;
+            try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                Encoder encoder = EncoderFactory.get().binaryEncoder(out, 
null);
+                writer.write(avroGenericRecord, encoder);
+                encoder.flush();
+
+                byte[] avroData = out.toByteArray();
+                out.close();
+                pojo = objectReader
+                        .with(new AvroSchema(avroGenericRecord.getSchema()))
+                        .readValue(avroData);
+                LOG.debug("Pojo of class {} created: {}", pojo.getClass(), 
pojo);
+            } catch (IOException e) {
+                throw new ConnectException("Error in generating POJO from 
Struct.", e);
+            }
+
+            LOG.debug("Generate pojo: {}", pojo);
+            return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), 
r.key(),
+                    null, pojo, r.timestamp());
+        } else {
+            LOG.debug("Incoming record with a null value or a value schema != 
Schema.Type.STRUCT, nothing to be done.");
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+        this.pojoClass = 
config.getString(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY);
+
+        this.avroData = new AvroData(new AvroDataConfig(configs));
+
+        try {
+            this.objectReader = MAPPER.readerFor(Class.forName(pojoClass));
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException("Unable to initialize 
SinkPojoToSchemaAndStructTransform ", e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
index 6f0f850..128de09 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
@@ -55,7 +55,7 @@ public class SourcePojoToSchemaAndStructTransform<R extends 
ConnectRecord<R>> im
         LOG.debug("Incoming record: {}", r);
 
         if (r.value() != null) {
-            String recordClassCanonicalName = 
r.value().getClass().getCanonicalName();
+            String recordClassCanonicalName = r.value().getClass().getName();
             CacheEntry cacheEntry = 
avroSchemaWrapperCache.computeIfAbsent(recordClassCanonicalName, new 
Function<String, CacheEntry>() {
                 @Override
                 public CacheEntry apply(String s) {
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index c6fa7eb..49bf878 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -295,7 +295,7 @@ public class CamelSourceTaskTest {
         assertEquals(1, results.size());
         Header bigDecimalHeader = 
results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + 
"bigdecimal").next();
         assertEquals("[B", bigDecimalHeader.value().getClass().getName());
-        assertEquals(Decimal.class.getCanonicalName(), 
bigDecimalHeader.schema().name());
+        assertEquals(Decimal.class.getName(), 
bigDecimalHeader.schema().name());
         assertEquals(Schema.Type.BYTES, bigDecimalHeader.schema().type());
 
         sourceTask.stop();
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java
 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java
new file mode 100644
index 0000000..9511a55
--- /dev/null
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoWithMap {
+    private Map<String, Integer> map = new HashMap<>();
+
+    public Map<String, Integer> getMap() {
+        return map;
+    }
+
+    public void setMap(Map<String, Integer> map) {
+        this.map = map;
+    }
+
+    public void addToMap(String key, Integer value) {
+        map.put(key, value);
+    }
+}
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java
 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java
new file mode 100644
index 0000000..5f1655a
--- /dev/null
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.transforms.SlackMessage.Attachment;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SinkPojoToSchemaAndStructTransformTest {
+
+    @Test
+    public void testRecordValueConversion() {
+        SourcePojoToSchemaAndStructTransform 
sourcePojoToSchemaAndStructTransform = new 
SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        SlackMessage sm = new SlackMessage();
+
+        Attachment at1 = new Attachment();
+        Attachment.Field at1f1 = new Attachment.Field();
+        at1f1.setTitle("ciao");
+        at1f1.setShortValue(true);
+        at1.setFields(new 
ArrayList<Attachment.Field>(Collections.singleton(at1f1)));
+        at1.setAuthorName("Andrea");
+
+        Attachment at2 = new Attachment();
+        at2.setColor("green");
+
+        ArrayList<Attachment> attachments = new ArrayList<>();
+        attachments.add(at1);
+        attachments.add(at2);
+
+        sm.setText("text");
+        sm.setAttachments(attachments);
+
+        ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply(
+                new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, sm));
+
+        SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform 
= new SinkPojoToSchemaAndStructTransform();
+        
sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY,
 SlackMessage.class.getName()));
+
+        ConnectRecord transformedCr = 
sinkPojoToSchemaAndStructTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        assertEquals(SlackMessage.class.getName(), 
transformedCr.value().getClass().getName());
+        SlackMessage transformedSM = (SlackMessage)transformedCr.value();
+        assertEquals(sm.getText(), transformedSM.getText());
+        assertEquals(sm.getAttachments().size(), 
transformedSM.getAttachments().size());
+    }
+
+    @Test
+    public void testMapValueConversion() {
+        SourcePojoToSchemaAndStructTransform 
sourcePojoToSchemaAndStructTransform = new 
SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply(new 
SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, pwm));
+
+        SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform 
= new SinkPojoToSchemaAndStructTransform();
+        
sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY,
 PojoWithMap.class.getName()));
+
+        ConnectRecord transformedCr = 
sinkPojoToSchemaAndStructTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+
+        assertEquals(PojoWithMap.class.getName(), 
transformedCr.value().getClass().getName());
+        PojoWithMap transformedPWM = (PojoWithMap)transformedCr.value();
+        assertEquals(pwm.getMap().size(), transformedPWM.getMap().size());
+        assertEquals(pwm.getMap().keySet(), transformedPWM.getMap().keySet());
+    }
+
+    @Test()
+    public void testNotStructSchemaConversion() {
+        SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform 
= new SinkPojoToSchemaAndStructTransform();
+        
sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY,
 PojoWithMap.class.getName()));
+
+        Map map = Collections.singletonMap("ciao", 9);
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                null, map);
+
+        sinkPojoToSchemaAndStructTransform.apply(cr);
+    }
+
+    @Test()
+    public void testNullValueConversion() {
+        SinkPojoToSchemaAndStructTransform sinkPojoToSchemaAndStructTransform 
= new SinkPojoToSchemaAndStructTransform();
+        
sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY,
 PojoWithMap.class.getName()));
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, null);
+
+        ConnectRecord transformedCr = 
sinkPojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(cr, transformedCr);
+    }
+}
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java
 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java
new file mode 100644
index 0000000..24fa125
--- /dev/null
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.List;
+
+//XXX: this class can be removed and tests updated accordingly after Camel 
updated to 3.7.0
+public class SlackMessage {
+
+    private String text;
+    private String channel;
+    private String username;
+    private String user;
+    private String iconUrl;
+    private String iconEmoji;
+    private List<Attachment> attachments;
+
+    public String getText() {
+        return text;
+    }
+
+    public void setText(String text) {
+        this.text = text;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public void setChannel(String channel) {
+        this.channel = channel;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getIconUrl() {
+        return iconUrl;
+    }
+
+    public void setIconUrl(String iconUrl) {
+        this.iconUrl = iconUrl;
+    }
+
+    public String getIconEmoji() {
+        return iconEmoji;
+    }
+
+    public void setIconEmoji(String iconEmoji) {
+        this.iconEmoji = iconEmoji;
+    }
+
+    public List<Attachment> getAttachments() {
+        return attachments;
+    }
+
+    public void setAttachments(List<Attachment> attachments) {
+        this.attachments = attachments;
+    }
+
+    public static class Attachment {
+
+        private String fallback;
+        private String color;
+        private String pretext;
+        private String authorName;
+        private String authorLink;
+        private String authorIcon;
+        private String title;
+        private String titleLink;
+        private String text;
+        private String imageUrl;
+        private String thumbUrl;
+        private String footer;
+        private String footerIcon;
+        private Long ts;
+        private List<Attachment.Field> fields;
+
+        public String getFallback() {
+            return fallback;
+        }
+
+        public void setFallback(String fallback) {
+            this.fallback = fallback;
+        }
+
+        public String getColor() {
+            return color;
+        }
+
+        public void setColor(String color) {
+            this.color = color;
+        }
+
+        public String getPretext() {
+            return pretext;
+        }
+
+        public void setPretext(String pretext) {
+            this.pretext = pretext;
+        }
+
+        public String getAuthorName() {
+            return authorName;
+        }
+
+        public void setAuthorName(String authorName) {
+            this.authorName = authorName;
+        }
+
+        public String getAuthorLink() {
+            return authorLink;
+        }
+
+        public void setAuthorLink(String authorLink) {
+            this.authorLink = authorLink;
+        }
+
+        public String getAuthorIcon() {
+            return authorIcon;
+        }
+
+        public void setAuthorIcon(String authorIcon) {
+            this.authorIcon = authorIcon;
+        }
+
+        public String getTitle() {
+            return title;
+        }
+
+        public void setTitle(String title) {
+            this.title = title;
+        }
+
+        public String getTitleLink() {
+            return titleLink;
+        }
+
+        public void setTitleLink(String titleLink) {
+            this.titleLink = titleLink;
+        }
+
+        public String getText() {
+            return text;
+        }
+
+        public void setText(String text) {
+            this.text = text;
+        }
+
+        public String getImageUrl() {
+            return imageUrl;
+        }
+
+        public void setImageUrl(String imageUrl) {
+            this.imageUrl = imageUrl;
+        }
+
+        public String getThumbUrl() {
+            return thumbUrl;
+        }
+
+        public void setThumbUrl(String thumbUrl) {
+            this.thumbUrl = thumbUrl;
+        }
+
+        public String getFooter() {
+            return footer;
+        }
+
+        public void setFooter(String footer) {
+            this.footer = footer;
+        }
+
+        public String getFooterIcon() {
+            return footerIcon;
+        }
+
+        public void setFooterIcon(String footerIcon) {
+            this.footerIcon = footerIcon;
+        }
+
+        public Long getTs() {
+            return ts;
+        }
+
+        public void setTs(Long ts) {
+            this.ts = ts;
+        }
+
+        public List<Attachment.Field> getFields() {
+            return fields;
+        }
+
+        public void setFields(List<Attachment.Field> fields) {
+            this.fields = fields;
+        }
+
+        public static class Field {
+
+            private String title;
+            private String value;
+            private Boolean shortValue;
+
+            public String getTitle() {
+                return title;
+            }
+
+            public void setTitle(String title) {
+                this.title = title;
+            }
+
+            public String getValue() {
+                return value;
+            }
+
+            public void setValue(String value) {
+                this.value = value;
+            }
+
+            public Boolean isShortValue() {
+                return shortValue;
+            }
+
+            public void setShortValue(Boolean shortValue) {
+                this.shortValue = shortValue;
+            }
+        }
+    }
+
+}
+
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
index 529c45f..dfe1ed6 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
@@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.transforms;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -157,7 +156,7 @@ public class SourcePojoToSchemaAndStructTransformTest {
         assertEquals(1, 
sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
         ConnectRecord transformedCr = 
sourcePojoToSchemaAndStructTransform.apply(cr);
         assertEquals(1, 
sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
-        
assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
+        
assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getName()));
     }
 
     private void atLeastOneFieldWithGivenValueExists(List structs, String 
fieldName, String fieldExpectedValue) {
@@ -167,20 +166,4 @@ public class SourcePojoToSchemaAndStructTransformTest {
             struct -> assertEquals(fieldExpectedValue, ((Struct) 
struct).getString(fieldName))
         );
     }
-
-    public class PojoWithMap {
-        private Map<String, Integer> map = new HashMap<>();
-
-        public Map<String, Integer> getMap() {
-            return map;
-        }
-
-        public void setMap(Map<String, Integer> map) {
-            this.map = map;
-        }
-
-        public void addToMap(String key, Integer value) {
-            map.put(key, value);
-        }
-    }
 }

Reply via email to