[FLINK-1464] [java-api] Added ResultTypeQueryable interface implementation to TypeSerializerInputFormat.
This closes #349 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3f6c9ba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3f6c9ba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3f6c9ba Branch: refs/heads/master Commit: e3f6c9ba69a3e545fdd8f18b7b652fa111ade93e Parents: 9906cba Author: Alexander Alexandrov <alexander.s.alexand...@gmail.com> Authored: Thu Jan 29 15:41:01 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 3 12:20:08 2015 +0100 ---------------------------------------------------------------------- .../api/java/io/TypeSerializerInputFormat.java | 27 +++++++++++++------- .../api/java/io/TypeSerializerFormatTest.java | 8 +++--- 2 files changed, 23 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java index 0fca3e2..8e92c27 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,7 +19,9 @@ package org.apache.flink.api.java.io; import org.apache.flink.api.common.io.BinaryInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.memory.DataInputView; import java.io.IOException; @@ -28,23 +30,30 @@ import java.io.IOException; * Reads elements by deserializing them with a given type serializer. * @param <T> */ -public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> { +public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> implements ResultTypeQueryable<T> { private static final long serialVersionUID = 2123068581665107480L; + private transient TypeInformation<T> resultType; + private TypeSerializer<T> serializer; - public TypeSerializerInputFormat(TypeSerializer<T> serializer){ - this.serializer = serializer; + public TypeSerializerInputFormat(TypeInformation<T> resultType) { + this.resultType = resultType; + this.serializer = resultType.createSerializer(); } @Override protected T deserialize(T reuse, DataInputView dataInput) throws IOException { - if(serializer == null){ - throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to " + - "be defined."); - } - return serializer.deserialize(reuse, dataInput); } + + // -------------------------------------------------------------------------------------------- + // Typing + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation<T> getProducedType() { + return resultType; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java index ef271e7..7dd1135 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java @@ -40,6 +40,8 @@ import java.io.IOException; @RunWith(Parameterized.class) public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> { + TypeInformation<Tuple2<Integer, String>> resultType = TypeExtractor.getForObject(getRecord(0)); + private TypeSerializer<Tuple2<Integer, String>> serializer; private BlockInfo block; @@ -47,9 +49,9 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) { super(numberOfTuples, blockSize, degreeOfParallelism); - TypeInformation<Tuple2<Integer, String>> tti = TypeExtractor.getForObject(getRecord(0)); + resultType = TypeExtractor.getForObject(getRecord(0)); - serializer = tti.createSerializer(); + serializer = resultType.createSerializer(); } @Before @@ -63,7 +65,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new - TypeSerializerInputFormat<Tuple2<Integer, String>>(serializer); + TypeSerializerInputFormat<Tuple2<Integer, String>>(resultType); inputFormat.setFilePath(this.tempFile.toURI().toString()); inputFormat.configure(configuration);