[FLINK-2158] Add support for null to the DateSerializer

This closes #780


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e9e0d68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e9e0d68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e9e0d68

Branch: refs/heads/master
Commit: 4e9e0d6839ba1a817432169bf6ed7b777e3096d6
Parents: f5c1768
Author: Robert Metzger <[email protected]>
Authored: Thu Jun 4 13:19:32 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jul 1 16:14:06 2015 +0200

----------------------------------------------------------------------
 .../common/typeutils/base/DateSerializer.java   | 26 ++++++++++++++--
 .../apache/flink/types/NullFieldException.java  | 12 ++++++++
 .../common/typeutils/SerializerTestBase.java    |  3 +-
 .../java/typeutils/runtime/TupleSerializer.java |  2 +-
 .../javaApiOperators/GroupReduceITCase.java     | 32 ++++++++++++++++++++
 5 files changed, 70 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 6aa11eb..d427918 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -43,11 +43,18 @@ public final class DateSerializer extends 
TypeSerializerSingleton<Date> {
 
        @Override
        public Date copy(Date from) {
+               if(from == null) {
+                       return null;
+               }
                return new Date(from.getTime());
        }
+
        
        @Override
        public Date copy(Date from, Date reuse) {
+               if(from == null) {
+                       return null;
+               }
                reuse.setTime(from.getTime());
                return reuse;
        }
@@ -59,17 +66,30 @@ public final class DateSerializer extends 
TypeSerializerSingleton<Date> {
 
        @Override
        public void serialize(Date record, DataOutputView target) throws 
IOException {
-               target.writeLong(record.getTime());
+               if(record == null) {
+                       target.writeLong(-1L);
+               } else {
+                       target.writeLong(record.getTime());
+               }
        }
 
        @Override
        public Date deserialize(DataInputView source) throws IOException {
-               return new Date(source.readLong());
+               long v = source.readLong();
+               if(v == -1L) {
+                       return null;
+               } else {
+                       return new Date(v);
+               }
        }
        
        @Override
        public Date deserialize(Date reuse, DataInputView source) throws 
IOException {
-               reuse.setTime(source.readLong());
+               long v = source.readLong();
+               if(v == -1L) {
+                       return null;
+               }
+               reuse.setTime(v);
                return reuse;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
index c192ec2..5c48bf4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -59,6 +59,18 @@ public class NullFieldException extends RuntimeException
                super("Field " + fieldIdx + " is null, but expected to hold a 
value.");
                this.fieldPos = fieldIdx;
        }
+
+       /**
+        * Constructs a {@code NullFieldException} with a default message, referring to
+        * the given field number as the null field, and with the given cause.
+        *
+        * @param fieldIdx The index of the field that was null, but expected to hold a value.
+        * @param cause The root cause of the error.
+        */
+       public NullFieldException(int fieldIdx, Throwable cause) {
+               super("Field " + fieldIdx + " is null, but expected to hold a 
value.", cause);
+               this.fieldPos = fieldIdx;
+       }
        
        /**
         * Gets the field number that was attempted to access. If the number is 
not set, this method returns

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 59bea0c..998ae12 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -182,7 +182,8 @@ public abstract class SerializerTestBase<T> {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
+
        @Test
        public void testSerializeIndividuallyReusingValues() {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 231486d..2b330c2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -115,7 +115,7 @@ public final class TupleSerializer<T extends Tuple> extends 
TupleSerializerBase<
                        try {
                                fieldSerializers[i].serialize(o, target);
                        } catch (NullPointerException npex) {
-                               throw new NullFieldException(i);
+                               throw new NullFieldException(i, npex);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 4061195..d52055d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -52,6 +52,7 @@ import org.junit.runners.Parameterized;
 import scala.math.BigInt;
 
 import java.util.Collection;
+import java.util.Date;
 import java.util.Iterator;
 
 @SuppressWarnings("serial")
@@ -1101,6 +1102,37 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                expected = "(1)\n";
        }
 
+       /**
+        * Fix for FLINK-2158.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testDateNullException() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Integer, Date>> in = env.fromElements(new 
Tuple2<Integer, Date>(0, new Date(1230000000)),
+                               new Tuple2<Integer, Date>(1, null),
+                               new Tuple2<Integer, Date>(2, new 
Date(1230000000))
+               );
+
+               DataSet<String> r = in.groupBy(0).reduceGroup(new 
GroupReduceFunction<Tuple2<Integer, Date>, String>() {
+                       @Override
+                       public void reduce(Iterable<Tuple2<Integer, Date>> 
values, Collector<String> out) throws Exception {
+                               for (Tuple2<Integer, Date> e : values) {
+                                       out.collect(Integer.toString(e.f0));
+                               }
+                       }
+               });
+
+               r.writeAsText(resultPath);
+               env.execute();
+
+               expected = "0\n1\n2\n";
+       }
+
+
+
        public static class GroupReducer8 implements 
GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
                @Override
                public void reduce(

Reply via email to