Repository: kafka Updated Branches: refs/heads/trunk f85c18032 -> af85e05b9
KAFKA-5164: Ensure SetSchemaMetadata updates key or value when Schema changes When the `SetSchemaMetadata` SMT is used to change the name and/or version of the key or value's schema, any references to the old schema in the key or value must be changed to reference the new schema. Only keys or values that are `Struct` have such references, and so currently only these are adjusted. This is based on `trunk` since the fix is expected to be targeted to the 0.11.1 release. Author: Randall Hauch <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #3198 from rhauch/kafka-5164 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af85e05b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af85e05b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af85e05b Branch: refs/heads/trunk Commit: af85e05b98a41cd5f8ac45a853b2ddd28463c084 Parents: f85c180 Author: Randall Hauch <[email protected]> Authored: Fri Jun 2 10:02:40 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Jun 2 10:02:40 2017 -0700 ---------------------------------------------------------------------- .../connect/transforms/SetSchemaMetadata.java | 33 +++++++- .../transforms/SetSchemaMetadataTest.java | 80 ++++++++++++++++++++ 2 files changed, 111 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/af85e05b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java index bb581de..901ac9f 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java 
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -20,7 +20,9 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Map; @@ -101,7 +103,8 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T @Override protected R newRecord(R record, Schema updatedSchema) { - return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp()); + Object updatedKey = updateSchemaIn(record.key(), updatedSchema); + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedKey, record.valueSchema(), record.value(), record.timestamp()); } } @@ -116,8 +119,34 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T @Override protected R newRecord(R record, Schema updatedSchema) { - return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp()); + Object updatedValue = updateSchemaIn(record.value(), updatedSchema); + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); } } + /** + * Utility to check the supplied key or value for references to the old Schema, + * and if so to return an updated key or value object that references the new Schema. + * Note that this method assumes that the new Schema may have a different name and/or version, + * but has fields that exactly match those of the old Schema. 
+ * <p> + * Currently only {@link Struct} objects have references to the {@link Schema}. + * + * @param keyOrValue the key or value object; may be null + * @param updatedSchema the updated schema that has been potentially renamed + * @return the original key or value object if it does not reference the old schema, or + * a copy of the key or value object with updated references to the new schema. + */ + protected static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema) { + if (keyOrValue instanceof Struct) { + Struct origStruct = (Struct) keyOrValue; + Struct newStruct = new Struct(updatedSchema); + for (Field field : updatedSchema.fields()) { + // assume both schemas have exact same fields with same names and schemas ... + newStruct.put(field, origStruct.get(field)); + } + return newStruct; + } + return keyOrValue; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/af85e05b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java index b2b14db..206c51e 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java @@ -16,7 +16,10 @@ */ package org.apache.kafka.connect.transforms; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; @@ -25,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertSame; public class SetSchemaMetadataTest { @@ -63,4 +67,80 @@ public class SetSchemaMetadataTest { assertEquals(new Integer(42), updatedRecord.valueSchema().version()); } + @Test + public void schemaNameAndVersionUpdateWithStruct() { + final String fieldName1 = "f1"; + final String fieldName2 = "f2"; + final String fieldValue1 = "value1"; + final int fieldValue2 = 1; + final Schema schema = SchemaBuilder.struct() + .name("my.orig.SchemaDefn") + .field(fieldName1, Schema.STRING_SCHEMA) + .field(fieldName2, Schema.INT32_SCHEMA) + .build(); + final Struct value = new Struct(schema).put(fieldName1, fieldValue1).put(fieldName2, fieldValue2); + + final Map<String, String> props = new HashMap<>(); + props.put("schema.name", "foo"); + props.put("schema.version", "42"); + final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>(); + xform.configure(props); + + final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0); + + final SinkRecord updatedRecord = xform.apply(record); + + assertEquals("foo", updatedRecord.valueSchema().name()); + assertEquals(new Integer(42), updatedRecord.valueSchema().version()); + + // Make sure the struct's schema and fields all point to the new schema + assertMatchingSchema((Struct) updatedRecord.value(), updatedRecord.valueSchema()); + } + + @Test + public void updateSchemaOfStruct() { + final String fieldName1 = "f1"; + final String fieldName2 = "f2"; + final String fieldValue1 = "value1"; + final int fieldValue2 = 1; + final Schema schema = SchemaBuilder.struct() + .name("my.orig.SchemaDefn") + .field(fieldName1, Schema.STRING_SCHEMA) + .field(fieldName2, Schema.INT32_SCHEMA) + .build(); + final Struct value = new Struct(schema).put(fieldName1, fieldValue1).put(fieldName2, fieldValue2); + + final Schema newSchema = SchemaBuilder.struct() + .name("my.updated.SchemaDefn") + .field(fieldName1, Schema.STRING_SCHEMA) + .field(fieldName2, Schema.INT32_SCHEMA) + .build(); + + Struct 
newValue = (Struct) SetSchemaMetadata.updateSchemaIn(value, newSchema); + assertMatchingSchema(newValue, newSchema); + } + + @Test + public void updateSchemaOfNonStruct() { + Object value = new Integer(1); + Object updatedValue = SetSchemaMetadata.updateSchemaIn(value, Schema.INT32_SCHEMA); + assertSame(value, updatedValue); + } + + @Test + public void updateSchemaOfNull() { + Object updatedValue = SetSchemaMetadata.updateSchemaIn(null, Schema.INT32_SCHEMA); + assertEquals(null, updatedValue); + } + + protected void assertMatchingSchema(Struct value, Schema schema) { + assertSame(schema, value.schema()); + assertEquals(schema.name(), value.schema().name()); + for (Field field : schema.fields()) { + String fieldName = field.name(); + assertEquals(schema.field(fieldName).name(), value.schema().field(fieldName).name()); + assertEquals(schema.field(fieldName).index(), value.schema().field(fieldName).index()); + assertSame(schema.field(fieldName).schema(), value.schema().field(fieldName).schema()); + } + } }
