[ https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167493#comment-16167493 ]
Artem Plotnikov commented on KAFKA-5891: ---------------------------------------- Seems like Kafka Connect's Cast transformation loses schema information (basically, schema name) while doing type casting. I was able to reproduce this problem with the following test in org.apache.kafka.connect.transforms.CastTest for current trunk repository branch: {code} @SuppressWarnings("unchecked") @Test public void castWholeRecordValueWithSchemaBooleanAndTimestampField() { final Cast<SourceRecord> xform = new Cast.Value<>(); xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64:boolean")); SchemaBuilder builder = SchemaBuilder.struct(); builder.field("int64", Schema.INT64_SCHEMA); builder.field("timestamp", Timestamp.SCHEMA); Schema supportedTypesSchema = builder.build(); Struct recordValue = new Struct(supportedTypesSchema); recordValue.put("int64", (long) 64); recordValue.put("timestamp", new java.sql.Timestamp(0L)); SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, supportedTypesSchema, recordValue)); assertEquals(true, ((Struct) transformed.value()).get("int64")); assertEquals(new java.sql.Timestamp(0L), ((Struct) transformed.value()).get("timestamp")); } {code} The problem is that Timestamp.SCHEMA has schema.type = 'INT64' and schema.name = "org.apache.kafka.connect.data.Timestamp", but org.apache.kafka.connect.transforms.Cast#getOrBuildSchema method copies schema.type only: {code} SchemaBuilder fieldBuilder = convertFieldType(casts.containsKey(field.name()) ? casts.get(field.name()) : field.schema().type()); {code} > Cast transformation fails if record schema contains timestamp field > ------------------------------------------------------------------- > > Key: KAFKA-5891 > URL: https://issues.apache.org/jira/browse/KAFKA-5891 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 0.11.0.0 > Reporter: Artem Plotnikov > > I have the following simple type cast transformation: > {code} > name=postgresql-source-simple > connector.class=io.confluent.connect.jdbc.JdbcSourceConnector > tasks.max=1 > connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres&password=mysecretpassword > query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b > transforms=Cast > transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value > transforms.Cast.spec=a:boolean > mode=bulk > topic.prefix=clients > {code} > Which fails with the following exception in runtime: > {code} > [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an > uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:148) > org.apache.kafka.connect.errors.DataException: Invalid Java object for schema > type INT64: class java.sql.Timestamp for field: "null" > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239) > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209) > at org.apache.kafka.connect.data.Struct.put(Struct.java:214) > at > org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152) > at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > If I remove the transforms.* part of the connector it will work correctly. > Actually, it doesn't really matter which types I use in the transformation > for field 'a', just the existence of a timestamp field brings the exception. -- This message was sent by Atlassian JIRA (v6.4.14#64029)