[FLINK-1464] [java-api] Added ResultTypeQueryable interface implementation to 
TypeSerializerInputFormat.

This closes #349


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3f6c9ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3f6c9ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3f6c9ba

Branch: refs/heads/master
Commit: e3f6c9ba69a3e545fdd8f18b7b652fa111ade93e
Parents: 9906cba
Author: Alexander Alexandrov <alexander.s.alexand...@gmail.com>
Authored: Thu Jan 29 15:41:01 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 3 12:20:08 2015 +0100

----------------------------------------------------------------------
 .../api/java/io/TypeSerializerInputFormat.java  | 27 +++++++++++++-------
 .../api/java/io/TypeSerializerFormatTest.java   |  8 +++---
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
index 0fca3e2..8e92c27 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,7 +19,9 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.BinaryInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.memory.DataInputView;
 
 import java.io.IOException;
@@ -28,23 +30,30 @@ import java.io.IOException;
  * Reads elements by deserializing them with a given type serializer.
  * @param <T>
  */
-public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> {
+public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> 
implements ResultTypeQueryable<T> {
 
        private static final long serialVersionUID = 2123068581665107480L;
 
+       private transient TypeInformation<T> resultType;
+
        private TypeSerializer<T> serializer;
 
-       public TypeSerializerInputFormat(TypeSerializer<T> serializer){
-               this.serializer = serializer;
+       public TypeSerializerInputFormat(TypeInformation<T> resultType) {
+               this.resultType = resultType;
+               this.serializer = resultType.createSerializer();
        }
 
        @Override
        protected T deserialize(T reuse, DataInputView dataInput) throws 
IOException {
-               if(serializer == null){
-                       throw new RuntimeException("TypeSerializerInputFormat 
requires a type serializer to " +
-                                       "be defined.");
-               }
-
                return serializer.deserialize(reuse, dataInput);
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Typing
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return resultType;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index ef271e7..7dd1135 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -40,6 +40,8 @@ import java.io.IOException;
 @RunWith(Parameterized.class)
 public class TypeSerializerFormatTest extends 
SequentialFormatTestBase<Tuple2<Integer, String>> {
 
+       TypeInformation<Tuple2<Integer, String>> resultType = 
TypeExtractor.getForObject(getRecord(0));
+
        private TypeSerializer<Tuple2<Integer, String>> serializer;
 
        private BlockInfo block;
@@ -47,9 +49,9 @@ public class TypeSerializerFormatTest extends 
SequentialFormatTestBase<Tuple2<In
        public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int 
degreeOfParallelism) {
                super(numberOfTuples, blockSize, degreeOfParallelism);
 
-               TypeInformation<Tuple2<Integer, String>> tti = 
TypeExtractor.getForObject(getRecord(0));
+        resultType = TypeExtractor.getForObject(getRecord(0));
 
-               serializer = tti.createSerializer();
+               serializer = resultType.createSerializer();
        }
 
        @Before
@@ -63,7 +65,7 @@ public class TypeSerializerFormatTest extends 
SequentialFormatTestBase<Tuple2<In
                
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 
this.blockSize);
 
                final TypeSerializerInputFormat<Tuple2<Integer, String>> 
inputFormat = new
-                               TypeSerializerInputFormat<Tuple2<Integer, 
String>>(serializer);
+                               TypeSerializerInputFormat<Tuple2<Integer, 
String>>(resultType);
                inputFormat.setFilePath(this.tempFile.toURI().toString());
 
                inputFormat.configure(configuration);

Reply via email to