Repository: kafka
Updated Branches:
  refs/heads/trunk 86aa0eb0f -> 567cc3d78


KAFKA-4183; Corrected Kafka Connect's JSON Converter to properly convert from 
null to logical values

The `JsonConverter` class has `LogicalTypeConverter` implementations for Date, 
Time, Timestamp, and Decimal, but these implementations fail when the input 
literal value (deserialized from the message) is null.

Test cases were added to check for these cases, and these failed before the 
`LogicalTypeConverter` implementations were fixed to consider whether the 
schema has a default value or is optional, similarly to how the 
`JsonToConnectTypeConverter` implementations do this. Once the fixes were made, 
the new tests pass.

Author: Randall Hauch <rha...@gmail.com>

Reviewers: Shikhar Bhushan <shik...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>

Closes #1867 from rhauch/kafka-4183


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/567cc3d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/567cc3d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/567cc3d7

Branch: refs/heads/trunk
Commit: 567cc3d78746391a3b050d4eea76571e4bf20e89
Parents: 86aa0eb
Author: Randall Hauch <rha...@gmail.com>
Authored: Fri Sep 16 14:55:46 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Sep 16 14:55:46 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/json/JsonConverter.java       |  4 +++
 .../kafka/connect/json/JsonConverterTest.java   | 38 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/567cc3d7/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index f6c778b..a8df107 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -211,6 +211,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof byte[]))
                     throw new DataException("Invalid type for Decimal, 
underlying representation should be bytes but was " + value.getClass());
                 return Decimal.toLogical(schema, (byte[]) value);
@@ -220,6 +221,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Date, underlying 
representation should be int32 but was " + value.getClass());
                 return Date.toLogical(schema, (int) value);
@@ -229,6 +231,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Time, underlying 
representation should be int32 but was " + value.getClass());
                 return Time.toLogical(schema, (int) value);
@@ -238,6 +241,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Long))
                     throw new DataException("Invalid type for Timestamp, 
underlying representation should be int64 but was " + value.getClass());
                 return Timestamp.toLogical(schema, (long) value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/567cc3d7/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 0e7c153..d7c1ceb 100644
--- 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -58,6 +58,7 @@ import java.util.TimeZone;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -221,6 +222,16 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void decimalToConnectOptional() {
+        Schema schema = Decimal.builder(2).optional().schema();
+        // Payload is base64 encoded byte[]{0, -100}, which is the two's 
complement encoding of 156.
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"optional\": true, 
\"parameters\": { \"scale\": \"2\" } }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void dateToConnect() {
         Schema schema = Date.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -235,6 +246,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void dateToConnectOptional() {
+        Schema schema = Date.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Date\", \"version\": 1, \"optional\": true }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void timeToConnect() {
         Schema schema = Time.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -249,6 +269,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToConnectOptional() {
+        Schema schema = Time.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Time\", \"version\": 1, \"optional\": true }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void timestampToConnect() {
         Schema schema = Timestamp.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -263,6 +292,15 @@ public class JsonConverterTest {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void timestampToConnectOptional() {
+        Schema schema = Timestamp.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"optional\": true 
}, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     // Schema metadata
 
     @Test

Reply via email to