Repository: kafka Updated Branches: refs/heads/0.11.0 b31df613f -> 64e12d3c6
KAFKA-4714; TimestampConverter transformation (KIP-66) Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Konstantine Karantasis <[email protected]>, Jason Gustafson <[email protected]> Closes #3065 from ewencp/kafka-3209-timestamp-converter (cherry picked from commit 61bab2d875ab5e03d0df4f62217346549a4c64c3) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64e12d3c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64e12d3c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64e12d3c Branch: refs/heads/0.11.0 Commit: 64e12d3c641baeb6a4a624b971ba33721d656054 Parents: b31df61 Author: Ewen Cheslack-Postava <[email protected]> Authored: Fri May 19 11:26:59 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri May 19 11:28:20 2017 -0700 ---------------------------------------------------------------------- .../kafka/connect/tools/TransformationDoc.java | 4 +- .../connect/transforms/TimestampConverter.java | 452 +++++++++++++++++++ .../connect/transforms/util/Requirements.java | 2 +- .../transforms/TimestampConverterTest.java | 370 +++++++++++++++ 4 files changed, 826 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 1a8f0a8..b76e7d4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -26,6 +26,7 @@ import 
org.apache.kafka.connect.transforms.MaskField; import org.apache.kafka.connect.transforms.RegexRouter; import org.apache.kafka.connect.transforms.ReplaceField; import org.apache.kafka.connect.transforms.SetSchemaMetadata; +import org.apache.kafka.connect.transforms.TimestampConverter; import org.apache.kafka.connect.transforms.TimestampRouter; import org.apache.kafka.connect.transforms.ValueToKey; @@ -58,7 +59,8 @@ public class TransformationDoc { new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF), new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF), new DocInfo(Flatten.class.getName(), Flatten.OVERVIEW_DOC, Flatten.CONFIG_DEF), - new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF) + new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF), + new DocInfo(TimestampConverter.class.getName(), TimestampConverter.OVERVIEW_DOC, TimestampConverter.CONFIG_DEF) ); private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java new file mode 100644 index 0000000..ce7d002 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public abstract class 
TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> { + + public static final String OVERVIEW_DOC = + "Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types." + + "Applies to individual fields or to the entire value." + + "<p/>Use the concrete transformation type designed for the record key (<code>" + TimestampConverter.Key.class.getName() + "</code>) " + + "or value (<code>" + TimestampConverter.Value.class.getName() + "</code>)."; + + public static final String FIELD_CONFIG = "field"; + private static final String FIELD_DEFAULT = ""; + + public static final String TYPE_CONFIG = "type"; + + public static final String FORMAT_CONFIG = "format"; + private static final String FORMAT_DEFAULT = ""; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH, + "The field containing the timestamp, or empty if the entire value is a timestamp") + .define(TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, + "The desired timestamp representation: string, unix, Date, Time, or Timestamp") + .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM, + "A SimpleDateFormat-compatible format for the timestamp. 
Used to generate the output when type=string " + + "or used to parse the input if the input is a string."); + + private static final String PURPOSE = "converting timestamp formats"; + + private static final String TYPE_STRING = "string"; + private static final String TYPE_UNIX = "unix"; + private static final String TYPE_DATE = "Date"; + private static final String TYPE_TIME = "Time"; + private static final String TYPE_TIMESTAMP = "Timestamp"; + private static final Set<String> VALID_TYPES = new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP)); + + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + private interface TimestampTranslator { + /** + * Convert from the type-specific format to the universal java.util.Date format + */ + Date toRaw(Config config, Object orig); + + /** + * Get the schema for this format. + */ + Schema typeSchema(); + + /** + * Convert from the universal java.util.Date format to the type-specific format + */ + Object toType(Config config, Date orig); + } + + private static final Map<String, TimestampTranslator> TRANSLATORS = new HashMap<>(); + static { + TRANSLATORS.put(TYPE_STRING, new TimestampTranslator() { + @Override + public Date toRaw(Config config, Object orig) { + if (!(orig instanceof String)) + throw new DataException("Expected string timestamp to be a String, but found " + orig.getClass()); + try { + return config.format.parse((String) orig); + } catch (ParseException e) { + throw new DataException("Could not parse timestamp: value (" + orig + ") does not match pattern (" + + config.format.toPattern() + ")", e); + } + } + + @Override + public Schema typeSchema() { + return Schema.STRING_SCHEMA; + } + + @Override + public String toType(Config config, Date orig) { + synchronized (config.format) { + return config.format.format(orig); + } + } + }); + + TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() { + @Override + public Date toRaw(Config config, Object orig) { + if 
(!(orig instanceof Long)) + throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass()); + return Timestamp.toLogical(Timestamp.SCHEMA, (Long) orig); + } + + @Override + public Schema typeSchema() { + return Schema.INT64_SCHEMA; + } + + @Override + public Long toType(Config config, Date orig) { + return Timestamp.fromLogical(Timestamp.SCHEMA, orig); + } + }); + + TRANSLATORS.put(TYPE_DATE, new TimestampTranslator() { + @Override + public Date toRaw(Config config, Object orig) { + if (!(orig instanceof Date)) + throw new DataException("Expected Date to be a java.util.Date, but found " + orig.getClass()); + // Already represented as a java.util.Date and Connect Dates are a subset of valid java.util.Date values + return (Date) orig; + } + + @Override + public Schema typeSchema() { + return org.apache.kafka.connect.data.Date.SCHEMA; + } + + @Override + public Date toType(Config config, Date orig) { + Calendar result = Calendar.getInstance(UTC); + result.setTime(orig); + result.set(Calendar.HOUR_OF_DAY, 0); + result.set(Calendar.MINUTE, 0); + result.set(Calendar.SECOND, 0); + result.set(Calendar.MILLISECOND, 0); + return result.getTime(); + } + }); + + TRANSLATORS.put(TYPE_TIME, new TimestampTranslator() { + @Override + public Date toRaw(Config config, Object orig) { + if (!(orig instanceof Date)) + throw new DataException("Expected Time to be a java.util.Date, but found " + orig.getClass()); + // Already represented as a java.util.Date and Connect Times are a subset of valid java.util.Date values + return (Date) orig; + } + + @Override + public Schema typeSchema() { + return Time.SCHEMA; + } + + @Override + public Date toType(Config config, Date orig) { + Calendar origCalendar = Calendar.getInstance(UTC); + origCalendar.setTime(orig); + Calendar result = Calendar.getInstance(UTC); + result.setTimeInMillis(0L); + result.set(Calendar.HOUR_OF_DAY, origCalendar.get(Calendar.HOUR_OF_DAY)); + result.set(Calendar.MINUTE, 
origCalendar.get(Calendar.MINUTE)); + result.set(Calendar.SECOND, origCalendar.get(Calendar.SECOND)); + result.set(Calendar.MILLISECOND, origCalendar.get(Calendar.MILLISECOND)); + return result.getTime(); + } + }); + + TRANSLATORS.put(TYPE_TIMESTAMP, new TimestampTranslator() { + @Override + public Date toRaw(Config config, Object orig) { + if (!(orig instanceof Date)) + throw new DataException("Expected Timestamp to be a java.util.Date, but found " + orig.getClass()); + return (Date) orig; + } + + @Override + public Schema typeSchema() { + return Timestamp.SCHEMA; + } + + @Override + public Date toType(Config config, Date orig) { + return orig; + } + }); + } + + // This is a bit unusual, but allows the transformation config to be passed to static anonymous classes to customize + // their behavior + private static class Config { + Config(String field, String type, SimpleDateFormat format) { + this.field = field; + this.type = type; + this.format = format; + } + String field; + String type; + SimpleDateFormat format; + } + private Config config; + private Cache<Schema, Schema> schemaUpdateCache; + + + @Override + public void configure(Map<String, ?> configs) { + final SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, configs); + final String field = simpleConfig.getString(FIELD_CONFIG); + final String type = simpleConfig.getString(TYPE_CONFIG); + String formatPattern = simpleConfig.getString(FORMAT_CONFIG); + schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16)); + + if (!VALID_TYPES.contains(type)) { + throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". 
Valid values are " + + Utils.join(VALID_TYPES, ", ") + "."); + } + if (type.equals(TYPE_STRING) && formatPattern.trim().isEmpty()) { + throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps"); + } + SimpleDateFormat format = null; + if (formatPattern != null && !formatPattern.trim().isEmpty()) { + try { + format = new SimpleDateFormat(formatPattern); + format.setTimeZone(UTC); + } catch (IllegalArgumentException e) { + throw new ConfigException("TimestampConverter requires a SimpleDateFormat-compatible pattern for string timestamps: " + + formatPattern, e); + } + } + config = new Config(field, type, format); + } + + @Override + public R apply(R record) { + if (operatingSchema(record) == null) { + return applySchemaless(record); + } else { + return applyWithSchema(record); + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + public static class Key<R extends ConnectRecord<R>> extends TimestampConverter<R> { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + } + + public static class Value<R extends ConnectRecord<R>> extends TimestampConverter<R> { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, 
record.timestamp()); + } + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + + private R applyWithSchema(R record) { + final Schema schema = operatingSchema(record); + if (config.field.isEmpty()) { + Object value = operatingValue(record); + // New schema is determined by the requested target timestamp type + Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); + return newRecord(record, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema))); + } else { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + Schema updatedSchema = schemaUpdateCache.get(value.schema()); + if (updatedSchema == null) { + SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + for (Field field : schema.fields()) { + if (field.name().equals(config.field)) { + builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema()); + } else { + builder.field(field.name(), field.schema()); + } + } + if (schema.isOptional()) + builder.optional(); + if (schema.defaultValue() != null) { + Struct updatedDefaultValue = applyValueWithSchema((Struct) schema.defaultValue(), builder); + builder.defaultValue(updatedDefaultValue); + } + + updatedSchema = builder.build(); + schemaUpdateCache.put(schema, updatedSchema); + } + + Struct updatedValue = applyValueWithSchema(value, updatedSchema); + return newRecord(record, updatedSchema, updatedValue); + } + } + + private Struct applyValueWithSchema(Struct value, Schema updatedSchema) { + Struct updatedValue = new Struct(updatedSchema); + for (Field field : value.schema().fields()) { + final Object updatedFieldValue; + if (field.name().equals(config.field)) { + updatedFieldValue = convertTimestamp(value.get(field), timestampTypeFromSchema(field.schema())); + } else { + updatedFieldValue = value.get(field); + } + 
updatedValue.put(field.name(), updatedFieldValue); + } + return updatedValue; + } + + private R applySchemaless(R record) { + if (config.field.isEmpty()) { + Object value = operatingValue(record); + return newRecord(record, null, convertTimestamp(value)); + } else { + final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE); + final HashMap<String, Object> updatedValue = new HashMap<>(value); + updatedValue.put(config.field, convertTimestamp(value.get(config.field))); + return newRecord(record, null, updatedValue); + } + } + + /** + * Determine the type/format of the timestamp based on the schema + */ + private String timestampTypeFromSchema(Schema schema) { + if (Timestamp.LOGICAL_NAME.equals(schema.name())) { + return TYPE_TIMESTAMP; + } else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name())) { + return TYPE_DATE; + } else if (Time.LOGICAL_NAME.equals(schema.name())) { + return TYPE_TIME; + } else if (schema.type().equals(Schema.Type.STRING)) { + // If not otherwise specified, string == user-specified string format for timestamps + return TYPE_STRING; + } else if (schema.type().equals(Schema.Type.INT64)) { + // If not otherwise specified, long == unix time + return TYPE_UNIX; + } + throw new ConnectException("Schema " + schema + " does not correspond to a known timestamp type format"); + } + + /** + * Infer the type/format of the timestamp based on the raw Java type + */ + private String inferTimestampType(Object timestamp) { + // Note that we can't infer all types, e.g. 
Date/Time/Timestamp all have the same runtime representation as a + // java.util.Date + if (timestamp instanceof Date) { + return TYPE_TIMESTAMP; + } else if (timestamp instanceof Long) { + return TYPE_UNIX; + } else if (timestamp instanceof String) { + return TYPE_STRING; + } + throw new DataException("TimestampConverter does not support " + timestamp.getClass() + " objects as timestamps"); + } + + /** + * Convert the given timestamp to the target timestamp format. + * @param timestamp the input timestamp + * @param timestampFormat the format of the timestamp, or null if the format should be inferred + * @return the converted timestamp + */ + private Object convertTimestamp(Object timestamp, String timestampFormat) { + if (timestampFormat == null) { + timestampFormat = inferTimestampType(timestamp); + } + + TimestampTranslator sourceTranslator = TRANSLATORS.get(timestampFormat); + if (sourceTranslator == null) { + throw new ConnectException("Unsupported timestamp type: " + timestampFormat); + } + Date rawTimestamp = sourceTranslator.toRaw(config, timestamp); + + TimestampTranslator targetTranslator = TRANSLATORS.get(config.type); + if (targetTranslator == null) { + throw new ConnectException("Unsupported timestamp type: " + config.type); + } + return targetTranslator.toType(config, rawTimestamp); + } + + private Object convertTimestamp(Object timestamp) { + return convertTimestamp(timestamp, null); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java index 0220907..6f1be19 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java +++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java @@ -55,7 +55,7 @@ public class Requirements { } private static String nullSafeClassName(Object x) { - return x == null ? "null" : x.getClass().getCanonicalName(); + return x == null ? "null" : x.getClass().getName(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/64e12d3c/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java new file mode 100644 index 0000000..1b93874 --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TimestampConverterTest { + + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + private static final Calendar EPOCH; + private static final Calendar TIME; + private static final Calendar DATE; + private static final Calendar DATE_PLUS_TIME; + private static final long DATE_PLUS_TIME_UNIX; + private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z"; + private static final String DATE_PLUS_TIME_STRING; + + static { + EPOCH = GregorianCalendar.getInstance(UTC); + EPOCH.setTimeInMillis(0L); + + TIME = GregorianCalendar.getInstance(UTC); + TIME.setTimeInMillis(0L); + TIME.add(Calendar.MILLISECOND, 1234); + + DATE = GregorianCalendar.getInstance(UTC); + DATE.setTimeInMillis(0L); + DATE.set(1970, Calendar.JANUARY, 1, 0, 0, 0); + DATE.add(Calendar.DATE, 1); + + DATE_PLUS_TIME = GregorianCalendar.getInstance(UTC); + DATE_PLUS_TIME.setTimeInMillis(0L); + DATE_PLUS_TIME.add(Calendar.DATE, 1); + DATE_PLUS_TIME.add(Calendar.MILLISECOND, 1234); + + DATE_PLUS_TIME_UNIX = DATE_PLUS_TIME.getTime().getTime(); + DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC"; + } + + + // Configuration + + @Test(expected = ConfigException.class) + public void testConfigNoTargetType() { + TimestampConverter<SourceRecord> 
xform = new TimestampConverter.Value<>(); + xform.configure(Collections.<String, String>emptyMap()); + } + + @Test(expected = ConfigException.class) + public void testConfigInvalidTargetType() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "invalid")); + } + + @Test(expected = ConfigException.class) + public void testConfigMissingFormat() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "string")); + } + + @Test(expected = ConfigException.class) + public void testConfigInvalidFormat() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + Map<String, String> config = new HashMap<>(); + config.put(TimestampConverter.TYPE_CONFIG, "string"); + config.put(TimestampConverter.FORMAT_CONFIG, "bad-format"); + xform.configure(config); + } + + + // Conversions without schemas (most flexible Timestamp -> other types) + + @Test + public void testSchemalessIdentity() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + + assertNull(transformed.valueSchema()); + assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); + } + + @Test + public void testSchemalessTimestampToDate() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date")); + SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + + assertNull(transformed.valueSchema()); + assertEquals(DATE.getTime(), transformed.value()); + } + + @Test + public void testSchemalessTimestampToTime() { + 
TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time")); + SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + + assertNull(transformed.valueSchema()); + assertEquals(TIME.getTime(), transformed.value()); + } + + @Test + public void testSchemalessTimestampToUnix() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix")); + SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + + assertNull(transformed.valueSchema()); + assertEquals(DATE_PLUS_TIME_UNIX, transformed.value()); + } + + @Test + public void testSchemalessTimestampToString() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + Map<String, String> config = new HashMap<>(); + config.put(TimestampConverter.TYPE_CONFIG, "string"); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + xform.configure(config); + SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + + assertNull(transformed.valueSchema()); + assertEquals(DATE_PLUS_TIME_STRING, transformed.value()); + } + + + // Conversions without schemas (core types -> most flexible Timestamp format) + + @Test + public void testSchemalessDateToTimestamp() { + TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); + xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime())); + + assertNull(transformed.valueSchema()); + // No change expected since the source type is coarser-grained + assertEquals(DATE.getTime(), transformed.value()); + } + + @Test + public void 
testSchemalessTimeToTimestamp() {
+        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
+        xform.configure(Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
+
+        assertNull(transformed.valueSchema());
+        // No change expected since the source type is coarser-grained
+        assertEquals(TIME.getTime(), transformed.value());
+    }
+
+    /**
+     * Schemaless DATE_PLUS_TIME_UNIX with target type "Timestamp": the value schema must stay
+     * null and the value must equal DATE_PLUS_TIME.getTime().
+     */
+    @Test
+    public void testSchemalessUnixToTimestamp() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX);
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertNull(result.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), result.value());
+    }
+
+    /**
+     * Schemaless DATE_PLUS_TIME_STRING with target type "Timestamp" and format STRING_DATE_FMT:
+     * the value schema must stay null and the value must equal DATE_PLUS_TIME.getTime().
+     */
+    @Test
+    public void testSchemalessStringToTimestamp() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING);
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertNull(result.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), result.value());
+    }
+
+
+    // Conversions with schemas (most flexible Timestamp -> other types)
+
+    /**
+     * Timestamp.SCHEMA input with target type "Timestamp": both the schema and the value are
+     * expected to come back unchanged.
+     */
+    @Test
+    public void testWithSchemaIdentity() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, result.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), result.value());
+    }
+
+    /**
+     * Timestamp.SCHEMA input with target type "Date": expects Date.SCHEMA and DATE.getTime().
+     */
+    @Test
+    public void testWithSchemaTimestampToDate() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Date");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Date.SCHEMA, result.valueSchema());
+        assertEquals(DATE.getTime(), result.value());
+    }
+
+    /**
+     * Timestamp.SCHEMA input with target type "Time": expects Time.SCHEMA and TIME.getTime().
+     */
+    @Test
+    public void testWithSchemaTimestampToTime() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Time");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Time.SCHEMA, result.valueSchema());
+        assertEquals(TIME.getTime(), result.value());
+    }
+
+    /**
+     * Timestamp.SCHEMA input with target type "unix": expects Schema.INT64_SCHEMA and
+     * DATE_PLUS_TIME_UNIX.
+     */
+    @Test
+    public void testWithSchemaTimestampToUnix() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "unix");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Schema.INT64_SCHEMA, result.valueSchema());
+        assertEquals(DATE_PLUS_TIME_UNIX, result.value());
+    }
+
+    /**
+     * Timestamp.SCHEMA input with target type "string" and format STRING_DATE_FMT: expects
+     * Schema.STRING_SCHEMA and DATE_PLUS_TIME_STRING.
+     */
+    @Test
+    public void testWithSchemaTimestampToString() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime());
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "string");
+        props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Schema.STRING_SCHEMA, result.valueSchema());
+        assertEquals(DATE_PLUS_TIME_STRING, result.value());
+    }
+
+
+    // Conversions with schemas (core types -> most flexible Timestamp format)
+
+    /**
+     * Date.SCHEMA input with target type "Timestamp": expects Timestamp.SCHEMA with the value
+     * still equal to DATE.getTime().
+     */
+    @Test
+    public void testWithSchemaDateToTimestamp() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime());
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, result.valueSchema());
+        // No change expected since the source type is coarser-grained
+        assertEquals(DATE.getTime(), result.value());
+    }
+
+    /**
+     * Time.SCHEMA input with target type "Timestamp": expects Timestamp.SCHEMA with the value
+     * still equal to TIME.getTime().
+     */
+    @Test
+    public void testWithSchemaTimeToTimestamp() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime());
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, result.valueSchema());
+        // No change expected since the source type is coarser-grained
+        assertEquals(TIME.getTime(), result.value());
+    }
+
+    /**
+     * Schema.INT64_SCHEMA input (DATE_PLUS_TIME_UNIX) with target type "Timestamp": expects
+     * Timestamp.SCHEMA and DATE_PLUS_TIME.getTime().
+     */
+    @Test
+    public void testWithSchemaUnixToTimestamp() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX);
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, result.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), result.value());
+    }
+
+    /**
+     * Schema.STRING_SCHEMA input (DATE_PLUS_TIME_STRING) with target type "Timestamp" and format
+     * STRING_DATE_FMT: expects Timestamp.SCHEMA and DATE_PLUS_TIME.getTime().
+     */
+    @Test
+    public void testWithSchemaStringToTimestamp() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING);
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        props.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertEquals(Timestamp.SCHEMA, result.valueSchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), result.value());
+    }
+
+
+    // Convert field instead of entire key/value
+
+    /**
+     * Schemaless map value with FIELD_CONFIG "ts" and target type "Date": expects a null value
+     * schema and a single-entry map whose "ts" entry equals DATE.getTime().
+     */
+    @Test
+    public void testSchemalessFieldConversion() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Date");
+        props.put(TimestampConverter.FIELD_CONFIG, "ts");
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        Object payload = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, payload);
+        SourceRecord result = converter.apply(input);
+
+        assertNull(result.valueSchema());
+        assertEquals(Collections.singletonMap("ts", DATE.getTime()), result.value());
+    }
+
+    /**
+     * Struct value with FIELD_CONFIG "ts" and target type "Timestamp": the "ts" field schema is
+     * expected to change from Schema.INT64_SCHEMA to Timestamp.SCHEMA and its value to
+     * DATE_PLUS_TIME.getTime(), while the "other" field keeps its schema and value.
+     */
+    @Test
+    public void testWithSchemaFieldConversion() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TimestampConverter.TYPE_CONFIG, "Timestamp");
+        props.put(TimestampConverter.FIELD_CONFIG, "ts");
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Value<>();
+        converter.configure(props);
+
+        // ts field is a unix timestamp
+        Schema inputSchema = SchemaBuilder.struct()
+                .field("ts", Schema.INT64_SCHEMA)
+                .field("other", Schema.STRING_SCHEMA)
+                .build();
+        Struct inputStruct = new Struct(inputSchema);
+        inputStruct.put("ts", DATE_PLUS_TIME_UNIX);
+        inputStruct.put("other", "test");
+
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, inputSchema, inputStruct);
+        SourceRecord result = converter.apply(input);
+
+        Schema convertedSchema = SchemaBuilder.struct()
+                .field("ts", Timestamp.SCHEMA)
+                .field("other", Schema.STRING_SCHEMA)
+                .build();
+        assertEquals(convertedSchema, result.valueSchema());
+
+        Struct output = (Struct) result.value();
+        assertEquals(DATE_PLUS_TIME.getTime(), output.get("ts"));
+        assertEquals("test", output.get("other"));
+    }
+
+
+    // Validate Key implementation in addition to Value
+
+    /**
+     * Same schemaless "Timestamp" conversion as the Value tests, but through
+     * TimestampConverter.Key: the key schema must stay null and the key must equal
+     * DATE_PLUS_TIME.getTime().
+     */
+    @Test
+    public void testKey() {
+        SourceRecord input = new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null);
+        Map<String, String> props = Collections.singletonMap(TimestampConverter.TYPE_CONFIG, "Timestamp");
+
+        TimestampConverter<SourceRecord> converter = new TimestampConverter.Key<>();
+        converter.configure(props);
+        SourceRecord result = converter.apply(input);
+
+        assertNull(result.keySchema());
+        assertEquals(DATE_PLUS_TIME.getTime(), result.key());
+    }
+
+}
