Repository: kafka
Updated Branches:
  refs/heads/0.9.0 767de4b5d -> e32ef73a5
KAFKA-3055; Fix JsonConverter mangling the Schema in Connect Author: ksenji <[email protected]> Reviewers: Dong Lin <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #722 from ksenji/trunk (cherry picked from commit 2679524604b611046e9826b2a1fba461d42f06f4) Signed-off-by: Ewen Cheslack-Postava <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e32ef73a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e32ef73a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e32ef73a Branch: refs/heads/0.9.0 Commit: e32ef73a5aee780aad57e41a881a01fe9e557cc0 Parents: 767de4b Author: Kishore Senji <[email protected]> Authored: Mon Jan 4 11:47:31 2016 -0500 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Jan 4 11:47:54 2016 -0500 ---------------------------------------------------------------------- .../java/org/apache/kafka/connect/json/JsonConverter.java | 2 +- .../org/apache/kafka/connect/json/JsonConverterTest.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e32ef73a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java ---------------------------------------------------------------------- diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 815d32b..5cd8cee 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -395,7 +395,7 @@ public class JsonConverter implements Converter { jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME); ArrayNode fields = 
JsonNodeFactory.instance.arrayNode(); for (Field field : schema.fields()) { - ObjectNode fieldJsonSchema = asJsonSchema(field.schema()); + ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy(); fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name()); fields.add(fieldJsonSchema); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e32ef73a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java ---------------------------------------------------------------------- diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index e56b009..c923285 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -431,15 +431,17 @@ public class JsonConverterTest { @Test public void structToJson() { - Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build(); - Struct input = new Struct(schema).put("field1", true).put("field2", "string"); + Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).field("field3", Schema.STRING_SCHEMA).field("field4", Schema.BOOLEAN_SCHEMA).build(); + Struct input = new Struct(schema).put("field1", true).put("field2", "string2").put("field3", "string3").put("field4", false); JsonNode converted = parse(converter.fromConnectData(TOPIC, schema, input)); validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"), + assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", 
\"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }, { \"field\": \"field3\", \"type\": \"string\", \"optional\": false }, { \"field\": \"field4\", \"type\": \"boolean\", \"optional\": false }] }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(JsonNodeFactory.instance.objectNode() .put("field1", true) - .put("field2", "string"), + .put("field2", "string2") + .put("field3", "string3") + .put("field4", false), converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); }
