This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 2a80b72 KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)
2a80b72 is described below
commit 2a80b7299366640b1ae266cc0d1b94d4a4726f28
Author: Andy Coates <[email protected]>
AuthorDate: Thu May 7 22:21:08 2020 +0100
KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)
This change turns on exact decimal processing in JSON Converter for
deserializing decimals, meaning trailing zeros are maintained. Serialization
was already using the decimal scale to output the right value, so this change
means a value of `1.2300` can now be serialized to JSON and deserialized back
to Connect without any loss of information.
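The mechanism behind this is Jackson's exact-BigDecimal node factory. Here is a minimal sketch (illustrative only, not part of this commit; the class name ExactDecimalDemo is made up) showing the two settings the converter now combines:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.JsonNodeFactory;

    public class ExactDecimalDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper()
                // parse JSON floating-point literals as BigDecimal, not double
                .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
                // keep the BigDecimal's scale instead of normalizing it away
                .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
            // prints 1.2300 (scale 4); with a normalizing node factory the
            // trailing zeros would be stripped, yielding 1.23
            System.out.println(mapper.readTree("1.2300").decimalValue());
        }
    }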
Author: Andy Coates <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Almog Gavra <[email protected]>
---
.../apache/kafka/connect/json/JsonConverter.java | 81 ++++++++++++----------
.../kafka/connect/json/JsonDeserializer.java | 12 +++-
.../apache/kafka/connect/json/JsonSerializer.java | 20 ++++++
.../kafka/connect/json/JsonConverterTest.java | 31 ++++++++-
4 files changed, 103 insertions(+), 41 deletions(-)
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 3ef6aae..7af202b 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
@@ -54,6 +52,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+
/**
 * Implementation of Converter that uses JSON to store schemas and objects. By default this converter will serialize Connect keys, values,
 * and headers with schemas, although this can be disabled with {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG schemas.enable}
@@ -191,6 +191,9 @@ public class JsonConverter implements Converter, HeaderConverter {
     // Convert values in Kafka Connect form into/from their logical types. These logical converters are discovered by logical type
     // names specified in the field
     private static final HashMap<String, LogicalTypeConverter> LOGICAL_CONVERTERS = new HashMap<>();
+
+    private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true);
+
     static {
         LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
@@ -201,9 +204,9 @@ public class JsonConverter implements Converter, HeaderConverter {
             final BigDecimal decimal = (BigDecimal) value;
             switch (config.decimalFormat()) {
                 case NUMERIC:
-                    return JsonNodeFactory.instance.numberNode(decimal);
+                    return JSON_NODE_FACTORY.numberNode(decimal);
                 case BASE64:
-                    return JsonNodeFactory.instance.binaryNode(Decimal.fromLogical(schema, decimal));
+                    return JSON_NODE_FACTORY.binaryNode(Decimal.fromLogical(schema, decimal));
                 default:
                     throw new DataException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat());
             }
@@ -229,7 +232,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
             if (!(value instanceof java.util.Date))
                 throw new DataException("Invalid type for Date, expected Date but was " + value.getClass());
-            return JsonNodeFactory.instance.numberNode(Date.fromLogical(schema, (java.util.Date) value));
+            return JSON_NODE_FACTORY.numberNode(Date.fromLogical(schema, (java.util.Date) value));
         }
@Override
@@ -245,7 +248,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
             if (!(value instanceof java.util.Date))
                 throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
-            return JsonNodeFactory.instance.numberNode(Time.fromLogical(schema, (java.util.Date) value));
+            return JSON_NODE_FACTORY.numberNode(Time.fromLogical(schema, (java.util.Date) value));
         }
@Override
@@ -261,7 +264,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
             if (!(value instanceof java.util.Date))
                 throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass());
-            return JsonNodeFactory.instance.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
+            return JSON_NODE_FACTORY.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
         }
@Override
@@ -277,15 +280,23 @@ public class JsonConverter implements Converter, HeaderConverter {
     private Cache<Schema, ObjectNode> fromConnectSchemaCache;
     private Cache<JsonNode, Schema> toConnectSchemaCache;
-    private final JsonSerializer serializer = new JsonSerializer();
+    private final JsonSerializer serializer;
     private final JsonDeserializer deserializer;
     public JsonConverter() {
-        // this ensures that the JsonDeserializer maintains full precision on
-        // floating point numbers that cannot fit into float64
-        final Set<DeserializationFeature> deserializationFeatures = new HashSet<>();
-        deserializationFeatures.add(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
-        deserializer = new JsonDeserializer(deserializationFeatures);
+        serializer = new JsonSerializer(
+            mkSet(),
+            JSON_NODE_FACTORY
+        );
+
+        deserializer = new JsonDeserializer(
+            mkSet(
+                // this ensures that the JsonDeserializer maintains full precision on
+                // floating point numbers that cannot fit into float64
+                DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS
+            ),
+            JSON_NODE_FACTORY
+        );
     }
@Override
@@ -362,7 +373,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         // The deserialized data should either be an envelope object containing the schema and the payload or the schema
         // was stripped during serialization and we need to fill in an all-encompassing schema.
         if (!config.schemasEnabled()) {
-            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
+            ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
             envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
             envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
             jsonValue = envelope;
@@ -413,17 +424,17 @@ public class JsonConverter implements Converter, HeaderConverter {
                 jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy();
                 break;
             case ARRAY:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
+                jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
                 jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema()));
                 break;
             case MAP:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
+                jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
                 jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema()));
                 jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema()));
                 break;
             case STRUCT:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
-                ArrayNode fields = JsonNodeFactory.instance.arrayNode();
+                jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
+                ArrayNode fields = JSON_NODE_FACTORY.arrayNode();
                 for (Field field : schema.fields()) {
                     ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy();
                     fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
@@ -443,7 +454,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         if (schema.doc() != null)
             jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
         if (schema.parameters() != null) {
-            ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
+            ObjectNode jsonSchemaParams = JSON_NODE_FACTORY.objectNode();
             for (Map.Entry<String, String> prop : schema.parameters().entrySet())
                 jsonSchemaParams.put(prop.getKey(), prop.getValue());
             jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
@@ -596,7 +607,7 @@ public class JsonConverter implements Converter, HeaderConverter {
         if (schema.defaultValue() != null)
             return convertToJson(schema, schema.defaultValue());
         if (schema.isOptional())
-            return JsonNodeFactory.instance.nullNode();
+            return JSON_NODE_FACTORY.nullNode();
         throw new DataException("Conversion error: null value for field that is required and has no default value");
     }
@@ -617,32 +628,32 @@ public class JsonConverter implements Converter, HeaderConverter {
         }
         switch (schemaType) {
             case INT8:
-                return JsonNodeFactory.instance.numberNode((Byte) value);
+                return JSON_NODE_FACTORY.numberNode((Byte) value);
             case INT16:
-                return JsonNodeFactory.instance.numberNode((Short) value);
+                return JSON_NODE_FACTORY.numberNode((Short) value);
             case INT32:
-                return JsonNodeFactory.instance.numberNode((Integer) value);
+                return JSON_NODE_FACTORY.numberNode((Integer) value);
             case INT64:
-                return JsonNodeFactory.instance.numberNode((Long) value);
+                return JSON_NODE_FACTORY.numberNode((Long) value);
             case FLOAT32:
-                return JsonNodeFactory.instance.numberNode((Float) value);
+                return JSON_NODE_FACTORY.numberNode((Float) value);
             case FLOAT64:
-                return JsonNodeFactory.instance.numberNode((Double) value);
+                return JSON_NODE_FACTORY.numberNode((Double) value);
             case BOOLEAN:
-                return JsonNodeFactory.instance.booleanNode((Boolean) value);
+                return JSON_NODE_FACTORY.booleanNode((Boolean) value);
             case STRING:
                 CharSequence charSeq = (CharSequence) value;
-                return JsonNodeFactory.instance.textNode(charSeq.toString());
+                return JSON_NODE_FACTORY.textNode(charSeq.toString());
             case BYTES:
                 if (value instanceof byte[])
-                    return JsonNodeFactory.instance.binaryNode((byte[]) value);
+                    return JSON_NODE_FACTORY.binaryNode((byte[]) value);
                 else if (value instanceof ByteBuffer)
-                    return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
+                    return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
                 else
                     throw new DataException("Invalid type for bytes type: " + value.getClass());
             case ARRAY: {
                 Collection collection = (Collection) value;
-                ArrayNode list = JsonNodeFactory.instance.arrayNode();
+                ArrayNode list = JSON_NODE_FACTORY.arrayNode();
                 for (Object elem : collection) {
                     Schema valueSchema = schema == null ? null : schema.valueSchema();
                     JsonNode fieldValue = convertToJson(valueSchema, elem);
@@ -668,9 +679,9 @@ public class JsonConverter implements Converter, HeaderConverter {
                 ObjectNode obj = null;
                 ArrayNode list = null;
                 if (objectMode)
-                    obj = JsonNodeFactory.instance.objectNode();
+                    obj = JSON_NODE_FACTORY.objectNode();
                 else
-                    list = JsonNodeFactory.instance.arrayNode();
+                    list = JSON_NODE_FACTORY.arrayNode();
                 for (Map.Entry<?, ?> entry : map.entrySet()) {
                     Schema keySchema = schema == null ? null : schema.keySchema();
                     Schema valueSchema = schema == null ? null : schema.valueSchema();
@@ -680,7 +691,7 @@ public class JsonConverter implements Converter, HeaderConverter {
                     if (objectMode)
                         obj.set(mapKey.asText(), mapValue);
                     else
-                        list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+                        list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
                 }
                 return objectMode ? obj : list;
             }
@@ -688,7 +699,7 @@ public class JsonConverter implements Converter, HeaderConverter {
             Struct struct = (Struct) value;
             if (!struct.schema().equals(schema))
                 throw new DataException("Mismatching schema.");
-            ObjectNode obj = JsonNodeFactory.instance.objectNode();
+            ObjectNode obj = JSON_NODE_FACTORY.objectNode();
             for (Field field : schema.fields()) {
                 obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
}
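
To see the end-to-end effect of the converter changes above, here is an illustrative round trip mirroring the new tests further down (a sketch, not part of the patch; the class name DecimalRoundTripDemo is made up, and "decimal.format" is assumed to be the literal value of JsonConverterConfig.DECIMAL_FORMAT_CONFIG):

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Collections;
    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class DecimalRoundTripDemo {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // second argument false = this converter handles record values, not keys
            converter.configure(Collections.singletonMap("decimal.format", "NUMERIC"), false);
            // unscaled 15600 at scale 4 is 1.5600; NUMERIC writes it as the JSON number 1.5600
            byte[] json = converter.fromConnectData("topic", Decimal.schema(4), new BigDecimal(new BigInteger("15600"), 4));
            // with this change the trailing zeros survive deserialization as well
            SchemaAndValue roundTrip = converter.toConnectData("topic", json);
            System.out.println(roundTrip.value()); // 1.5600
        }
    }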
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
index a656e53..2e6e821 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.json;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.errors.SerializationException;
@@ -29,13 +30,13 @@ import org.apache.kafka.common.serialization.Deserializer;
 * structured data without having associated Java classes. This deserializer also supports Connect schemas.
 */
 public class JsonDeserializer implements Deserializer<JsonNode> {
-    private ObjectMapper objectMapper = new ObjectMapper();
+    private final ObjectMapper objectMapper = new ObjectMapper();
     /**
      * Default constructor needed by Kafka
      */
     public JsonDeserializer() {
-        this(Collections.emptySet());
+        this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));
     }
/**
@@ -43,9 +44,14 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
      * for the deserializer
      *
      * @param deserializationFeatures the specified deserialization features
+     * @param jsonNodeFactory the json node factory to use.
      */
-    JsonDeserializer(final Set<DeserializationFeature> deserializationFeatures) {
+    JsonDeserializer(
+        final Set<DeserializationFeature> deserializationFeatures,
+        final JsonNodeFactory jsonNodeFactory
+    ) {
         deserializationFeatures.forEach(objectMapper::enable);
+        objectMapper.setNodeFactory(jsonNodeFactory);
     }
@Override
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
index 94ec0a8..0f2b62b 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
@@ -18,9 +18,14 @@ package org.apache.kafka.connect.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
+import java.util.Collections;
+import java.util.Set;
+
/**
 * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
 * structured data without corresponding Java classes. This serializer also supports Connect schemas.
@@ -32,7 +37,22 @@ public class JsonSerializer implements Serializer<JsonNode> {
* Default constructor needed by Kafka
*/
public JsonSerializer() {
+        this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));
+    }
+    /**
+     * A constructor that additionally specifies some {@link SerializationFeature}
+ * for the serializer
+ *
+ * @param serializationFeatures the specified serialization features
+ * @param jsonNodeFactory the json node factory to use.
+ */
+ JsonSerializer(
+ final Set<SerializationFeature> serializationFeatures,
+ final JsonNodeFactory jsonNodeFactory
+ ) {
+ serializationFeatures.forEach(objectMapper::enable);
+ objectMapper.setNodeFactory(jsonNodeFactory);
}
@Override
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 2a56950..2e189e2 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.json;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -65,8 +66,11 @@ import static org.junit.Assert.fail;
public class JsonConverterTest {
private static final String TOPIC = "topic";
- ObjectMapper objectMapper = new ObjectMapper();
- JsonConverter converter = new JsonConverter();
+ private final ObjectMapper objectMapper = new ObjectMapper()
+ .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
+ .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
+
+ private final JsonConverter converter = new JsonConverter();
@Before
public void setUp() {
@@ -273,6 +277,16 @@ public class JsonConverterTest {
}
@Test
+ public void numericDecimalWithTrailingZerosToConnect() {
+ BigDecimal reference = new BigDecimal(new BigInteger("15600"), 4);
+ Schema schema = Decimal.schema(4);
+ String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\":
\"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {
\"scale\": \"4\" } }, \"payload\": 1.5600 }";
+ SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
+ assertEquals(schema, schemaAndValue.schema());
+ assertEquals(reference, schemaAndValue.value());
+ }
+
+ @Test
public void highPrecisionNumericDecimalToConnect() {
// this number is too big to be kept in a float64!
BigDecimal reference = new BigDecimal("1.23456789123456789");
@@ -634,7 +648,18 @@ public class JsonConverterTest {
}
@Test
-    public void decimalToJsonWithoutSchema() throws IOException {
+    public void decimalWithTrailingZerosToNumericJson() {
+        converter.configure(Collections.singletonMap(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()), false);
+        JsonNode converted = parse(converter.fromConnectData(TOPIC, Decimal.schema(4), new BigDecimal(new BigInteger("15600"), 4)));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }"),
+            converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue("expected node to be numeric", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNumber());
+        assertEquals(new BigDecimal("1.5600"), converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).decimalValue());
+    }
+
+ @Test
+ public void decimalToJsonWithoutSchema() {
assertThrows(
"expected data exception when serializing BigDecimal without
schema",
DataException.class,