This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f15b53b9beef270736c25a633decb51cc4dbec9b
Author: Omar Al-Safi <[email protected]>
AuthorDate: Mon Dec 9 18:51:45 2019 +0100

    Add the basis for Camel Type Converter transformations
---
 core/pom.xml                                       |   5 +
 .../transforms/CamelTransformSupport.java          |  15 +++
 .../transforms/CamelTypeConverterTransform.java    | 114 +++++++++++++++++++++
 .../camel/kafkaconnector/utils/SchemaHelper.java   |  59 +++++++++++
 .../CamelTypeConverterTransformTest.java           |  35 +++++++
 parent/pom.xml                                     |   6 ++
 6 files changed, 234 insertions(+)

diff --git a/core/pom.xml b/core/pom.xml
index fe67cf3..e803fb0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -97,6 +97,11 @@
             <artifactId>connect-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-transforms</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Test -->
         <dependency>
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
new file mode 100644
index 0000000..19fae92
--- /dev/null
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
@@ -0,0 +1,15 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+/**
+ * Base class for Kafka Connect transformations that rely on Camel facilities.
+ * Every instance creates its own {@link DefaultCamelContext}, which subclasses
+ * reach through {@link #getCamelContext()}.
+ *
+ * @param <R> the concrete {@link ConnectRecord} type being transformed
+ */
+public abstract class CamelTransformSupport<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    /** Camel context created together with this transformation instance. */
+    private final CamelContext context = new DefaultCamelContext();
+
+    /**
+     * @return the Camel context held by this transformation
+     */
+    protected CamelContext getCamelContext() {
+        return this.context;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
new file mode 100644
index 0000000..07351e7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
@@ -0,0 +1,114 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Map;
+
+import org.apache.camel.TypeConverter;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * Transformation that converts either the key or the value of a record to the class
+ * configured under {@code target.type}, using the Camel {@link TypeConverter} taken from
+ * the Camel context of {@link CamelTransformSupport}.
+ * Use {@link Key} to convert the record key and {@link Value} to convert the record value.
+ *
+ * @param <R> the concrete {@link ConnectRecord} type being transformed
+ */
+public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {
+
+    private interface ConfigName {
+        String FIELD_TARGET_TYPE = "target.type";
+    }
+
+    /** Class the key/value is converted to; assigned in {@link #configure(Map)}. */
+    private Class<?> fieldTargetType;
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
+                    "The target field type to convert the value from, this is full qualified Java class, e.g: java.util.Map");
+
+    /**
+     * Converts the operating part (key or value) of the record to the configured target
+     * type and returns a new record carrying the converted data and its schema.
+     *
+     * @param record the record to transform
+     * @return a new record with the converted key or value
+     * @throws DataException if the Camel type converter cannot convert the data
+     */
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        final Object value = operatingValue(record);
+
+        final Object convertedValue = convertValueWithCamelTypeConverter(value);
+        final Schema updatedSchema = getOrBuildRecordSchema(schema, convertedValue);
+
+        return newRecord(record, updatedSchema, convertedValue);
+    }
+
+    /**
+     * Converts the given value to the configured target type.
+     *
+     * @param originalValue the value to convert
+     * @return the converted value, never {@code null}
+     * @throws DataException if the converter yields no result
+     */
+    private Object convertValueWithCamelTypeConverter(final Object originalValue) {
+        final TypeConverter converter = getCamelContext().getTypeConverter();
+        final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue);
+
+        if (convertedValue == null) {
+            throw new DataException(String.format("CamelTypeConverter was not able to convert value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName()));
+        }
+
+        return convertedValue;
+    }
+
+    /**
+     * Builds the schema for the converted value, carrying over the basics, the optional
+     * flag and the (converted) default value of the original schema.
+     *
+     * @param originalSchema schema of the data before conversion, {@code null} for schemaless records
+     * @param value          the converted value
+     * @return the schema describing {@code value}, or {@code null} when the record is schemaless
+     */
+    private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) {
+        // Schemaless records have no schema to copy from; keep them schemaless
+        // instead of failing with a NullPointerException.
+        if (originalSchema == null) {
+            return null;
+        }
+
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value));
+
+        if (originalSchema.isOptional()) {
+            builder.optional();
+        }
+        if (originalSchema.defaultValue() != null) {
+            // the default value must match the new schema type, so it is converted as well
+            builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue()));
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Reads the {@code target.type} option.
+     *
+     * @param props the transformation properties
+     * @throws org.apache.kafka.common.config.ConfigException if {@code target.type} is not set
+     */
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fieldTargetType = config.getClass(ConfigName.FIELD_TARGET_TYPE);
+
+        // The option defaults to null; fail here with a clear message rather than
+        // with a NullPointerException when the first record is applied.
+        if (fieldTargetType == null) {
+            throw new org.apache.kafka.common.config.ConfigException("Missing required configuration \"" + ConfigName.FIELD_TARGET_TYPE + "\"");
+        }
+    }
+
+    /** @return the schema of the part of the record this transformation operates on */
+    protected abstract Schema operatingSchema(R record);
+
+    /** @return the part of the record (key or value) this transformation operates on */
+    protected abstract Object operatingValue(R record);
+
+    /** @return a copy of {@code record} with the operating part replaced by the updated schema and value */
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    /** Variant that converts the record key. */
+    public static final class Key<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    /** Variant that converts the record value. */
+    public static final class Value<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+    }
+}
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
new file mode 100644
index 0000000..6b4960d
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -0,0 +1,59 @@
+package org.apache.camel.kafkaconnector.utils;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+/**
+ * Static helpers for deriving Kafka Connect schemas from Java values.
+ */
+public final class SchemaHelper {
+
+    private SchemaHelper() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Tries to build a {@link SchemaBuilder} matching the runtime type of the given value.
+     * Only a fixed set of known types is recognised; for any other value (including
+     * {@code null}) an optional {@link SchemaBuilder#bytes()} builder is returned.
+     *
+     * @param value the value to return the SchemaBuilder for
+     *
+     * @return {@link SchemaBuilder} instance
+     */
+    public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+        if (value instanceof Byte) {
+            // a single java.lang.Byte is Connect's INT8; BYTES is for byte[] / ByteBuffer
+            return SchemaBuilder.int8();
+        }
+        if (value instanceof Short) {
+            return SchemaBuilder.int16();
+        }
+        if (value instanceof Integer) {
+            return SchemaBuilder.int32();
+        }
+        if (value instanceof Long) {
+            return SchemaBuilder.int64();
+        }
+        if (value instanceof Float) {
+            return SchemaBuilder.float32();
+        }
+        if (value instanceof Double) {
+            return SchemaBuilder.float64();
+        }
+        if (value instanceof Boolean) {
+            return SchemaBuilder.bool();
+        }
+        if (value instanceof String) {
+            return SchemaBuilder.string();
+        }
+        if (value instanceof Map) {
+            // Note: optimally we should define the schema better for map, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.MAP);
+        }
+        if (value instanceof Iterable) {
+            // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.ARRAY);
+        }
+        if (value instanceof Struct) {
+            return SchemaBuilder.struct();
+        }
+
+        // if we do not find a matching schema above, we just return an optional bytes schema
+        return SchemaBuilder.bytes().optional();
+    }
+}
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
new file mode 100644
index 0000000..8973903
--- /dev/null
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -0,0 +1,35 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.junit.Test;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link CamelTypeConverterTransform}.
+ */
+public class CamelTypeConverterTransformTest {
+
+    /**
+     * A string value of "TRUE" with a string schema must come out of the value
+     * transformation as a boolean {@code true} with a boolean schema.
+     */
+    @Test
+    public void testIfItConvertsConnectRecordCorrectly() {
+        // given: a value transformation targeting java.lang.Boolean
+        final Map<String, Object> settings = new HashMap<>();
+        settings.put("target.type", "java.lang.Boolean");
+
+        final Transformation<SourceRecord> valueTransform = new CamelTypeConverterTransform.Value<>();
+        valueTransform.configure(settings);
+
+        // when: a record with a string payload is transformed
+        final SourceRecord input = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE");
+        final SourceRecord output = valueTransform.apply(input);
+
+        // then: both the value and its schema are boolean
+        assertEquals(true, output.value());
+        assertEquals(Schema.BOOLEAN_SCHEMA, output.valueSchema());
+    }
+
+}
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index fa2a06f..b62f7f6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -175,6 +175,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-transforms</artifactId>
+                <version>${kafka.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.12</artifactId>
                 <version>${kafka.version}</version>
                 <scope>test</scope>

Reply via email to