This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new f534da0 KAFKA-9706: Handle null in keys or values when Flatten
transformation is used (#8279)
f534da0 is described below
commit f534da078eaf3a3b0b5471522b22b99acc9e4837
Author: Greg Harris <[email protected]>
AuthorDate: Mon Mar 30 15:09:27 2020 -0700
KAFKA-9706: Handle null in keys or values when Flatten transformation is
used (#8279)
* Fixed DataException thrown when handling tombstone events with null value
* Passes the original record through unchanged when the key is null and the
transformation is configured for keys, or when the value is null and the
transformation is configured for values.
* Added unit tests for schema and schemaless data
---
.../apache/kafka/connect/transforms/Flatten.java | 4 +++-
.../kafka/connect/transforms/FlattenTest.java | 25 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index 1a4be33..cad8d79 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -69,7 +69,9 @@ public abstract class Flatten<R extends ConnectRecord<R>>
implements Transformat
@Override
public R apply(R record) {
- if (operatingSchema(record) == null) {
+ if (operatingValue(record) == null) {
+ return record;
+ } else if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
diff --git
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 2e7be95..d044338 100644
---
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -297,4 +297,29 @@ public class FlattenTest {
Schema transformedOptFieldSchema =
SchemaBuilder.string().optional().defaultValue("child_default").build();
assertEquals(transformedOptFieldSchema,
transformedSchema.field("opt_field").schema());
}
+
+ @Test
+ public void tombstoneEventWithoutSchemaShouldPassThrough() {
+ xformValue.configure(Collections.<String, String>emptyMap());
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ null, null);
+ final SourceRecord transformedRecord = xformValue.apply(record);
+
+ assertEquals(null, transformedRecord.value());
+ assertEquals(null, transformedRecord.valueSchema());
+ }
+
+ @Test
+ public void tombstoneEventWithSchemaShouldPassThrough() {
+ xformValue.configure(Collections.<String, String>emptyMap());
+
+ final Schema simpleStructSchema =
SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic",
Schema.OPTIONAL_INT64_SCHEMA).build();
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ simpleStructSchema, null);
+ final SourceRecord transformedRecord = xformValue.apply(record);
+
+ assertEquals(null, transformedRecord.value());
+ assertEquals(simpleStructSchema, transformedRecord.valueSchema());
+ }
}