Repository: kafka
Updated Branches:
  refs/heads/trunk 896ad63f1 -> 431c3b093


KAFKA-4183; Centralize checking for optional and default values in JsonConverter

Cleaner to just check once for optional & default value from the 
`convertToConnect()` function.

It also helps address an issue with conversions for logical type schemas that 
have default values and null as the included value. That test case is 
_probably_ not an issue in practice, since when using the `JsonConverter` to 
serialize a missing field with a default value, it will serialize the default 
value for the field. But in the face of JSON data streaming in from a topic 
being [generous on input, strict on 
output](http://tedwise.com/2009/05/27/generous-on-input-strict-on-output) seems 
best.

Author: Shikhar Bhushan <shik...@confluent.io>

Reviewers: Randall Hauch <rha...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>

Closes #1872 from shikhar/kafka-4183


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/431c3b09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/431c3b09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/431c3b09

Branch: refs/heads/trunk
Commit: 431c3b0937da103b6e79e9f970c0862898205909
Parents: 896ad63
Author: Shikhar Bhushan <shik...@confluent.io>
Authored: Mon Sep 19 12:49:38 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Sep 19 12:49:38 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/json/JsonConverter.java       | 41 +++-------
 .../kafka/connect/json/JsonConverterTest.java   | 79 +++++++++++++++++++-
 2 files changed, 88 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/431c3b09/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index a8df107..a4ce32a 100644
--- 
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ 
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -58,61 +58,46 @@ public class JsonConverter implements Converter {
 
     private static final HashMap<Schema.Type, JsonToConnectTypeConverter> 
TO_CONNECT_CONVERTERS = new HashMap<>();
 
-    private static Object checkOptionalAndDefault(Schema schema) {
-        if (schema.defaultValue() != null)
-            return schema.defaultValue();
-        if (schema.isOptional())
-            return null;
-        throw new DataException("Invalid null value for required field");
-    }
-
     static {
         TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.booleanValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return (byte) value.intValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT16, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return (short) value.intValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT32, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.intValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT64, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.longValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT32, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.floatValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT64, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.doubleValue();
             }
         });
