Repository: flink
Updated Branches:
  refs/heads/master 93d9384a4 -> c7595840f


[FLINK-3524] [kafka] Add JSONDeserializationSchema

This closes #1834


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7595840
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7595840
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7595840

Branch: refs/heads/master
Commit: c7595840febcdc9dfeaefc93b4576933acb84f15
Parents: 93d9384
Author: zentol <[email protected]>
Authored: Wed Mar 23 11:24:50 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Mon Apr 4 15:43:18 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md         |  9 ++-
 .../JSONDeserializationSchema.java              | 52 ++++++++++++++
 .../JSONKeyValueDeserializationSchema.java      | 72 ++++++++++++++++++++
 .../kafka/JSONDeserializationSchemaTest.java    | 41 +++++++++++
 .../JSONKeyValueDeserializationSchemaTest.java  | 68 ++++++++++++++++++
 5 files changed, 240 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md 
b/docs/apis/streaming/connectors/kafka.md
index 2311c14..bc8c727 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -146,8 +146,13 @@ method gets called for each Kafka message, passing the 
value from Kafka.
 For accessing both the key and value of the Kafka message, the 
`KeyedDeserializationSchema` has
 the following deserialize method ` T deserialize(byte[] messageKey, byte[] 
message, String topic, int partition, long offset)`.
 
-For convenience, Flink provides a `TypeInformationSerializationSchema` (and 
`TypeInformationKeyValueSerializationSchema`) 
-which creates a schema based on a Flink `TypeInformation`.
+For convenience, Flink provides the following schemas:
+1. `TypeInformationSerializationSchema` (and 
`TypeInformationKeyValueSerializationSchema`) which creates 
+    a schema based on a Flink `TypeInformation`.
+2. `JSONDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which 
turns the serialized JSON 
+    into an ObjectNode object, from which fields can be accessed using 
objectNode.get("field").as(Int/String/...)(). 
+    The KeyValue objectNode contains a "key" and "value" field which contain 
all fields, as well as 
+    an optional "metadata" field that exposes the offset/partition/topic for 
this message.
 
 #### Kafka Consumers and Fault Tolerance
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
new file mode 100644
index 0000000..49e9da8
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ */
+public class JSONDeserializationSchema implements 
KeyedDeserializationSchema<ObjectNode> {
+       private ObjectMapper mapper;
+
+       @Override
+       public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset) throws IOException {
+               if (mapper == null) {
+                       mapper = new ObjectMapper();
+               }
+               return mapper.readValue(message, ObjectNode.class);
+       }
+
+       @Override
+       public boolean isEndOfStream(ObjectNode nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<ObjectNode> getProducedType() {
+               return getForClass(ObjectNode.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..261a111
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Key fields can be accessed by calling 
objectNode.get("key").get(&lt;name>).as(&lt;type>)
+ * <p>
+ * Value fields can be accessed by calling 
objectNode.get("value").get(&lt;name>).as(&lt;type>)
+ * <p>
+ * Metadata fields can be accessed by calling 
objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
+ * the "offset" (long), "topic" (String) and "partition" (int).
+ */
+public class JSONKeyValueDeserializationSchema implements 
KeyedDeserializationSchema<ObjectNode> {
+       private final boolean includeMetadata;
+       private ObjectMapper mapper;
+
+       public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
+               this.includeMetadata = includeMetadata;
+       }
+
+       @Override
+       public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset) throws IOException {
+               if (mapper == null) {
+                       mapper = new ObjectMapper();
+               }
+               ObjectNode node = mapper.createObjectNode();
+               node.set("key", mapper.readValue(messageKey, JsonNode.class));
+               node.set("value", mapper.readValue(message, JsonNode.class));
+               if (includeMetadata) {
+                       node.putObject("metadata")
+                               .put("offset", offset)
+                               .put("topic", topic)
+                               .put("partition", partition);
+               }
+               return node;
+       }
+
+       @Override
+       public boolean isEndOfStream(ObjectNode nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<ObjectNode> getProducedType() {
+               return getForClass(ObjectNode.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
new file mode 100644
index 0000000..f8e3fd1
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class JSONDeserializationSchemaTest {
+       @Test
+       public void testDeserialize() throws IOException {
+               ObjectMapper mapper = new ObjectMapper();
+               ObjectNode initialValue = mapper.createObjectNode();
+               initialValue.put("key", 4).put("value", "world");
+               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+
+               JSONDeserializationSchema schema = new 
JSONDeserializationSchema();
+               ObjectNode deserializedValue = schema.deserialize(null, 
serializedValue, "", 0, 0);
+
+               Assert.assertEquals(4, deserializedValue.get("key").asInt());
+               Assert.assertEquals("world", 
deserializedValue.get("value").asText());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7595840/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
new file mode 100644
index 0000000..86d3105
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import 
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class JSONKeyValueDeserializationSchemaTest {
+       @Test
+       public void testDeserializeWithoutMetadata() throws IOException {
+               ObjectMapper mapper = new ObjectMapper();
+               ObjectNode initialKey = mapper.createObjectNode();
+               initialKey.put("index", 4);
+               byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+
+               ObjectNode initialValue = mapper.createObjectNode();
+               initialValue.put("word", "world");
+               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+
+               JSONKeyValueDeserializationSchema schema = new 
JSONKeyValueDeserializationSchema(false);
+               ObjectNode deserializedValue = 
schema.deserialize(serializedKey, serializedValue, "", 0, 0);
+
+
+               Assert.assertTrue(deserializedValue.get("metadata") == null);
+               Assert.assertEquals(4, 
deserializedValue.get("key").get("index").asInt());
+               Assert.assertEquals("world", 
deserializedValue.get("value").get("word").asText());
+       }
+
+       @Test
+       public void testDeserializeWithMetadata() throws IOException {
+               ObjectMapper mapper = new ObjectMapper();
+               ObjectNode initialKey = mapper.createObjectNode();
+               initialKey.put("index", 4);
+               byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+
+               ObjectNode initialValue = mapper.createObjectNode();
+               initialValue.put("word", "world");
+               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+
+               JSONKeyValueDeserializationSchema schema = new 
JSONKeyValueDeserializationSchema(true);
+               ObjectNode deserializedValue = 
schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4);
+
+               Assert.assertEquals(4, 
deserializedValue.get("key").get("index").asInt());
+               Assert.assertEquals("world", 
deserializedValue.get("value").get("word").asText());
+               Assert.assertEquals("topic#1", 
deserializedValue.get("metadata").get("topic").asText());
+               Assert.assertEquals(4, 
deserializedValue.get("metadata").get("offset").asInt());
+               Assert.assertEquals(3, 
deserializedValue.get("metadata").get("partition").asInt());
+       }
+}

Reply via email to