This is an automated email from the ASF dual-hosted git repository. oalsafi pushed a commit to branch support-camel-type-converter in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f15b53b9beef270736c25a633decb51cc4dbec9b Author: Omar Al-Safi <[email protected]> AuthorDate: Mon Dec 9 18:51:45 2019 +0100 Add the basis for Camel Type Converter transformations --- core/pom.xml | 5 + .../transforms/CamelTransformSupport.java | 15 +++ .../transforms/CamelTypeConverterTransform.java | 114 +++++++++++++++++++++ .../camel/kafkaconnector/utils/SchemaHelper.java | 59 +++++++++++ .../CamelTypeConverterTransformTest.java | 35 +++++++ parent/pom.xml | 6 ++ 6 files changed, 234 insertions(+) diff --git a/core/pom.xml b/core/pom.xml index fe67cf3..e803fb0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -97,6 +97,11 @@ <artifactId>connect-api</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-transforms</artifactId> + <scope>provided</scope> + </dependency> <!-- Test --> <dependency> diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java new file mode 100644 index 0000000..19fae92 --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java @@ -0,0 +1,15 @@ +package org.apache.camel.kafkaconnector.transforms; + +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; + +public abstract class CamelTransformSupport<R extends ConnectRecord<R>> implements Transformation<R> { + + private final CamelContext camelContext = new DefaultCamelContext(); + + protected CamelContext getCamelContext() { + return camelContext; + } +} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java new file mode 100644 index 0000000..07351e7 --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java @@ -0,0 +1,114 @@ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.Map; + +import org.apache.camel.TypeConverter; +import org.apache.camel.kafkaconnector.utils.SchemaHelper; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> { + + private interface ConfigName { + String FIELD_TARGET_TYPE = "target.type"; + } + + private Class<?> fieldTargetType; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH, + "The target field type to convert the value from, this is full qualified Java class, e.g: java.util.Map"); + + @Override + public R apply(R record) { + final Schema schema = operatingSchema(record); + final Object value = operatingValue(record); + + final Object convertedValue = convertValueWithCamelTypeConverter(value); + final Schema updatedSchema = getOrBuildRecordSchema(schema, convertedValue); + + return newRecord(record, updatedSchema, convertedValue); + } + + private Object convertValueWithCamelTypeConverter(final Object originalValue) { + final TypeConverter converter = getCamelContext().getTypeConverter(); + final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue); + + if (convertedValue == null) { + throw new DataException(String.format("CamelTypeConverter was not able to converter value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName())); + } + + return convertedValue; + } + + private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value)); + + if (originalSchema.isOptional()) + builder.optional(); + if (originalSchema.defaultValue() != null) + builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue())); + + return builder.build(); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + @Override + public void configure(Map<String, ?> props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + fieldTargetType = config.getClass(ConfigName.FIELD_TARGET_TYPE); + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + + public static final class Key<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + } + + public static final class Value<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); + } + } +} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java new file mode 100644 index 0000000..6b4960d --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java @@ -0,0 +1,59 @@ +package org.apache.camel.kafkaconnector.utils; + +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +public final class SchemaHelper { + + /** + * Try to build a {@link SchemaBuilder} for a value of type {@link Object} + * However, this will only build the schema only for known types, in case it can not return the precise SchemaBuilder type + * it will return an optional {@link SchemaBuilder.BYTE} + * @param value to return the SchemaBuilder for + * + * @return {@link SchemaBuilder} instance + */ + public static SchemaBuilder buildSchemaBuilderForType(final Object value) { + if (value instanceof Byte) { + return SchemaBuilder.bytes(); + } + if (value instanceof Short) { + return SchemaBuilder.int16(); + } + if (value instanceof Integer) { + return SchemaBuilder.int32(); + } + if (value instanceof Long) { + return SchemaBuilder.int64(); + } + if (value instanceof Float) { + return SchemaBuilder.float32(); + } + if (value instanceof Double) { + return SchemaBuilder.float64(); + } + if (value instanceof Boolean) { + return SchemaBuilder.bool(); + } + if (value instanceof String) { + return SchemaBuilder.string(); + } + if (value instanceof Map) { + // Note: optimally we should define the schema better for map, however for now we will keep it abstract + return new SchemaBuilder(Schema.Type.MAP); + } + if (value instanceof Iterable) { + // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract + return new SchemaBuilder(Schema.Type.ARRAY); + } + if (value instanceof Struct) { + return SchemaBuilder.struct(); + } + + // if we do not fine any of schema out of the above, we just return an an optional byte schema + return SchemaBuilder.bytes().optional(); + } +} diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java new file mode 100644 index 0000000..8973903 --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java @@ -0,0 +1,35 @@ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.kafkaconnector.utils.SchemaHelper; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.junit.Test; +import org.apache.kafka.connect.connector.ConnectRecord; + +import static org.junit.Assert.*; + +public class CamelTypeConverterTransformTest { + + @Test + public void testIfItConvertsConnectRecordCorrectly() { + final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE"); + final Map<String, Object> props = new HashMap<>(); + props.put("target.type", "java.lang.Boolean"); + + final Transformation<SourceRecord> transformation = new CamelTypeConverterTransform.Value<>(); + + transformation.configure(props); + + final SourceRecord transformedSourceRecord = transformation.apply(connectRecord); + + assertEquals(true, transformedSourceRecord.value()); + assertEquals(Schema.BOOLEAN_SCHEMA, transformedSourceRecord.valueSchema()); + } + +} \ No newline at end of file diff --git a/parent/pom.xml b/parent/pom.xml index fa2a06f..b62f7f6 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -175,6 +175,12 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> + <artifactId>connect-transforms</artifactId> + <version>${kafka.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>${kafka.version}</version> <scope>test</scope>
