This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new b13c52b  KAFKA-6290: Support casting from logical types in cast 
transform (#7371)
b13c52b is described below

commit b13c52b90ab9206aea05f0142b36c6b07eff380d
Author: Nigel Liang <[email protected]>
AuthorDate: Thu Oct 3 12:55:52 2019 -0700

    KAFKA-6290: Support casting from logical types in cast transform (#7371)
    
    Adds support for the Connect Cast transform to cast from Connect logical 
types, such as DATE, TIME, TIMESTAMP, and DECIMAL. Casting to a numeric type 
will produce the underlying numeric value, represented in the desired type. For 
logical types backed by the Java Date class, this means the integer encoding 
defined by the logical type: days since the epoch for Date, and milliseconds 
for Time and Timestamp. For Decimal, this means the underlying value. If the 
value does not fit in the desired target type, it may overflow.
    
    Casting to String from Date, Time, and Timestamp types will produce their 
ISO 8601 representation. Casting to String from Decimal will result in the 
value represented as a string; for example, 1234 -> "1234".
    
    Author: Nigel Liang <[email protected]>
    Reviewer: Randall Hauch <[email protected]>
---
 .../org/apache/kafka/connect/transforms/Cast.java  |  20 ++++
 .../apache/kafka/connect/transforms/CastTest.java  | 101 ++++++++++++++++++++-
 2 files changed, 119 insertions(+), 2 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 3dc6dc7..475b98e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -24,10 +24,14 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
@@ -222,7 +226,18 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
             default:
                 throw new DataException("Unexpected type in Cast 
transformation: " + type);
         }
+    }
 
+    private static Object encodeLogicalType(Schema schema, Object value) {
+        switch (schema.name()) {
+            case Date.LOGICAL_NAME:
+                return Date.fromLogical(schema, (java.util.Date) value);
+            case Time.LOGICAL_NAME:
+                return Time.fromLogical(schema, (java.util.Date) value);
+            case Timestamp.LOGICAL_NAME:
+                return Timestamp.fromLogical(schema, (java.util.Date) value);
+        }
+        return value;
     }
 
     private static Object castValueToType(Schema schema, Object value, 
Schema.Type targetType) {
@@ -238,6 +253,11 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
             // Ensure the type we are trying to cast from is supported
             validCastType(inferredType, FieldType.INPUT);
 
+            // Perform logical type encoding to their internal representation.
+            if (schema != null && schema.name() != null && targetType != 
Type.STRING) {
+                value = encodeLogicalType(schema, value);
+            }
+
             switch (targetType) {
                 case INT8:
                     return castToInt8(value);
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index c568afb..a28aa28 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -17,11 +17,16 @@
 
 package org.apache.kafka.connect.transforms;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
@@ -42,7 +47,8 @@ import static org.junit.Assert.assertTrue;
 public class CastTest {
     private final Cast<SourceRecord> xformKey = new Cast.Key<>();
     private final Cast<SourceRecord> xformValue = new Cast.Value<>();
-    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+    private static final long MILLIS_PER_HOUR = TimeUnit.HOURS.toMillis(1);
+    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);
 
     @After
     public void teardown() {
@@ -320,6 +326,97 @@ public class CastTest {
         xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 
Collections.singletonList("foo")));
     }
 
+    @Test
+    public void castLogicalToPrimitive() {
+        List<String> specParts = Arrays.asList(
+            "date_to_int32:int32",  // Cast to underlying representation
+            "timestamp_to_int64:int64",  // Cast to underlying representation
+            "time_to_int64:int64",  // Cast to wider datatype than underlying 
representation
+            "decimal_to_int32:int32",  // Cast to narrower datatype with data 
loss
+            "timestamp_to_float64:float64",  // loss of precision casting to 
double
+            "null_timestamp_to_int32:int32"
+        );
+
+        Date day = new Date(MILLIS_PER_DAY);
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+            String.join(",", specParts)));
+
+        SchemaBuilder builder = SchemaBuilder.struct();
+        builder.field("date_to_int32", 
org.apache.kafka.connect.data.Date.SCHEMA);
+        builder.field("timestamp_to_int64", Timestamp.SCHEMA);
+        builder.field("time_to_int64", Time.SCHEMA);
+        builder.field("decimal_to_int32", Decimal.schema(new BigDecimal((long) 
Integer.MAX_VALUE + 1).scale()));
+        builder.field("timestamp_to_float64", Timestamp.SCHEMA);
+        builder.field("null_timestamp_to_int32", 
Timestamp.builder().optional().build());
+
+        Schema supportedTypesSchema = builder.build();
+
+        Struct recordValue = new Struct(supportedTypesSchema);
+        recordValue.put("date_to_int32", day);
+        recordValue.put("timestamp_to_int64", new Date(0));
+        recordValue.put("time_to_int64", new Date(1));
+        recordValue.put("decimal_to_int32", new BigDecimal((long) 
Integer.MAX_VALUE + 1));
+        recordValue.put("timestamp_to_float64", new Date(Long.MAX_VALUE));
+        recordValue.put("null_timestamp_to_int32", null);
+
+        SourceRecord transformed = xformValue.apply(
+            new SourceRecord(null, null, "topic", 0,
+                supportedTypesSchema, recordValue));
+
+        assertEquals(1, ((Struct) transformed.value()).get("date_to_int32"));
+        assertEquals(0L, ((Struct) 
transformed.value()).get("timestamp_to_int64"));
+        assertEquals(1L, ((Struct) transformed.value()).get("time_to_int64"));
+        assertEquals(Integer.MIN_VALUE, ((Struct) 
transformed.value()).get("decimal_to_int32"));
+        assertEquals(9.223372036854776E18, ((Struct) 
transformed.value()).get("timestamp_to_float64"));
+        assertNull(((Struct) 
transformed.value()).get("null_timestamp_to_int32"));
+
+        Schema transformedSchema = ((Struct) transformed.value()).schema();
+        assertEquals(Type.INT32, 
transformedSchema.field("date_to_int32").schema().type());
+        assertEquals(Type.INT64, 
transformedSchema.field("timestamp_to_int64").schema().type());
+        assertEquals(Type.INT64, 
transformedSchema.field("time_to_int64").schema().type());
+        assertEquals(Type.INT32, 
transformedSchema.field("decimal_to_int32").schema().type());
+        assertEquals(Type.FLOAT64, 
transformedSchema.field("timestamp_to_float64").schema().type());
+        assertEquals(Type.INT32, 
transformedSchema.field("null_timestamp_to_int32").schema().type());
+    }
+
+    @Test
+    public void castLogicalToString() {
+        Date date = new Date(MILLIS_PER_DAY);
+        Date time = new Date(MILLIS_PER_HOUR);
+        Date timestamp = new Date();
+
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+            "date:string,decimal:string,time:string,timestamp:string"));
+
+        SchemaBuilder builder = SchemaBuilder.struct();
+        builder.field("date", org.apache.kafka.connect.data.Date.SCHEMA);
+        builder.field("decimal", Decimal.schema(new BigDecimal(1982).scale()));
+        builder.field("time", Time.SCHEMA);
+        builder.field("timestamp", Timestamp.SCHEMA);
+
+        Schema supportedTypesSchema = builder.build();
+
+        Struct recordValue = new Struct(supportedTypesSchema);
+        recordValue.put("date", date);
+        recordValue.put("decimal", new BigDecimal(1982));
+        recordValue.put("time", time);
+        recordValue.put("timestamp", timestamp);
+
+        SourceRecord transformed = xformValue.apply(
+            new SourceRecord(null, null, "topic", 0,
+                supportedTypesSchema, recordValue));
+
+        assertEquals(Values.dateFormatFor(date).format(date), ((Struct) 
transformed.value()).get("date"));
+        assertEquals("1982", ((Struct) transformed.value()).get("decimal"));
+        assertEquals(Values.dateFormatFor(time).format(time), ((Struct) 
transformed.value()).get("time"));
+        assertEquals(Values.dateFormatFor(timestamp).format(timestamp), 
((Struct) transformed.value()).get("timestamp"));
+
+        Schema transformedSchema = ((Struct) transformed.value()).schema();
+        assertEquals(Type.STRING, 
transformedSchema.field("date").schema().type());
+        assertEquals(Type.STRING, 
transformedSchema.field("decimal").schema().type());
+        assertEquals(Type.STRING, 
transformedSchema.field("time").schema().type());
+        assertEquals(Type.STRING, 
transformedSchema.field("timestamp").schema().type());
+    }
 
     @Test
     public void castFieldsWithSchema() {
@@ -338,7 +435,7 @@ public class CastTest {
         builder.field("boolean", Schema.BOOLEAN_SCHEMA);
         builder.field("string", Schema.STRING_SCHEMA);
         builder.field("bigdecimal", Decimal.schema(new 
BigDecimal(42).scale()));
-        builder.field("date", Timestamp.SCHEMA);
+        builder.field("date", org.apache.kafka.connect.data.Date.SCHEMA);
         builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
         builder.field("timestamp", Timestamp.SCHEMA);
         Schema supportedTypesSchema = builder.build();

Reply via email to