@@ -120,7 +105,6 @@ public class JsonConverter implements Converter {
             @Override
             public Object convert(Schema schema, JsonNode value) {
                 try {
-                    if (value.isNull()) return checkOptionalAndDefault(schema);
                     return value.binaryValue();
                 } catch (IOException e) {
                     throw new DataException("Invalid bytes field", e);
@@ -130,15 +114,12 @@ public class JsonConverter implements Converter {
         TO_CONNECT_CONVERTERS.put(Schema.Type.STRING, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
                 return value.textValue();
             }
         });
         TO_CONNECT_CONVERTERS.put(Schema.Type.ARRAY, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
                 Schema elemSchema = schema == null ? null : 
schema.valueSchema();
                 ArrayList<Object> result = new ArrayList<>();
                 for (JsonNode elem : value) {
@@ -150,8 +131,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_CONVERTERS.put(Schema.Type.MAP, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
                 Schema keySchema = schema == null ? null : schema.keySchema();
                 Schema valueSchema = schema == null ? null : 
schema.valueSchema();
 
@@ -185,8 +164,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_CONVERTERS.put(Schema.Type.STRUCT, new 
JsonToConnectTypeConverter() {
             @Override
             public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
                 if (!value.isObject())
                     throw new DataException("Structs should be encoded as JSON 
objects, but found " + value.getNodeType());
 
@@ -211,7 +188,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof byte[]))
                     throw new DataException("Invalid type for Decimal, 
underlying representation should be bytes but was " + value.getClass());
                 return Decimal.toLogical(schema, (byte[]) value);
@@ -221,7 +197,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Date, underlying 
representation should be int32 but was " + value.getClass());
                 return Date.toLogical(schema, (int) value);
@@ -231,7 +206,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Time, underlying 
representation should be int32 but was " + value.getClass());
                 return Time.toLogical(schema, (int) value);
@@ -241,7 +215,6 @@ public class JsonConverter implements Converter {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new 
LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
-                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Long))
                     throw new DataException("Invalid type for Timestamp, 
underlying representation should be int64 but was " + value.getClass());
                 return Timestamp.toLogical(schema, (long) value);
@@ -688,10 +661,16 @@ public class JsonConverter implements Converter {
 
 
     private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
-        JsonToConnectTypeConverter typeConverter;
         final Schema.Type schemaType;
         if (schema != null) {
             schemaType = schema.type();
+            if (jsonValue.isNull()) {
+                if (schema.defaultValue() != null)
+                    return schema.defaultValue(); // any logical type 
conversions should already have been applied
+                if (schema.isOptional())
+                    return null;
+                throw new DataException("Invalid null value for required " + 
schemaType +  " field");
+            }
         } else {
             switch (jsonValue.getNodeType()) {
                 case NULL:
@@ -724,9 +703,10 @@ public class JsonConverter implements Converter {
                     break;
             }
         }
-        typeConverter = TO_CONNECT_CONVERTERS.get(schemaType);
+
+        final JsonToConnectTypeConverter typeConverter = 
TO_CONNECT_CONVERTERS.get(schemaType);
         if (typeConverter == null)
-            throw new DataException("Unknown schema type: " + schema.type());
+            throw new DataException("Unknown schema type: " + 
String.valueOf(schemaType));
 
         Object converted = typeConverter.convert(schema, jsonValue);
         if (schema != null && schema.name() != null) {
@@ -737,7 +717,6 @@ public class JsonConverter implements Converter {
         return converted;
     }
 
-
     private interface JsonToConnectTypeConverter {
         Object convert(Schema schema, JsonNode value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/431c3b09/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index d7c1ceb..74aad33 100644
--- 
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ 
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -224,7 +224,6 @@ public class JsonConverterTest {
     @Test
     public void decimalToConnectOptional() {
         Schema schema = Decimal.builder(2).optional().schema();
-        // Payload is base64 encoded byte[]{0, -100}, which is the two's 
complement encoding of 156.
         String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"optional\": true, 
\"parameters\": { \"scale\": \"2\" } }, \"payload\": null }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
         assertEquals(schema, schemaAndValue.schema());
@@ -232,6 +231,26 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void decimalToConnectWithDefaultValue() {
+        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        Schema schema = Decimal.builder(2).defaultValue(reference).build();
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"default\": 
\"AJw=\", \"parameters\": { \"scale\": \"2\" } }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
+    public void decimalToConnectOptionalWithDefaultValue() {
+        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        Schema schema = 
Decimal.builder(2).optional().defaultValue(reference).build();
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"optional\": true, 
\"default\": \"AJw=\", \"parameters\": { \"scale\": \"2\" } }, \"payload\": 
null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void dateToConnect() {
         Schema schema = Date.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -255,6 +274,26 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void dateToConnectWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = Date.builder().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Date\", \"version\": 1, \"default\": 0 }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
+    public void dateToConnectOptionalWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = 
Date.builder().optional().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Date\", \"version\": 1, \"optional\": true, 
\"default\": 0 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void timeToConnect() {
         Schema schema = Time.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -278,6 +317,26 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToConnectWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = Time.builder().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Time\", \"version\": 1, \"default\": 0 }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
+    public void timeToConnectOptionalWithDefaultValue() {
+        java.util.Date reference = new java.util.Date(0);
+        Schema schema = 
Time.builder().optional().defaultValue(reference).schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.connect.data.Time\", \"version\": 1, \"optional\": true, 
\"default\": 0 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, schemaAndValue.value());
+    }
+
+    @Test
     public void timestampToConnect() {
         Schema schema = Timestamp.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
@@ -301,6 +360,24 @@ public class JsonConverterTest {
         assertNull(schemaAndValue.value());
     }
 
+    @Test
+    public void timestampToConnectWithDefaultValue() {
+        Schema schema = Timestamp.builder().defaultValue(new 
java.util.Date(42)).schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"default\": 42 }, 
\"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(new java.util.Date(42), schemaAndValue.value());
+    }
+
+    @Test
+    public void timestampToConnectOptionalWithDefaultValue() {
+        Schema schema = Timestamp.builder().optional().defaultValue(new 
java.util.Date(42)).schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"org.apache.kafka.connect.data.Timestamp\", \"version\": 1,  \"optional\": 
true, \"default\": 42 }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(new java.util.Date(42), schemaAndValue.value());
+    }
+
     // Schema metadata
 
     @Test

Reply via email to