Repository: flink Updated Branches: refs/heads/master fce64e193 -> b5e2e3637
[hotfix] Fix JSONDeserializationSchema for Kafka Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/042ad7b9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/042ad7b9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/042ad7b9 Branch: refs/heads/master Commit: 042ad7b9020dffef5c6861d41170a6e96579cd1f Parents: fce64e1 Author: Robert Metzger <[email protected]> Authored: Thu Jun 9 10:56:09 2016 +0200 Committer: Robert Metzger <[email protected]> Committed: Thu Jun 9 10:56:45 2016 +0200 ---------------------------------------------------------------------- .../util/serialization/JSONDeserializationSchema.java | 10 ++-------- .../connectors/kafka/JSONDeserializationSchemaTest.java | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java index 49e9da8..d170058 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java @@ -18,22 +18,20 @@ package org.apache.flink.streaming.util.serialization; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.common.typeinfo.TypeInformation; import java.io.IOException; -import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; /** * DeserializationSchema that deserializes a JSON String into an ObjectNode. * <p> * Fields can be accessed by calling objectNode.get(<name>).as(<type>) */ -public class JSONDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> { +public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> { private ObjectMapper mapper; @Override - public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public ObjectNode deserialize(byte[] message) throws IOException { if (mapper == null) { mapper = new ObjectMapper(); } @@ -45,8 +43,4 @@ public class JSONDeserializationSchema implements KeyedDeserializationSchema<Obj return false; } - @Override - public TypeInformation<ObjectNode> getProducedType() { - return getForClass(ObjectNode.class); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java index f8e3fd1..1882a7e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java @@ -33,7 +33,7 @@ public class JSONDeserializationSchemaTest { byte[] serializedValue = mapper.writeValueAsBytes(initialValue); JSONDeserializationSchema schema = new JSONDeserializationSchema(); - ObjectNode deserializedValue = schema.deserialize(null, serializedValue, "", 0, 0); + ObjectNode deserializedValue = schema.deserialize(serializedValue); Assert.assertEquals(4, deserializedValue.get("key").asInt()); Assert.assertEquals("world", deserializedValue.get("value").asText());
