This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 4377367 KAFKA-8523 Enabling InsertField transform to be used with
tombstone events (#6914)
4377367 is described below
commit 4377367a61d7258b4ba3ad686ba2c898695caa9a
Author: Gunnar Morling <[email protected]>
AuthorDate: Thu Oct 3 20:51:00 2019 +0200
KAFKA-8523 Enabling InsertField transform to be used with tombstone events
(#6914)
* KAFKA-8523 Avoiding raw type usage
* KAFKA-8523 Gracefully handling tombstone events in InsertField SMT
---
.../kafka/connect/transforms/InsertField.java | 8 +++-
.../kafka/connect/transforms/InsertFieldTest.java | 52 +++++++++++++++++++---
2 files changed, 54 insertions(+), 6 deletions(-)
diff --git
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index 5e472a9..93ba79c 100644
---
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -127,13 +127,19 @@ public abstract class InsertField<R extends
ConnectRecord<R>> implements Transfo
@Override
public R apply(R record) {
- if (operatingSchema(record) == null) {
+ if (isTombstoneRecord(record)) {
+ return record;
+ } else if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
+ private boolean isTombstoneRecord(R record) {
+ return record.value() == null;
+ }
+
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record),
PURPOSE);
diff --git
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index a0a0975..b22872c 100644
---
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -104,11 +104,53 @@ public class InsertFieldTest {
final SourceRecord transformedRecord = xform.apply(record);
- assertEquals(42L, ((Map) transformedRecord.value()).get("magic"));
- assertEquals("test", ((Map)
transformedRecord.value()).get("topic_field"));
- assertEquals(0, ((Map)
transformedRecord.value()).get("partition_field"));
- assertEquals(null, ((Map)
transformedRecord.value()).get("timestamp_field"));
- assertEquals("my-instance-id", ((Map)
transformedRecord.value()).get("instance_id"));
+ assertEquals(42L, ((Map<?, ?>)
transformedRecord.value()).get("magic"));
+ assertEquals("test", ((Map<?, ?>)
transformedRecord.value()).get("topic_field"));
+ assertEquals(0, ((Map<?, ?>)
transformedRecord.value()).get("partition_field"));
+ assertEquals(null, ((Map<?, ?>)
transformedRecord.value()).get("timestamp_field"));
+ assertEquals("my-instance-id", ((Map<?, ?>)
transformedRecord.value()).get("instance_id"));
}
+
+ @Test
+ public void
insertConfiguredFieldsIntoTombstoneEventWithoutSchemaLeavesValueUnchanged() {
+ final Map<String, Object> props = new HashMap<>();
+ props.put("topic.field", "topic_field!");
+ props.put("partition.field", "partition_field");
+ props.put("timestamp.field", "timestamp_field?");
+ props.put("static.field", "instance_id");
+ props.put("static.value", "my-instance-id");
+
+ xform.configure(props);
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ null, null);
+
+ final SourceRecord transformedRecord = xform.apply(record);
+
+ assertEquals(null, transformedRecord.value());
+ assertEquals(null, transformedRecord.valueSchema());
+ }
+
+ @Test
+ public void
insertConfiguredFieldsIntoTombstoneEventWithSchemaLeavesValueUnchanged() {
+ final Map<String, Object> props = new HashMap<>();
+ props.put("topic.field", "topic_field!");
+ props.put("partition.field", "partition_field");
+ props.put("timestamp.field", "timestamp_field?");
+ props.put("static.field", "instance_id");
+ props.put("static.value", "my-instance-id");
+
+ xform.configure(props);
+
+ final Schema simpleStructSchema =
SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic",
Schema.OPTIONAL_INT64_SCHEMA).build();
+
+ final SourceRecord record = new SourceRecord(null, null, "test", 0,
+ simpleStructSchema, null);
+
+ final SourceRecord transformedRecord = xform.apply(record);
+
+ assertEquals(null, transformedRecord.value());
+ assertEquals(simpleStructSchema, transformedRecord.valueSchema());
+ }
}