Repository: flink
Updated Branches:
  refs/heads/master 010265b64 -> 92e1c82cc


[FLINK-3933] [streaming API] Add AbstractDeserializationSchema that handles produced type extraction.

The AbstractDeserializationSchema creates the produced type information automatically from the generic parameters.

This closes #2010
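For readers skimming the patch, the core idea can be sketched without Flink on the classpath: the base class inspects `getClass().getGenericSuperclass()` to recover the type argument a subclass supplied. The names below (`SketchSchema`, `TypeArgumentSketch`) are hypothetical stand-ins, not part of this commit; Flink's actual `TypeExtractor.createTypeInfo(...)` handles many more cases (tuples, POJOs, nested generics).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for AbstractDeserializationSchema<T>: the base class
// recovers its subclass's generic type argument via reflection.
abstract class SketchSchema<T> {

    public Type producedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        // Flink's TypeExtractor throws InvalidTypesException in this situation
        throw new IllegalStateException("raw subclass: type argument not recoverable");
    }

    public abstract T deserialize(byte[] message);
}

public class TypeArgumentSketch {
    public static void main(String[] args) {
        // An anonymous subclass fixes T = String, so reflection can see it
        SketchSchema<String> schema = new SketchSchema<String>() {
            @Override
            public String deserialize(byte[] message) {
                return new String(message);
            }
        };
        System.out.println(schema.producedType().getTypeName()); // java.lang.String
    }
}
```

This is why subclasses of the new `AbstractDeserializationSchema` do not need to implement `getProducedType()` themselves, while raw (non-parameterized) subclasses cannot be supported.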


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92e1c82c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92e1c82c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92e1c82c

Branch: refs/heads/master
Commit: 92e1c82cc545b80c3f82e01a97708aa8d70b3806
Parents: 010265b
Author: Stephan Ewen <[email protected]>
Authored: Thu May 19 12:37:05 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri May 20 09:56:31 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md         |  19 ++-
 .../flink/api/java/typeutils/TypeExtractor.java |   4 +-
 .../AbstractDeserializationSchema.java          |  68 +++++++++++
 .../serialization/DeserializationSchema.java    |   3 +
 .../util/serialization/SimpleStringSchema.java  |   3 +-
 .../util/AbstractDeserializationSchemaTest.java | 116 +++++++++++++++++++
 6 files changed, 203 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md 
b/docs/apis/streaming/connectors/kafka.md
index da3f86c..45ec6ad 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -87,13 +87,13 @@ Then, import the connector in your maven project:
 
 Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-#### Installing Apache Kafka
+### Installing Apache Kafka
 
 * Follow the instructions from [Kafka's 
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download 
the code and launch a server (launching a Zookeeper and a Kafka server is 
required every time before starting the application).
 * On 32 bit computers 
[this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in)
 problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the 
`advertised.host.name` setting in the `config/server.properties` file must be 
set to the machine's IP address.
 
-#### Kafka Consumer
+### Kafka Consumer
 
 Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 
0.9.0.x versions). It provides access to one or more Kafka topics.
 
@@ -142,18 +142,25 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the Kafka brokers from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, the 
client log might contain information about failed requests, etc.
 
-##### The `DeserializationSchema`
+#### The `DeserializationSchema`
 
-The `FlinkKafkaConsumer08` needs to know how to turn the data in Kafka into 
Java objects. The 
+The Flink Kafka Consumer needs to know how to turn the binary data in Kafka 
into Java/Scala objects. The 
 `DeserializationSchema` allows users to specify such a schema. The `T 
deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+It is usually helpful to start from the `AbstractDeserializationSchema`, which 
takes care of describing the
+produced Java/Scala type to Flink's type system. Users who implement a vanilla `DeserializationSchema` need
+to implement the `getProducedType(...)` method themselves.
+
 For accessing both the key and value of the Kafka message, the 
`KeyedDeserializationSchema` has
 the following deserialize method ` T deserialize(byte[] messageKey, byte[] 
message, String topic, int partition, long offset)`.
 
 For convenience, Flink provides the following schemas:
+
 1. `TypeInformationSerializationSchema` (and 
`TypeInformationKeyValueSerializationSchema`) which creates 
-    a schema based on a Flink `TypeInformation`.
+    a schema based on Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
+    This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
+ 
 2. `JsonDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which 
turns the serialized JSON 
     into an ObjectNode object, from which fields can be accessed using 
objectNode.get("field").as(Int/String/...)(). 
     The KeyValue objectNode contains a "key" and "value" field which contain 
all fields, as well as 
@@ -191,7 +198,7 @@ Flink on YARN supports automatic restart of lost YARN 
containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit 
the offsets to Zookeeper.
 
-#### Kafka Producer
+### Kafka Producer
 
 The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can 
specify a custom partitioner that assigns
 records to partitions.

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index f2b9fd2..45420a2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -272,7 +272,7 @@ public class TypeExtractor {
                }
                return new 
TypeExtractor().privateCreateTypeInfo(InputFormat.class, 
inputFormatInterface.getClass(), 0, null, null);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Generic extraction methods
        // 
--------------------------------------------------------------------------------------------
@@ -596,7 +596,7 @@ public class TypeExtractor {
                        }
                        
                        if(curT == Tuple0.class) {
-                               return new TupleTypeInfo(Tuple0.class, new 
TypeInformation<?>[0]);
+                               return new TupleTypeInfo(Tuple0.class);
                        }
                        
                        // check if immediate child of Tuple has generics

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
new file mode 100644
index 0000000..77e76e5
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema describes how to turn the byte messages 
delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
+ * processed by Flink.
+ * 
+ * <p>This base variant of the deserialization schema produces the type 
information
+ * automatically by extracting it from the generic class arguments.
+ * 
+ * @param <T> The type created by the deserialization schema.
+ */
+public abstract class AbstractDeserializationSchema<T> implements 
DeserializationSchema<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * De-serializes the byte message.
+        *
+        * @param message The message, as a byte array.
+        * @return The de-serialized message as an object.
+        */
+       @Override
+       public abstract T deserialize(byte[] message) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream.
+        * If true is returned, the element won't be emitted.
+        * 
+        * <p>This default implementation always returns false, meaning the stream
+        * is interpreted as unbounded.
+        *
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       @Override
+       public boolean isEndOfStream(T nextElement) {
+               return false;
+       }
+       
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return 
TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 
0, null, null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index b376d69..2e27ba6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -28,6 +28,9 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
  * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
  * processed by Flink.
  * 
+ * <p>Note: In most cases, one should start from {@link 
AbstractDeserializationSchema}, which
+ * takes care of producing the return type information automatically.
+ * 
  * @param <T> The type created by the deserialization schema.
  */
 @Public

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index a051d32..2de4c01 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -25,8 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
  * Very simple serialization schema for strings.
  */
 @PublicEvolving
-public class SimpleStringSchema implements DeserializationSchema<String>,
-               SerializationSchema<String> {
+public class SimpleStringSchema implements DeserializationSchema<String>, 
SerializationSchema<String> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
new file mode 100644
index 0000000..220c1cd
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
+
+import org.codehaus.jackson.map.util.JSONPObject;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractDeserializationSchemaTest {
+
+       @Test
+       public void testTypeExtractionTuple() {
+               TypeInformation<Tuple2<byte[], byte[]>> type = new 
TupleSchema().getProducedType();
+               TypeInformation<Tuple2<byte[], byte[]>> expected = 
TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
+               assertEquals(expected, type);
+       }
+       
+       @Test
+       public void testTypeExtractionTupleAnonymous() {
+               TypeInformation<Tuple2<byte[], byte[]>> type = new 
AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() {
+                       @Override
+                       public Tuple2<byte[], byte[]> deserialize(byte[] 
message) throws IOException {
+                               throw new UnsupportedOperationException();
+                       }
+               }.getProducedType();
+               
+               TypeInformation<Tuple2<byte[], byte[]>> expected = 
TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionGeneric() {
+               TypeInformation<JSONPObject> type = new 
JsonSchema().getProducedType();
+               TypeInformation<JSONPObject> expected = TypeInformation.of(new 
TypeHint<JSONPObject>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionGenericAnonymous() {
+               TypeInformation<JSONPObject> type = new 
AbstractDeserializationSchema<JSONPObject>() {
+                       @Override
+                       public JSONPObject deserialize(byte[] message) throws 
IOException {
+                               throw new UnsupportedOperationException();
+                       }
+               }.getProducedType();
+
+               TypeInformation<JSONPObject> expected = TypeInformation.of(new 
TypeHint<JSONPObject>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionRawException() {
+               try {
+                       new RawSchema().getProducedType();
+                       fail();
+               } catch (InvalidTypesException e) {
+                       // expected
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Test types
+       // 
------------------------------------------------------------------------
+
+       private static class TupleSchema extends 
AbstractDeserializationSchema<Tuple2<byte[], byte[]>> {
+
+               @Override
+               public Tuple2<byte[], byte[]> deserialize(byte[] message) 
throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+       
+       private static class JsonSchema extends 
AbstractDeserializationSchema<JSONPObject> {
+
+               @Override
+               public JSONPObject deserialize(byte[] message) throws 
IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       @SuppressWarnings("rawtypes")
+       private static class RawSchema extends AbstractDeserializationSchema {
+
+               @Override
+               public Object deserialize(byte[] message) throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}
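The `testTypeExtractionRawException` test above checks the failure mode: a raw (non-parameterized) subclass gives the base class nothing to extract. A minimal plain-JDK sketch of that behavior, with hypothetical class names (`GenericBase`, `RawChild`, `TypedChild`) that are not part of this commit:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical generic base class that tries to read its subclass's
// type argument, returning null when the subclass is raw.
abstract class GenericBase<T> {
    Type typeArgumentOrNull() {
        Type superType = getClass().getGenericSuperclass();
        return (superType instanceof ParameterizedType)
                ? ((ParameterizedType) superType).getActualTypeArguments()[0]
                : null; // raw subclass: nothing recoverable
    }
}

@SuppressWarnings("rawtypes")
class RawChild extends GenericBase {} // no type argument supplied

class TypedChild extends GenericBase<Integer> {}

public class RawTypeDemo {
    public static void main(String[] args) {
        System.out.println(new RawChild().typeArgumentOrNull());   // null
        System.out.println(new TypedChild().typeArgumentOrNull()); // class java.lang.Integer
    }
}
```

Flink surfaces the same situation as an `InvalidTypesException` rather than a null, which is what the `RawSchema` test asserts.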
