Repository: flink Updated Branches: refs/heads/master 93d9384a4 -> c7595840f
[FLINK-3524] [kafka] Add JSONDeserializationSchema This closes #1834 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7595840 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7595840 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7595840 Branch: refs/heads/master Commit: c7595840febcdc9dfeaefc93b4576933acb84f15 Parents: 93d9384 Author: zentol <[email protected]> Authored: Wed Mar 23 11:24:50 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Mon Apr 4 15:43:18 2016 +0200 ---------------------------------------------------------------------- docs/apis/streaming/connectors/kafka.md | 9 ++- .../JSONDeserializationSchema.java | 52 ++++++++++++++ .../JSONKeyValueDeserializationSchema.java | 72 ++++++++++++++++++++ .../kafka/JSONDeserializationSchemaTest.java | 41 +++++++++++ .../JSONKeyValueDeserializationSchemaTest.java | 68 ++++++++++++++++++ 5 files changed, 240 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/docs/apis/streaming/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md index 2311c14..bc8c727 100644 --- a/docs/apis/streaming/connectors/kafka.md +++ b/docs/apis/streaming/connectors/kafka.md @@ -146,8 +146,13 @@ method gets called for each Kafka message, passing the value from Kafka. For accessing both the key and value of the Kafka message, the `KeyedDeserializationSchema` has the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`. -For convenience, Flink provides a `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) -which creates a schema based on a Flink `TypeInformation`. 
+For convenience, Flink provides the following schemas: +1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) which creates + a schema based on a Flink `TypeInformation`. +2. `JSONDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which turns the serialized JSON + into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/...)(). + The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as + an optional "metadata" field that exposes the offset/partition/topic for this message. #### Kafka Consumers and Fault Tolerance http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java new file mode 100644 index 0000000..49e9da8 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; + +/** + * DeserializationSchema that deserializes a JSON String into an ObjectNode. + * <p> + * Fields can be accessed by calling objectNode.get(<name>).as(<type>) + */ +public class JSONDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> { + private ObjectMapper mapper; + + @Override + public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + if (mapper == null) { + mapper = new ObjectMapper(); + } + return mapper.readValue(message, ObjectNode.class); + } + + @Override + public boolean isEndOfStream(ObjectNode nextElement) { + return false; + } + + @Override + public TypeInformation<ObjectNode> getProducedType() { + return getForClass(ObjectNode.class); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java new file mode 100644 index 0000000..261a111 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; + +/** + * DeserializationSchema that deserializes a JSON String into an ObjectNode. 
+ * <p> + * Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>) + * <p> + * Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>) + * <p> + * Metadata fields can be accessed by calling objectNode.get("metadata").get(<name>).as(<type>) and include + * the "offset" (long), "topic" (String) and "partition" (int). + */ +public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> { + private final boolean includeMetadata; + private ObjectMapper mapper; + + public JSONKeyValueDeserializationSchema(boolean includeMetadata) { + this.includeMetadata = includeMetadata; + } + + @Override + public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + if (mapper == null) { + mapper = new ObjectMapper(); + } + ObjectNode node = mapper.createObjectNode(); + node.set("key", mapper.readValue(messageKey, JsonNode.class)); + node.set("value", mapper.readValue(message, JsonNode.class)); + if (includeMetadata) { + node.putObject("metadata") + .put("offset", offset) + .put("topic", topic) + .put("partition", partition); + } + return node; + } + + @Override + public boolean isEndOfStream(ObjectNode nextElement) { + return false; + } + + @Override + public TypeInformation<ObjectNode> getProducedType() { + return getForClass(ObjectNode.class); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java new file mode 100644 index 0000000..f8e3fd1 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class JSONDeserializationSchemaTest { + @Test + public void testDeserialize() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("key", 4).put("value", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONDeserializationSchema schema = new JSONDeserializationSchema(); + ObjectNode deserializedValue = schema.deserialize(null, serializedValue, "", 0, 0); + + Assert.assertEquals(4, deserializedValue.get("key").asInt()); + Assert.assertEquals("world", deserializedValue.get("value").asText()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java new file mode 100644 index 0000000..86d3105 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class JSONKeyValueDeserializationSchemaTest { + @Test + public void testDeserializeWithoutMetadata() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialKey = mapper.createObjectNode(); + initialKey.put("index", 4); + byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("word", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); + ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + + + Assert.assertTrue(deserializedValue.get("metadata") == null); + Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); + Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); + } + + @Test + public void testDeserializeWithMetadata() 
throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialKey = mapper.createObjectNode(); + initialKey.put("index", 4); + byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("word", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true); + ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4); + + Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); + Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); + Assert.assertEquals("topic#1", deserializedValue.get("metadata").get("topic").asText()); + Assert.assertEquals(4, deserializedValue.get("metadata").get("offset").asInt()); + Assert.assertEquals(3, deserializedValue.get("metadata").get("partition").asInt()); + } +}
