This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 2455e41  KAFKA-5996; JsonConverter generates Mismatching schema 
DataException (#4523)
2455e41 is described below

commit 2455e41cf8394c82e54278c710ce9b4e44e9c6e7
Author: ConcurrencyPractitioner <yohan.richard...@gmail.com>
AuthorDate: Wed Feb 14 08:46:22 2018 -0800

    KAFKA-5996; JsonConverter generates Mismatching schema DataException (#4523)
    
    JsonConverter should use object equality rather than reference equality in 
`convertToJson`.
    
    Reviewers: Bartlomiej Tartanus <bartektarta...@gmail.com>, Randall Hauch 
<rha...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/connect/json/JsonConverter.java |  2 +-
 .../apache/kafka/connect/json/JsonConverterTest.java | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 32ded44..c1322b1 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -673,7 +673,7 @@ public class JsonConverter implements Converter, 
HeaderConverter {
                 }
                 case STRUCT: {
                     Struct struct = (Struct) value;
-                    if (struct.schema() != schema)
+                    if (!struct.schema().equals(schema))
                         throw new DataException("Mismatching schema.");
                     ObjectNode obj = JsonNodeFactory.instance.objectNode();
                     for (Field field : schema.fields()) {
diff --git 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 0a71044..7686fdb 100644
--- 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -563,6 +563,20 @@ public class JsonConverterTest {
                 converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
 
+    @Test
+    public void structSchemaIdentical() {
+        Schema schema = SchemaBuilder.struct().field("field1", 
Schema.BOOLEAN_SCHEMA)
+                                              .field("field2", 
Schema.STRING_SCHEMA)
+                                              .field("field3", 
Schema.STRING_SCHEMA)
+                                              .field("field4", 
Schema.BOOLEAN_SCHEMA).build();
+        Schema inputSchema = SchemaBuilder.struct().field("field1", 
Schema.BOOLEAN_SCHEMA)
+                                                   .field("field2", 
Schema.STRING_SCHEMA)
+                                                   .field("field3", 
Schema.STRING_SCHEMA)
+                                                   .field("field4", 
Schema.BOOLEAN_SCHEMA).build();
+        Struct input = new Struct(inputSchema).put("field1", 
true).put("field2", "string2").put("field3", "string3").put("field4", false);
+        assertStructSchemaEqual(schema, input);
+    }
+
 
     @Test
     public void decimalToJson() throws IOException {
@@ -735,7 +749,6 @@ public class JsonConverterTest {
 
         JsonConverter rc = new JsonConverter();
         rc.configure(workerProps, false);
-
     }
 
 
@@ -791,4 +804,9 @@ public class JsonConverterTest {
         assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
         assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
+    
+    private void assertStructSchemaEqual(Schema schema, Struct struct) {
+        converter.fromConnectData(TOPIC, schema, struct);
+        assertEquals(schema, struct.schema());
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to