Repository: flink
Updated Branches:
  refs/heads/master fce64e193 -> b5e2e3637


[hotfix] Fix JSONDeserializationSchema for Kafka


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/042ad7b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/042ad7b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/042ad7b9

Branch: refs/heads/master
Commit: 042ad7b9020dffef5c6861d41170a6e96579cd1f
Parents: fce64e1
Author: Robert Metzger <[email protected]>
Authored: Thu Jun 9 10:56:09 2016 +0200
Committer: Robert Metzger <[email protected]>
Committed: Thu Jun 9 10:56:45 2016 +0200

----------------------------------------------------------------------
 .../util/serialization/JSONDeserializationSchema.java     | 10 ++--------
 .../connectors/kafka/JSONDeserializationSchemaTest.java   |  2 +-
 2 files changed, 3 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index 49e9da8..d170058 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -18,22 +18,20 @@ package org.apache.flink.streaming.util.serialization;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.io.IOException;
 
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
 
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  * <p>
  * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
  */
-public class JSONDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
+public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
        private ObjectMapper mapper;
 
        @Override
-       public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+       public ObjectNode deserialize(byte[] message) throws IOException {
                if (mapper == null) {
                        mapper = new ObjectMapper();
                }
@@ -45,8 +43,4 @@ public class JSONDeserializationSchema implements KeyedDeserializationSchema<Obj
                return false;
        }
 
-       @Override
-       public TypeInformation<ObjectNode> getProducedType() {
-               return getForClass(ObjectNode.class);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
index f8e3fd1..1882a7e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -33,7 +33,7 @@ public class JSONDeserializationSchemaTest {
                byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
 
                JSONDeserializationSchema schema = new JSONDeserializationSchema();
-               ObjectNode deserializedValue = schema.deserialize(null, serializedValue, "", 0, 0);
+               ObjectNode deserializedValue = schema.deserialize(serializedValue);
 
                Assert.assertEquals(4, deserializedValue.get("key").asInt());
                Assert.assertEquals("world", deserializedValue.get("value").asText());

Reply via email to