This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new df5336e KAFKA-3832; Kafka Connect's JSON Converter never outputs a
null value (#6027)
df5336e is described below
commit df5336e1aa4e6537b94645bb4c992f50f0093d81
Author: Renato Mefi <[email protected]>
AuthorDate: Fri Dec 28 18:39:52 2018 +0100
KAFKA-3832; Kafka Connect's JSON Converter never outputs a null value
(#6027)
When using the Connect `JsonConverter`, it's impossible to produce
tombstone messages, thus impacting the compaction of the topic. This patch
allows the converter with and without schemas to output a NULL byte value in
order to have a proper tombstone message. When it comes to getting this data
into a Connect record, the approach is the same as when the payload looks like
`{ "schema": null, "payload": null }`; this way the sink connectors can
maintain their functionality and reduc [...]
Reviewers: Gunnar Morling <[email protected]>, Randall Hauch
<[email protected]>, Jason Gustafson <[email protected]>
---
.../apache/kafka/connect/json/JsonConverter.java | 31 ++++++++++++----------
.../kafka/connect/json/JsonConverterTest.java | 28 +++++++++++++++----
2 files changed, 40 insertions(+), 19 deletions(-)
diff --git
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index c1322b1..546fcf0 100644
---
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -317,6 +317,10 @@ public class JsonConverter implements Converter,
HeaderConverter {
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ if (schema == null && value == null) {
+ return null;
+ }
+
JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema,
value) : convertToJsonWithoutEnvelope(schema, value);
try {
return serializer.serialize(topic, jsonValue);
@@ -328,13 +332,19 @@ public class JsonConverter implements Converter,
HeaderConverter {
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
JsonNode jsonValue;
+
+ // This handles a tombstone message
+ if (value == null) {
+ return SchemaAndValue.NULL;
+ }
+
try {
jsonValue = deserializer.deserialize(topic, value);
} catch (SerializationException e) {
throw new DataException("Converting byte[] to Kafka Connect data
failed due to serialization error: ", e);
}
- if (enableSchemas && (jsonValue == null || !jsonValue.isObject() ||
jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
+ if (enableSchemas && (!jsonValue.isObject() || jsonValue.size() != 2
|| !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) ||
!jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
throw new DataException("JsonConverter with schemas.enable
requires \"schema\" and \"payload\" fields and may not contain additional
fields." +
" If you are trying to deserialize plain JSON data, set
schemas.enable=false in your converter configuration.");
@@ -342,23 +352,16 @@ public class JsonConverter implements Converter,
HeaderConverter {
// was stripped during serialization and we need to fill in an
all-encompassing schema.
if (!enableSchemas) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
- envelope.set("schema", null);
- envelope.set("payload", jsonValue);
+ envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
+ envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
jsonValue = envelope;
}
- return jsonToConnect(jsonValue);
- }
-
- private SchemaAndValue jsonToConnect(JsonNode jsonValue) {
- if (jsonValue == null)
- return SchemaAndValue.NULL;
-
- if (!jsonValue.isObject() || jsonValue.size() != 2 ||
!jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) ||
!jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
- throw new DataException("JSON value converted to Kafka Connect
must be in envelope containing schema");
-
Schema schema =
asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- return new SchemaAndValue(schema, convertToConnect(schema,
jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+ return new SchemaAndValue(
+ schema,
+ convertToConnect(schema,
jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+ );
}
public ObjectNode asJsonSchema(Schema schema) {
diff --git
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 7686fdb..d5bb24c 100644
---
a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -172,10 +172,13 @@ public class JsonConverterTest {
assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
}
- @Test(expected = DataException.class)
+ @Test
public void nullToConnect() {
- // When schemas are enabled, trying to decode a null should be an
error -- we should *always* have the envelope
- assertEquals(SchemaAndValue.NULL, converter.toConnectData(TOPIC,
null));
+ // When schemas are enabled, trying to decode a tombstone should be an
empty envelope
+ // the behavior is the same as when the json is "{ "schema": null,
"payload": null }"
+ // to keep compatibility with the record
+ SchemaAndValue converted = converter.toConnectData(TOPIC, null);
+ assertEquals(SchemaAndValue.NULL, converted);
}
@Test
@@ -696,6 +699,23 @@ public class JsonConverterTest {
);
}
+ @Test
+ public void nullSchemaAndNullValueToJson() {
+ // This characterizes the production of tombstone messages when Json
schemas is enabled
+ Map<String, Boolean> props =
Collections.singletonMap("schemas.enable", true);
+ converter.configure(props, true);
+ byte[] converted = converter.fromConnectData(TOPIC, null, null);
+ assertNull(converted);
+ }
+
+ @Test
+ public void nullValueToJson() {
+ // This characterizes the production of tombstone messages when Json
schemas is not enabled
+ Map<String, Boolean> props =
Collections.singletonMap("schemas.enable", false);
+ converter.configure(props, true);
+ byte[] converted = converter.fromConnectData(TOPIC, null, null);
+ assertNull(converted);
+ }
@Test(expected = DataException.class)
public void mismatchSchemaJson() {
@@ -703,8 +723,6 @@ public class JsonConverterTest {
converter.fromConnectData(TOPIC, Schema.FLOAT64_SCHEMA, true);
}
-
-
@Test
public void noSchemaToConnect() {
Map<String, Boolean> props =
Collections.singletonMap("schemas.enable", false);