[FLINK-2158] Add support for null to the DateSerializer This closes #780
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e9e0d68 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e9e0d68 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e9e0d68 Branch: refs/heads/master Commit: 4e9e0d6839ba1a817432169bf6ed7b777e3096d6 Parents: f5c1768 Author: Robert Metzger <[email protected]> Authored: Thu Jun 4 13:19:32 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 1 16:14:06 2015 +0200 ---------------------------------------------------------------------- .../common/typeutils/base/DateSerializer.java | 26 ++++++++++++++-- .../apache/flink/types/NullFieldException.java | 12 ++++++++ .../common/typeutils/SerializerTestBase.java | 3 +- .../java/typeutils/runtime/TupleSerializer.java | 2 +- .../javaApiOperators/GroupReduceITCase.java | 32 ++++++++++++++++++++ 5 files changed, 70 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java index 6aa11eb..d427918 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java @@ -43,11 +43,18 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> { @Override public Date copy(Date from) { + if(from == null) { + return null; + } return new Date(from.getTime()); } + @Override public Date copy(Date from, Date reuse) { + if(from == null) { + return null; + } reuse.setTime(from.getTime()); return reuse; } @@ -59,17 +66,30 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> { @Override public void serialize(Date record, DataOutputView target) throws IOException { - target.writeLong(record.getTime()); + if(record == null) { + target.writeLong(-1L); + } else { + target.writeLong(record.getTime()); + } } @Override public Date deserialize(DataInputView source) throws IOException { - return new Date(source.readLong()); + long v = source.readLong(); + if(v == -1L) { + return null; + } else { + return new Date(v); + } } @Override public Date deserialize(Date reuse, DataInputView source) throws IOException { - reuse.setTime(source.readLong()); + long v = source.readLong(); + if(v == -1L) { + return null; + } + reuse.setTime(v); return reuse; } http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java index c192ec2..5c48bf4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java +++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java @@ -59,6 +59,18 @@ public class NullFieldException extends RuntimeException super("Field " + fieldIdx + " is null, but expected to hold a value."); this.fieldPos = fieldIdx; } + + /** + * Constructs an {@code NullFieldException} with a default message, referring to + * given field number as the null field and a cause (Throwable) + * + * @param fieldIdx The index of the field that was null, but expected to hold a value. + * @param cause Pass the root cause of the error + */ + public NullFieldException(int fieldIdx, Throwable cause) { + super("Field " + fieldIdx + " is null, but expected to hold a value.", cause); + this.fieldPos = fieldIdx; + } /** * Gets the field number that was attempted to access. If the number is not set, this method returns http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index 59bea0c..998ae12 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -182,7 +182,8 @@ public abstract class SerializerTestBase<T> { fail("Exception in test: " + e.getMessage()); } } - + + @Test public void testSerializeIndividuallyReusingValues() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index 231486d..2b330c2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -115,7 +115,7 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase< try { fieldSerializers[i].serialize(o, target); } catch (NullPointerException npex) { - throw new NullFieldException(i); + throw new NullFieldException(i, npex); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 4061195..d52055d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -52,6 +52,7 @@ import org.junit.runners.Parameterized; import scala.math.BigInt; import java.util.Collection; +import java.util.Date; import java.util.Iterator; @SuppressWarnings("serial") @@ -1101,6 +1102,37 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { expected = "(1)\n"; } + /** + * Fix for FLINK-2158. + * + * @throws Exception + */ + @Test + public void testDateNullException() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<Integer, Date>(0, new Date(1230000000)), + new Tuple2<Integer, Date>(1, null), + new Tuple2<Integer, Date>(2, new Date(1230000000)) + ); + + DataSet<String> r = in.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, Date>, String>() { + @Override + public void reduce(Iterable<Tuple2<Integer, Date>> values, Collector<String> out) throws Exception { + for (Tuple2<Integer, Date> e : values) { + out.collect(Integer.toString(e.f0)); + } + } + }); + + r.writeAsText(resultPath); + env.execute(); + + expected = "0\n1\n2\n"; + } + + + public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> { @Override public void reduce(
