This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new b13c52b KAFKA-6290: Support casting from logical types in cast transform (#7371)
b13c52b is described below
commit b13c52b90ab9206aea05f0142b36c6b07eff380d
Author: Nigel Liang <[email protected]>
AuthorDate: Thu Oct 3 12:55:52 2019 -0700
KAFKA-6290: Support casting from logical types in cast transform (#7371)
Adds support for the Connect Cast transforms to cast from Connect logical
types, such as DATE, TIME, TIMESTAMP, and DECIMAL. Casting to numeric types
will produce the underlying numeric value represented in the desired type. For
logical types represented by the underlying Java Date class, this means their
internal Connect encoding: days since the Unix epoch for DATE, milliseconds
since midnight for TIME, and milliseconds since the Unix epoch for TIMESTAMP.
For Decimal, this means the underlying numeric value. If the value does not fit
in the target type, it may overflow (for example, a DECIMAL of 2^31 cast to
int32 wraps around to Integer.MIN_VALUE).
Casting to String from Date, Time, and Timestamp types will produce their
ISO 8601 representation. Casting to String from Decimal will produce the
value's plain string form; for example, 1234 -> "1234".
Author: Nigel Liang <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../org/apache/kafka/connect/transforms/Cast.java | 20 ++++
.../apache/kafka/connect/transforms/CastTest.java | 101 ++++++++++++++++++++-
2 files changed, 119 insertions(+), 2 deletions(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 3dc6dc7..475b98e 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -24,10 +24,14 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
@@ -222,7 +226,18 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
default:
throw new DataException("Unexpected type in Cast
transformation: " + type);
}
+ }
+ /**
+ * Encodes a value of a Connect DATE, TIME, or TIMESTAMP logical type into its
+ * internal representation by delegating to that logical type's
+ * {@code fromLogical} method. A value whose schema has any other name is
+ * returned unchanged.
+ *
+ * @param schema the value's schema; its name must be non-null, because switching
+ * on a null String throws NullPointerException (the visible call site checks
+ * {@code schema.name() != null} before calling)
+ * @param value the value to encode; for the three handled logical names it is
+ * cast to {@link java.util.Date} without a type check
+ * @return the encoded value, or {@code value} itself if the schema name is not
+ * one of the handled logical names
+ */
+ private static Object encodeLogicalType(Schema schema, Object value) {
+ switch (schema.name()) {
+ case Date.LOGICAL_NAME:
+ return Date.fromLogical(schema, (java.util.Date) value);
+ case Time.LOGICAL_NAME:
+ return Time.fromLogical(schema, (java.util.Date) value);
+ case Timestamp.LOGICAL_NAME:
+ return Timestamp.fromLogical(schema, (java.util.Date) value);
+ }
+ // No case for Decimal (or any non-logical named schema): the value is passed
+ // through as-is and left to the numeric cast that follows.
+ return value;
 }
private static Object castValueToType(Schema schema, Object value,
Schema.Type targetType) {
@@ -238,6 +253,11 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
// Ensure the type we are trying to cast from is supported
validCastType(inferredType, FieldType.INPUT);
+ // Perform logical type encoding to their internal representation.
+ if (schema != null && schema.name() != null && targetType !=
Type.STRING) {
+ value = encodeLogicalType(schema, value);
+ }
+
switch (targetType) {
case INT8:
return castToInt8(value);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index c568afb..a28aa28 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -17,11 +17,16 @@
package org.apache.kafka.connect.transforms;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
@@ -42,7 +47,8 @@ import static org.junit.Assert.assertTrue;
public class CastTest {
private final Cast<SourceRecord> xformKey = new Cast.Key<>();
private final Cast<SourceRecord> xformValue = new Cast.Value<>();
- private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+ private static final long MILLIS_PER_HOUR = TimeUnit.HOURS.toMillis(1);
+ private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);
@After
public void teardown() {
@@ -320,6 +326,97 @@ public class CastTest {
xformValue.apply(new SourceRecord(null, null, "topic", 0, null,
Collections.singletonList("foo")));
}
+ /**
+ * Verifies that DATE, TIME, TIMESTAMP, and DECIMAL fields cast to primitive
+ * numeric types produce their encoded numeric value. Also covers widening,
+ * narrowing with overflow, precision loss to float64, and null pass-through.
+ * Each cast field's resulting schema type is checked as well.
+ */
+ @Test
+ public void castLogicalToPrimitive() {
+ List<String> specParts = Arrays.asList(
+ "date_to_int32:int32", // Cast to underlying representation
+ "timestamp_to_int64:int64", // Cast to underlying representation
+ "time_to_int64:int64", // Cast to wider datatype than underlying
representation
+ "decimal_to_int32:int32", // Cast to narrower datatype with data
loss
+ "timestamp_to_float64:float64", // loss of precision casting to
double
+ "null_timestamp_to_int32:int32"
+ );
+
+ Date day = new Date(MILLIS_PER_DAY);
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+ String.join(",", specParts)));
+
+ SchemaBuilder builder = SchemaBuilder.struct();
+ builder.field("date_to_int32",
org.apache.kafka.connect.data.Date.SCHEMA);
+ builder.field("timestamp_to_int64", Timestamp.SCHEMA);
+ builder.field("time_to_int64", Time.SCHEMA);
+ builder.field("decimal_to_int32", Decimal.schema(new BigDecimal((long)
Integer.MAX_VALUE + 1).scale()));
+ builder.field("timestamp_to_float64", Timestamp.SCHEMA);
+ builder.field("null_timestamp_to_int32",
Timestamp.builder().optional().build());
+
+ Schema supportedTypesSchema = builder.build();
+
+ Struct recordValue = new Struct(supportedTypesSchema);
+ recordValue.put("date_to_int32", day);
+ recordValue.put("timestamp_to_int64", new Date(0));
+ recordValue.put("time_to_int64", new Date(1));
+ recordValue.put("decimal_to_int32", new BigDecimal((long)
Integer.MAX_VALUE + 1));
+ recordValue.put("timestamp_to_float64", new Date(Long.MAX_VALUE));
+ recordValue.put("null_timestamp_to_int32", null);
+
+ SourceRecord transformed = xformValue.apply(
+ new SourceRecord(null, null, "topic", 0,
+ supportedTypesSchema, recordValue));
+
+ // A DATE exactly MILLIS_PER_DAY after the epoch encodes to a day count of 1.
+ assertEquals(1, ((Struct) transformed.value()).get("date_to_int32"));
+ assertEquals(0L, ((Struct)
transformed.value()).get("timestamp_to_int64"));
+ assertEquals(1L, ((Struct) transformed.value()).get("time_to_int64"));
+ // 2^31 does not fit in int32, so the narrowing cast wraps to Integer.MIN_VALUE.
+ assertEquals(Integer.MIN_VALUE, ((Struct)
transformed.value()).get("decimal_to_int32"));
+ // Long.MAX_VALUE is not exactly representable as a double; it rounds to 2^63.
+ assertEquals(9.223372036854776E18, ((Struct)
transformed.value()).get("timestamp_to_float64"));
+ assertNull(((Struct)
transformed.value()).get("null_timestamp_to_int32"));
+
+ // The cast fields' schemas must report the requested target types.
+ Schema transformedSchema = ((Struct) transformed.value()).schema();
+ assertEquals(Type.INT32,
transformedSchema.field("date_to_int32").schema().type());
+ assertEquals(Type.INT64,
transformedSchema.field("timestamp_to_int64").schema().type());
+ assertEquals(Type.INT64,
transformedSchema.field("time_to_int64").schema().type());
+ assertEquals(Type.INT32,
transformedSchema.field("decimal_to_int32").schema().type());
+ assertEquals(Type.FLOAT64,
transformedSchema.field("timestamp_to_float64").schema().type());
+ assertEquals(Type.INT32,
transformedSchema.field("null_timestamp_to_int32").schema().type());
+ }
+
+ /**
+ * Verifies that DATE, TIME, TIMESTAMP, and DECIMAL fields cast to string produce
+ * the expected text, and that every cast field's schema type becomes STRING.
+ */
+ @Test
+ public void castLogicalToString() {
+ Date date = new Date(MILLIS_PER_DAY);
+ Date time = new Date(MILLIS_PER_HOUR);
+ // Current time: non-deterministic, so the expected strings below are computed
+ // with Values.dateFormatFor(...) rather than hard-coded.
+ Date timestamp = new Date();
+
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+ "date:string,decimal:string,time:string,timestamp:string"));
+
+ SchemaBuilder builder = SchemaBuilder.struct();
+ builder.field("date", org.apache.kafka.connect.data.Date.SCHEMA);
+ builder.field("decimal", Decimal.schema(new BigDecimal(1982).scale()));
+ builder.field("time", Time.SCHEMA);
+ builder.field("timestamp", Timestamp.SCHEMA);
+
+ Schema supportedTypesSchema = builder.build();
+
+ Struct recordValue = new Struct(supportedTypesSchema);
+ recordValue.put("date", date);
+ recordValue.put("decimal", new BigDecimal(1982));
+ recordValue.put("time", time);
+ recordValue.put("timestamp", timestamp);
+
+ SourceRecord transformed = xformValue.apply(
+ new SourceRecord(null, null, "topic", 0,
+ supportedTypesSchema, recordValue));
+
+ assertEquals(Values.dateFormatFor(date).format(date), ((Struct)
transformed.value()).get("date"));
+ assertEquals("1982", ((Struct) transformed.value()).get("decimal"));
+ assertEquals(Values.dateFormatFor(time).format(time), ((Struct)
transformed.value()).get("time"));
+ assertEquals(Values.dateFormatFor(timestamp).format(timestamp),
((Struct) transformed.value()).get("timestamp"));
+
+ Schema transformedSchema = ((Struct) transformed.value()).schema();
+ assertEquals(Type.STRING,
transformedSchema.field("date").schema().type());
+ assertEquals(Type.STRING,
transformedSchema.field("decimal").schema().type());
+ assertEquals(Type.STRING,
transformedSchema.field("time").schema().type());
+ assertEquals(Type.STRING,
transformedSchema.field("timestamp").schema().type());
+ }
@Test
public void castFieldsWithSchema() {
@@ -338,7 +435,7 @@ public class CastTest {
builder.field("boolean", Schema.BOOLEAN_SCHEMA);
builder.field("string", Schema.STRING_SCHEMA);
builder.field("bigdecimal", Decimal.schema(new
BigDecimal(42).scale()));
- builder.field("date", Timestamp.SCHEMA);
+ builder.field("date", org.apache.kafka.connect.data.Date.SCHEMA);
builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
builder.field("timestamp", Timestamp.SCHEMA);
Schema supportedTypesSchema = builder.build();