Repository: kafka
Updated Branches:
  refs/heads/0.10.0 ee6e50024 -> 64acd9085


KAFKA-4183; Corrected Kafka Connect's JSON Converter to properly convert from 
null to logical values

The `JsonConverter` class has `LogicalTypeConverter` implementations for Date, 
Time, Timestamp, and Decimal, but these implementations fail when the input 
literal value (deserialized from the message) is null.

Test cases were added to check for these cases, and these failed before the 
`LogicalTypeConverter` implementations were fixed to consider whether the 
schema has a default value or is optional, similarly to how the 
`JsonToConnectTypeConverter` implementations do this. Once the fixes were made, 
the new tests pass.

Author: Randall Hauch <rha...@gmail.com>

Reviewers: Shikhar Bhushan <shik...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>

Closes #1867 from rhauch/kafka-4183


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1e5bf02a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1e5bf02a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1e5bf02a

Branch: refs/heads/0.10.0
Commit: 1e5bf02ab338ba05342709666e47d76bdb5670b0
Parents: ee6e500
Author: Randall Hauch <rha...@gmail.com>
Authored: Fri Sep 16 14:55:46 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Sep 19 13:33:37 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/json/JsonConverter.java       |  4 +++
 .../kafka/connect/json/JsonConverterTest.java   | 38 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1e5bf02a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index d9a6859..b35b24a 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -211,6 +211,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof byte[]))
                     throw new DataException("Invalid type for Decimal, 
underlying representation should be bytes but was " + value.getClass());
                 return Decimal.toLogical(schema, (byte[]) value);
@@ -220,6 +221,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Date, underlying 
representation should be int32 but was " + value.getClass());
                 return Date.toLogical(schema, (int) value);
@@ -229,6 +231,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Time, underlying 
representation should be int32 but was " + value.getClass());
                 return Time.toLogical(schema, (int) value);
@@ -238,6 +241,7 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Long))
                     throw new DataException("Invalid type for Timestamp, 
underlying representation should be int64 but was " + value.getClass());
                 return Timestamp.toLogical(schema, (long) value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1e5bf02a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index c923285..7700f18 100644
--- 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -54,6 +54,7 @@ import java.util.TimeZone;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -217,6 +218,16 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void decimalToConnectOptional() {
+        Schema schema = Decimal.builder(2).optional().schema();
+        // Payload is base64 encoded byte[]{0, -100}, which is the two's 
complement encoding of 156.
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"optional\": true, 
\"parameters\": { \"scale\": \"2\" } }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void dateToConnect() {
         Schema schema = Date.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -231,6 +242,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void dateToConnectOptional() {
+        Schema schema = Date.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Date\", \"version\": 1, \"optional\": true }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void timeToConnect() {
         Schema schema = Time.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -245,6 +265,15 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToConnectOptional() {
+        Schema schema = Time.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Time\", \"version\": 1, \"optional\": true }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
+    @Test
     public void timestampToConnect() {
         Schema schema = Timestamp.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -259,6 +288,15 @@ public class JsonConverterTest {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void timestampToConnectOptional() {
+        Schema schema = Timestamp.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"optional\": true 
}, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     // Schema metadata
 
     @Test

Reply via email to