Randall Hauch created KAFKA-6605:
------------------------------------
Summary: Flatten SMT does not properly handle fields that are null
Key: KAFKA-6605
URL: https://issues.apache.org/jira/browse/KAFKA-6605
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Randall Hauch
When a message has a null field, the `Flatten` SMT does not properly handle
this and throws an NPE. Consider this message from Debezium:
{code}
{
  "before": null,
  "after": {
    "dbserver1.mydb.team.Value": {
      "id": 1,
      "name": "kafka",
      "email": "[email protected]",
      "last_modified": 1519939449000
    }
  },
  "source": {
    "version": {
      "string": "0.7.3"
    },
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": {
      "boolean": true
    },
    "thread": null,
    "db": {
      "string": "mydb"
    },
    "table": {
      "string": "team"
    }
  },
  "op": "c",
  "ts_ms": {
    "long": 1519939520285
  }
}
{code}
Note that `before` is null: this event represents an inserted row, so there is
no `before` state for the row. This results in an NPE:
{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
    at org.apache.avro.Schema.validateName(Schema.java:1151)
    at org.apache.avro.Schema.access$200(Schema.java:81)
    at org.apache.avro.Schema$Field.<init>(Schema.java:403)
    at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
    at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
    at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
    at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
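For illustration only, here is a minimal Python sketch of the behavior one might expect from a flatten step when it meets a null field such as `before` or `gtid` in the message above. It is not the Connect implementation: it works on plain dictionaries rather than Connect `Struct` and `Schema` objects, and the `flatten` function and its parameters are hypothetical names chosen for this example. The idea is simply that a null value is carried through as a leaf instead of being dereferenced.

```python
from typing import Any, Dict, Optional


def flatten(value: Optional[Dict[str, Any]],
            delimiter: str = ".",
            prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining names with `delimiter`.

    Hypothetical helper for illustration; not part of Kafka Connect.
    """
    flat: Dict[str, Any] = {}
    if value is None:
        # A null record has nothing to flatten; return an empty result
        # rather than trying to iterate over it.
        return flat
    for name, field in value.items():
        key = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(field, dict):
            # Recurse into nested structures, extending the name prefix.
            flat.update(flatten(field, delimiter, key))
        else:
            # Scalars and nulls are kept as leaves under the joined name.
            flat[key] = field
    return flat


# A trimmed-down version of the Debezium event shown earlier.
event = {
    "before": None,
    "source": {"version": {"string": "0.7.3"}, "gtid": None},
    "op": "c",
}
flat = flatten(event)
```

With this sketch, `flat["before"]` and `flat["source.gtid"]` are both `None`, and `flat["source.version.string"]` holds the nested value.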
Here's the connector configuration that was used:
{code}
{
  "name": "debezium-connector-flatten",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "223345",
    "database.server.name": "dbserver-flatten",
    "database.whitelist": "mydb",
    "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
    "database.history.kafka.topic": "schema-flatten.mydb",
    "include.schema.changes": "true",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value"
  }
}
{code}
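The configuration above does not set `transforms.flatten.delimiter`, so `Flatten` joins nested field names with its default `.` delimiter, which is consistent with the `source.version` name in the stack trace. The Avro specification restricts names to `[A-Za-z_][A-Za-z0-9_]*`, so a name containing a dot is rejected. The following Python sketch only illustrates that naming rule; the helper names are hypothetical and it does not call Avro or Connect.

```python
import re

# Avro name rule: a letter or underscore, followed by letters, digits,
# or underscores.
AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def is_valid_avro_name(name: str) -> bool:
    """Return True if `name` satisfies the Avro name rule above."""
    return AVRO_NAME.match(name) is not None


def join_field_name(parent: str, child: str, delimiter: str = ".") -> str:
    """Join a parent and child field name the way a flatten step might.

    Hypothetical helper; the default delimiter mirrors Flatten's default.
    """
    return f"{parent}{delimiter}{child}"


dotted = join_field_name("source", "version")            # "source.version"
underscored = join_field_name("source", "version", "_")  # "source_version"
```

Here `is_valid_avro_name(dotted)` is `False` and `is_valid_avro_name(underscored)` is `True`. Under that assumption, setting `transforms.flatten.delimiter` to `_` may avoid this particular `SchemaParseException`; whether it has any effect on the null handling described in this issue is not established by this report.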