This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 463cedd  [FLINK-12850][core] Introduce 
LocalDate/LocalTime/LocalDateTime TypeInfo
463cedd is described below

commit 463cedddec1bd55545f6e3e39ada04229ef5ceb5
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jun 17 14:00:07 2019 +0800

    [FLINK-12850][core] Introduce LocalDate/LocalTime/LocalDateTime TypeInfo
    
    This closes #8757
---
 .../api/common/typeinfo/LocalTimeTypeInfo.java     | 171 +++++++++++++++++++
 .../apache/flink/api/common/typeinfo/Types.java    |  17 ++
 .../common/typeutils/base/LocalDateComparator.java | 187 +++++++++++++++++++++
 .../common/typeutils/base/LocalDateSerializer.java | 114 +++++++++++++
 .../typeutils/base/LocalDateTimeComparator.java    | 153 +++++++++++++++++
 .../typeutils/base/LocalDateTimeSerializer.java    | 118 +++++++++++++
 .../common/typeutils/base/LocalTimeComparator.java | 125 ++++++++++++++
 .../common/typeutils/base/LocalTimeSerializer.java | 117 +++++++++++++
 .../api/common/typeinfo/LocalTimeTypeInfoTest.java |  36 ++++
 .../typeutils/base/LocalDateComparatorTest.java    |  49 ++++++
 .../typeutils/base/LocalDateSerializerTest.java    |  56 ++++++
 .../base/LocalDateTimeComparatorTest.java          |  51 ++++++
 .../base/LocalDateTimeSerializerTest.java          |  58 +++++++
 .../typeutils/base/LocalTimeComparatorTest.java    |  48 ++++++
 .../typeutils/base/LocalTimeSerializerTest.java    |  55 ++++++
 .../apache/flink/api/scala/typeutils/Types.scala   |  15 ++
 16 files changed, 1370 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfo.java
new file mode 100644
index 0000000..baaa260
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateComparator;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeComparator;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeComparator;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
+
+import java.lang.reflect.Constructor;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.Temporal;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for Java LocalDate/LocalTime/LocalDateTime.
+ */
+@PublicEvolving
+public class LocalTimeTypeInfo<T extends Temporal> extends TypeInformation<T> 
implements AtomicType<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final LocalTimeTypeInfo<LocalDate> LOCAL_DATE = new 
LocalTimeTypeInfo<>(
+                       LocalDate.class, LocalDateSerializer.INSTANCE, 
LocalDateComparator.class);
+
+       public static final LocalTimeTypeInfo<LocalTime> LOCAL_TIME = new 
LocalTimeTypeInfo<>(
+                       LocalTime.class, LocalTimeSerializer.INSTANCE, 
LocalTimeComparator.class);
+
+       public static final LocalTimeTypeInfo<LocalDateTime> LOCAL_DATE_TIME = 
new LocalTimeTypeInfo<>(
+                       LocalDateTime.class, LocalDateTimeSerializer.INSTANCE, 
LocalDateTimeComparator.class);
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private final Class<T> clazz;
+
+       private final TypeSerializer<T> serializer;
+
+       private final Class<? extends TypeComparator<T>> comparatorClass;
+
+       protected LocalTimeTypeInfo(Class<T> clazz, TypeSerializer<T> 
serializer, Class<? extends TypeComparator<T>> comparatorClass) {
+               this.clazz = checkNotNull(clazz);
+               this.serializer = checkNotNull(serializer);
+               this.comparatorClass = checkNotNull(comparatorClass);
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       public Class<T> getTypeClass() {
+               return clazz;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return true;
+       }
+
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
+               return serializer;
+       }
+
+       @Override
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
+               return instantiateComparator(comparatorClass, 
sortOrderAscending);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(clazz, serializer, comparatorClass);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof LocalTimeTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof LocalTimeTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       LocalTimeTypeInfo<T> other = (LocalTimeTypeInfo<T>) obj;
+
+                       return other.canEqual(this) &&
+                                       this.clazz == other.clazz &&
+                                       serializer.equals(other.serializer) &&
+                                       this.comparatorClass == 
other.comparatorClass;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return clazz.getSimpleName();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static <X> TypeComparator<X> instantiateComparator(Class<? 
extends TypeComparator<X>> comparatorClass, boolean ascendingOrder) {
+               try {
+                       Constructor<? extends TypeComparator<X>> constructor = 
comparatorClass.getConstructor(boolean.class);
+                       return constructor.newInstance(ascendingOrder);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not initialize 
comparator " + comparatorClass.getName(), e);
+               }
+       }
+
+       public static LocalTimeTypeInfo getInfoFor(Class type) {
+               checkNotNull(type);
+
+               if (type == LocalDate.class) {
+                       return LOCAL_DATE;
+               }
+               else if (type == LocalTime.class) {
+                       return LOCAL_TIME;
+               }
+               else if (type == LocalDateTime.class) {
+                       return LOCAL_DATE_TIME;
+               }
+               return null;
+       }
+
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index 8e7538a..ede4396 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -44,6 +44,9 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -151,6 +154,20 @@ public class Types {
         */
        public static final TypeInformation<Timestamp> SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
 
+       /**
+        * Returns type information for {@link java.time.LocalDate}. Supports a 
null value.
+        */
+       public static final TypeInformation<LocalDate> LOCAL_DATE = 
LocalTimeTypeInfo.LOCAL_DATE;
+
+       /**
+        * Returns type information for {@link java.time.LocalTime}. Supports a 
null value.
+        */
+       public static final TypeInformation<LocalTime> LOCAL_TIME = 
LocalTimeTypeInfo.LOCAL_TIME;
+
+       /**
+        * Returns type information for {@link java.time.LocalDateTime}. 
Supports a null value.
+        */
+       public static final TypeInformation<LocalDateTime> LOCAL_DATE_TIME = 
LocalTimeTypeInfo.LOCAL_DATE_TIME;
 
        /**
         * Returns type infomation for {@link java.time.Instant}. Supports a 
null value.
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
new file mode 100644
index 0000000..aee0053
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDate;
+
+/**
+ * This class can not extend {@link BasicTypeComparator}, because LocalDate is 
a
+ * Comparable of ChronoLocalDate instead of Comparable of LocalDate.
+ */
+@Internal
+public final class LocalDateComparator extends TypeComparator<LocalDate> 
implements Serializable {
+
+       private transient LocalDate reference;
+
+       protected final boolean ascendingComparison;
+
+       // For use by getComparators
+       @SuppressWarnings("rawtypes")
+       private final LocalDateComparator[] comparators = new 
LocalDateComparator[] {this};
+
+       public LocalDateComparator(boolean ascending) {
+               this.ascendingComparison = ascending;
+       }
+
+       @Override
+       public int hash(LocalDate value) {
+               return value.hashCode();
+       }
+
+       @Override
+       public void setReference(LocalDate toCompare) {
+               this.reference = toCompare;
+       }
+
+       @Override
+       public boolean equalToReference(LocalDate candidate) {
+               return candidate.equals(reference);
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<LocalDate> 
referencedComparator) {
+               int comp = ((LocalDateComparator) 
referencedComparator).reference.compareTo(reference);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compare(LocalDate first, LocalDate second) {
+               int cmp = first.compareTo(second);
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(LocalDate record, DataOutputView 
target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public TypeComparator[] getFlatComparators() {
+               return comparators;
+       }
+
+       @Override
+       public LocalDate readWithKeyDenormalization(LocalDate reuse, 
DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               return compareSerializedLocalDate(firstSource, secondSource, 
ascendingComparison);
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return 6;
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(LocalDate record, MemorySegment target, 
int offset, int numBytes) {
+               putNormalizedKeyLocalDate(record, target, offset, numBytes);
+       }
+
+       @Override
+       public LocalDateComparator duplicate() {
+               return new LocalDateComparator(ascendingComparison);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //                           Static Helpers for Date Comparison
+       // 
--------------------------------------------------------------------------------------------
+
+       public static int compareSerializedLocalDate(DataInputView firstSource, 
DataInputView secondSource,
+                       boolean ascendingComparison) throws IOException {
+               int cmp = firstSource.readInt() - secondSource.readInt();
+               if (cmp == 0) {
+                       cmp = firstSource.readByte() - secondSource.readByte();
+                       if (cmp == 0) {
+                               cmp = firstSource.readByte() - 
secondSource.readByte();
+                       }
+               }
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       public static void putNormalizedKeyLocalDate(LocalDate record, 
MemorySegment target, int offset, int numBytes) {
+               int year = record.getYear();
+               int unsignedYear = year - Integer.MIN_VALUE;
+               if (numBytes >= 4) {
+                       target.putIntBigEndian(offset, unsignedYear);
+                       numBytes -= 4;
+                       offset += 4;
+               } else if (numBytes > 0) {
+                       for (int i = 0; numBytes > 0; numBytes--, i++) {
+                               target.put(offset + i, (byte) (unsignedYear >>> 
((3 - i) << 3)));
+                       }
+                       return;
+               }
+
+               int month = record.getMonthValue();
+               if (numBytes > 0) {
+                       target.put(offset, (byte) (month & 0xff - 
Byte.MIN_VALUE));
+                       numBytes -= 1;
+                       offset += 1;
+               }
+
+               int day = record.getDayOfMonth();
+               if (numBytes > 0) {
+                       target.put(offset, (byte) (day & 0xff - 
Byte.MIN_VALUE));
+                       numBytes -= 1;
+                       offset += 1;
+               }
+
+               for (int i = 0; i < numBytes; i++) {
+                       target.put(offset + i, (byte) 0);
+               }
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializer.java
new file mode 100644
index 0000000..9c7225d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalDate;
+
+@Internal
+public final class LocalDateSerializer extends 
TypeSerializerSingleton<LocalDate> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final LocalDateSerializer INSTANCE = new 
LocalDateSerializer();
+       private static final LocalDate EPOCH = LocalDate.ofEpochDay(0L);
+
+       @Override
+       public boolean isImmutableType() {
+               return true;
+       }
+
+       @Override
+       public LocalDate createInstance() {
+               return EPOCH;
+       }
+
+       @Override
+       public LocalDate copy(LocalDate from) {
+               return from;
+       }
+
+       @Override
+       public LocalDate copy(LocalDate from, LocalDate reuse) {
+               return from;
+       }
+
+       @Override
+       public int getLength() {
+               return 6;
+       }
+
+       @Override
+       public void serialize(LocalDate record, DataOutputView target) throws 
IOException {
+               if (record == null) {
+                       target.writeInt(Integer.MIN_VALUE);
+                       target.writeShort(Short.MIN_VALUE);
+               } else {
+                       target.writeInt(record.getYear());
+                       target.writeByte(record.getMonthValue());
+                       target.writeByte(record.getDayOfMonth());
+               }
+       }
+
+       @Override
+       public LocalDate deserialize(DataInputView source) throws IOException {
+               final int year = source.readInt();
+               if (year == Integer.MIN_VALUE) {
+                       source.readShort();
+                       return null;
+               } else {
+                       return LocalDate.of(year, source.readByte(), 
source.readByte());
+               }
+       }
+
+       @Override
+       public LocalDate deserialize(LocalDate reuse, DataInputView source) 
throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               target.writeInt(source.readInt());
+               target.writeShort(source.readShort());
+       }
+
+       @Override
+       public TypeSerializerSnapshot<LocalDate> snapshotConfiguration() {
+               return new LocalDateSerializerSnapshot();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Serializer configuration snapshot for compatibility and format 
evolution.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class LocalDateSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<LocalDate> {
+
+               public LocalDateSerializerSnapshot() {
+                       super(() -> INSTANCE);
+               }
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparator.java
new file mode 100644
index 0000000..5364ab7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * This class can not extend {@link BasicTypeComparator}, because 
LocalDateTime is a
+ * Comparable of ChronoLocalDateTime instead of Comparable of LocalDateTime.
+ */
+@Internal
+public final class LocalDateTimeComparator extends 
TypeComparator<LocalDateTime> implements Serializable {
+
+       private transient LocalDateTime reference;
+
+       protected final boolean ascendingComparison;
+       protected final LocalDateComparator dateComparator;
+       protected final LocalTimeComparator timeComparator;
+
+       // For use by getComparators
+       @SuppressWarnings("rawtypes")
+       private final LocalDateTimeComparator[] comparators = new 
LocalDateTimeComparator[] {this};
+
+       public LocalDateTimeComparator(boolean ascending) {
+               this.ascendingComparison = ascending;
+               this.dateComparator = new LocalDateComparator(ascending);
+               this.timeComparator = new LocalTimeComparator(ascending);
+       }
+
+       @Override
+       public int hash(LocalDateTime value) {
+               return value.hashCode();
+       }
+
+       @Override
+       public void setReference(LocalDateTime toCompare) {
+               this.reference = toCompare;
+       }
+
+       @Override
+       public boolean equalToReference(LocalDateTime candidate) {
+               return candidate.equals(reference);
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<LocalDateTime> 
referencedComparator) {
+               int comp = ((LocalDateTimeComparator) 
referencedComparator).reference.compareTo(reference);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compare(LocalDateTime first, LocalDateTime second) {
+               int cmp = first.compareTo(second);
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(LocalDateTime record, 
DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public TypeComparator[] getFlatComparators() {
+               return comparators;
+       }
+
+       @Override
+       public LocalDateTime readWithKeyDenormalization(LocalDateTime reuse, 
DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               int cmp = dateComparator.compareSerialized(firstSource, 
secondSource);
+               if (cmp == 0) {
+                       cmp = timeComparator.compareSerialized(firstSource, 
secondSource);
+               }
+               return cmp;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return dateComparator.getNormalizeKeyLen() + 
timeComparator.getNormalizeKeyLen();
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(LocalDateTime record, MemorySegment 
target, int offset, int numBytes) {
+               int dateNKLen = dateComparator.getNormalizeKeyLen();
+               if (numBytes <= dateNKLen) {
+                       dateComparator.putNormalizedKey(record.toLocalDate(), 
target, offset, numBytes);
+               } else {
+                       dateComparator.putNormalizedKey(record.toLocalDate(), 
target, offset, dateNKLen);
+                       timeComparator.putNormalizedKey(
+                                       record.toLocalTime(), target, offset + 
dateNKLen, numBytes - dateNKLen);
+               }
+       }
+
+       @Override
+       public LocalDateTimeComparator duplicate() {
+               return new LocalDateTimeComparator(ascendingComparison);
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
new file mode 100644
index 0000000..c8bd16c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+@Internal
+public final class LocalDateTimeSerializer extends 
TypeSerializerSingleton<LocalDateTime> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final LocalDateTimeSerializer INSTANCE = new 
LocalDateTimeSerializer();
+
+       @Override
+       public boolean isImmutableType() {
+               return true;
+       }
+
+       @Override
+       public LocalDateTime createInstance() {
+               return LocalDateTime.of(
+                               LocalDateSerializer.INSTANCE.createInstance(),
+                               LocalTimeSerializer.INSTANCE.createInstance());
+       }
+
+       @Override
+       public LocalDateTime copy(LocalDateTime from) {
+               return from;
+       }
+
+       @Override
+       public LocalDateTime copy(LocalDateTime from, LocalDateTime reuse) {
+               return from;
+       }
+
+       @Override
+       public int getLength() {
+               return LocalDateSerializer.INSTANCE.getLength() + 
LocalTimeSerializer.INSTANCE.getLength();
+       }
+
+       @Override
+       public void serialize(LocalDateTime record, DataOutputView target) 
throws IOException {
+               if (record == null) {
+                       LocalDateSerializer.INSTANCE.serialize(null, target);
+                       LocalTimeSerializer.INSTANCE.serialize(null, target);
+               } else {
+                       
LocalDateSerializer.INSTANCE.serialize(record.toLocalDate(), target);
+                       
LocalTimeSerializer.INSTANCE.serialize(record.toLocalTime(), target);
+               }
+       }
+
+       @Override
+       public LocalDateTime deserialize(DataInputView source) throws 
IOException {
+               LocalDate localDate = 
LocalDateSerializer.INSTANCE.deserialize(source);
+               LocalTime localTime = 
LocalTimeSerializer.INSTANCE.deserialize(source);
+               if (localDate == null && localTime == null) {
+                       return null;
+               } else if (localDate == null || localTime == null) {
+                       throw new IOException("Exactly one of LocalDate and 
LocalTime is null.");
+               } else {
+                       return LocalDateTime.of(localDate, localTime);
+               }
+       }
+
+       @Override
+       public LocalDateTime deserialize(LocalDateTime reuse, DataInputView 
source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               LocalDateSerializer.INSTANCE.copy(source, target);
+               LocalTimeSerializer.INSTANCE.copy(source, target);
+       }
+
+       @Override
+       public TypeSerializerSnapshot<LocalDateTime> snapshotConfiguration() {
+               return new LocalDateTimeSerializerSnapshot();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Serializer configuration snapshot for compatibility and format 
evolution.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class LocalDateTimeSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<LocalDateTime> {
+
+               public LocalDateTimeSerializerSnapshot() {
+                       super(() -> INSTANCE);
+               }
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparator.java
new file mode 100644
index 0000000..2a92a3c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.time.LocalTime;
+
+@Internal
+public final class LocalTimeComparator extends BasicTypeComparator<LocalTime> {
+
+       private static final long serialVersionUID = 1L;
+
+       public LocalTimeComparator(boolean ascending) {
+               super(ascending);
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               return compareSerializedLocalTime(firstSource, secondSource, 
ascendingComparison);
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return 7;
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(LocalTime record, MemorySegment target, 
int offset, int numBytes) {
+               putNormalizedKeyLocalTime(record, target, offset, numBytes);
+       }
+
+       @Override
+       public LocalTimeComparator duplicate() {
+               return new LocalTimeComparator(ascendingComparison);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //                           Static Helpers for Date Comparison
+       // 
--------------------------------------------------------------------------------------------
+
+       public static int compareSerializedLocalTime(DataInputView firstSource, 
DataInputView secondSource,
+                       boolean ascendingComparison) throws IOException {
+               int cmp = firstSource.readByte() - secondSource.readByte();
+               if (cmp == 0) {
+                       cmp = firstSource.readByte() - secondSource.readByte();
+                       if (cmp == 0) {
+                               cmp = firstSource.readByte() - 
secondSource.readByte();
+                               if (cmp == 0) {
+                                       cmp = firstSource.readInt() - 
secondSource.readInt();
+                               }
+                       }
+               }
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       public static void putNormalizedKeyLocalTime(LocalTime record, 
MemorySegment target, int offset, int numBytes) {
+               int hour = record.getHour();
+               if (numBytes > 0) {
+                       target.put(offset, (byte) (hour & 0xff - 
Byte.MIN_VALUE));
+                       numBytes -= 1;
+                       offset += 1;
+               }
+
+               int minute = record.getMinute();
+               if (numBytes > 0) {
+                       target.put(offset, (byte) (minute & 0xff - 
Byte.MIN_VALUE));
+                       numBytes -= 1;
+                       offset += 1;
+               }
+
+               int second = record.getSecond();
+               if (numBytes > 0) {
+                       target.put(offset, (byte) (second & 0xff - 
Byte.MIN_VALUE));
+                       numBytes -= 1;
+                       offset += 1;
+               }
+
+               int nano = record.getNano();
+               int unsignedNano = nano - Integer.MIN_VALUE;
+               if (numBytes >= 4) {
+                       target.putIntBigEndian(offset, unsignedNano);
+                       numBytes -= 4;
+                       offset += 4;
+               } else if (numBytes > 0) {
+                       for (int i = 0; numBytes > 0; numBytes--, i++) {
+                               target.put(offset + i, (byte) (unsignedNano >>> 
((3 - i) << 3)));
+                       }
+                       return;
+               }
+
+               for (int i = 0; i < numBytes; i++) {
+                       target.put(offset + i, (byte) 0);
+               }
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializer.java
new file mode 100644
index 0000000..366d051
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalTime;
+
+@Internal
+public final class LocalTimeSerializer extends 
TypeSerializerSingleton<LocalTime> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final LocalTimeSerializer INSTANCE = new 
LocalTimeSerializer();
+
+       @Override
+       public boolean isImmutableType() {
+               return true;
+       }
+
+       @Override
+       public LocalTime createInstance() {
+               return LocalTime.MIDNIGHT;
+       }
+
+       @Override
+       public LocalTime copy(LocalTime from) {
+               return from;
+       }
+
+       @Override
+       public LocalTime copy(LocalTime from, LocalTime reuse) {
+               return from;
+       }
+
+       @Override
+       public int getLength() {
+               return 7;
+       }
+
+       @Override
+       public void serialize(LocalTime record, DataOutputView target) throws 
IOException {
+               if (record == null) {
+                       target.writeByte(Byte.MIN_VALUE);
+                       target.writeShort(Short.MIN_VALUE);
+                       target.writeInt(Integer.MIN_VALUE);
+               } else {
+                       target.writeByte(record.getHour());
+                       target.writeByte(record.getMinute());
+                       target.writeByte(record.getSecond());
+                       target.writeInt(record.getNano());
+               }
+       }
+
+       @Override
+       public LocalTime deserialize(DataInputView source) throws IOException {
+               final byte hour = source.readByte();
+               if (hour == Byte.MIN_VALUE) {
+                       source.readShort();
+                       source.readInt();
+                       return null;
+               } else {
+                       return LocalTime.of(hour, source.readByte(), 
source.readByte(), source.readInt());
+               }
+       }
+
+       @Override
+       public LocalTime deserialize(LocalTime reuse, DataInputView source) 
throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               target.writeByte(source.readByte());
+               target.writeShort(source.readShort());
+               target.writeInt(source.readInt());
+       }
+
+       @Override
+       public TypeSerializerSnapshot<LocalTime> snapshotConfiguration() {
+               return new LocalTimeSerializerSnapshot();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Serializer configuration snapshot for compatibility and format 
evolution.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class LocalTimeSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<LocalTime> {
+
+               public LocalTimeSerializerSnapshot() {
+                       super(() -> INSTANCE);
+               }
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfoTest.java
new file mode 100644
index 0000000..4602deb
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfoTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/**
+ * Test for {@link SqlTimeTypeInfo}.
+ */
+public class LocalTimeTypeInfoTest extends 
TypeInformationTestBase<LocalTimeTypeInfo<?>> {
+
+       @Override
+       protected LocalTimeTypeInfo<?>[] getTestData() {
+               return new LocalTimeTypeInfo<?>[] {
+                               LocalTimeTypeInfo.LOCAL_DATE,
+                               LocalTimeTypeInfo.LOCAL_TIME,
+                               LocalTimeTypeInfo.LOCAL_DATE_TIME
+               };
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateComparatorTest.java
new file mode 100644
index 0000000..fd0fee7
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateComparatorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDate;
+
+public class LocalDateComparatorTest extends ComparatorTestBase<LocalDate> {
+
+       @Override
+       protected TypeComparator<LocalDate> createComparator(boolean ascending) 
{
+               return new LocalDateComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<LocalDate> createSerializer() {
+               return new LocalDateSerializer();
+       }
+
+       @Override
+       protected LocalDate[] getSortedTestData() {
+               return new LocalDate[] {
+                               LocalDate.of(0, 1, 1),
+                               LocalDate.of(1970, 1, 1),
+                               LocalDate.of(1990, 10, 14),
+                               LocalDate.of(2013, 8, 12),
+                               LocalDate.of(2040, 5, 12)
+               };
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializerTest.java
new file mode 100644
index 0000000..bbafeb8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDate;
+
+/**
+ * A test for the {@link LocalDateSerializer}.
+ */
+public class LocalDateSerializerTest extends SerializerTestBase<LocalDate> {
+
+       @Override
+       protected TypeSerializer<LocalDate> createSerializer() {
+               return new LocalDateSerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return 6;
+       }
+
+       @Override
+       protected Class<LocalDate> getTypeClass() {
+               return LocalDate.class;
+       }
+
+       @Override
+       protected LocalDate[] getTestData() {
+               return new LocalDate[] {
+                       LocalDate.of(0, 1, 1),
+                       LocalDate.of(1970, 1, 1),
+                       LocalDate.of(1990, 10, 14),
+                       LocalDate.of(2013, 8, 12),
+                       LocalDate.of(2040, 5, 12)
+               };
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparatorTest.java
new file mode 100644
index 0000000..2c455ef
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDateTime;
+
+public class LocalDateTimeComparatorTest extends 
ComparatorTestBase<LocalDateTime> {
+
+       @Override
+       protected TypeComparator<LocalDateTime> createComparator(boolean 
ascending) {
+               return new LocalDateTimeComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<LocalDateTime> createSerializer() {
+               return new LocalDateTimeSerializer();
+       }
+
+       @Override
+       protected LocalDateTime[] getSortedTestData() {
+               return new LocalDateTime[] {
+                               LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0),
+                               LocalDateTime.of(1990, 10, 14, 2, 42, 25, 
123_000_000),
+                               LocalDateTime.of(1990, 10, 14, 2, 42, 25, 
123_000_001),
+                               LocalDateTime.of(1990, 10, 14, 2, 42, 25, 
123_000_002),
+                               LocalDateTime.of(2013, 8, 12, 14, 15, 59, 
478_000_000),
+                               LocalDateTime.of(2013, 8, 12, 14, 15, 59, 
479_000_000),
+                               LocalDateTime.of(2040, 5, 12, 18, 0, 45, 
999_000_000)
+               };
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializerTest.java
new file mode 100644
index 0000000..fbf1ab8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDateTime;
+
+/**
+ * A test for the {@link LocalDateTimeSerializer}.
+ */
+public class LocalDateTimeSerializerTest extends 
SerializerTestBase<LocalDateTime> {
+
+       @Override
+       protected TypeSerializer<LocalDateTime> createSerializer() {
+               return new LocalDateTimeSerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return 13;
+       }
+
+       @Override
+       protected Class<LocalDateTime> getTypeClass() {
+               return LocalDateTime.class;
+       }
+
+       @Override
+       protected LocalDateTime[] getTestData() {
+               return new LocalDateTime[] {
+                       LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0),
+                       LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_000),
+                       LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_001),
+                       LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_002),
+                       LocalDateTime.of(2013, 8, 12, 14, 15, 59, 478_000_000),
+                       LocalDateTime.of(2013, 8, 12, 14, 15, 59, 479_000_000),
+                       LocalDateTime.of(2040, 5, 12, 18, 0, 45, 999_000_000)
+               };
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparatorTest.java
new file mode 100644
index 0000000..c0599a9
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparatorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalTime;
+
+public class LocalTimeComparatorTest extends ComparatorTestBase<LocalTime> {
+
+       @Override
+       protected TypeComparator<LocalTime> createComparator(boolean ascending) 
{
+               return new LocalTimeComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<LocalTime> createSerializer() {
+               return new LocalTimeSerializer();
+       }
+
+       @Override
+       protected LocalTime[] getSortedTestData() {
+               return new LocalTime[] {
+                               LocalTime.of(0, 0, 0),
+                               LocalTime.of(2, 42, 25),
+                               LocalTime.of(14, 15, 59),
+                               LocalTime.of(18, 0, 45)
+               };
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializerTest.java
new file mode 100644
index 0000000..5187952
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalTime;
+
+/**
+ * A test for the {@link LocalTimeSerializer}.
+ */
+public class LocalTimeSerializerTest extends SerializerTestBase<LocalTime> {
+
+       @Override
+       protected TypeSerializer<LocalTime> createSerializer() {
+               return new LocalTimeSerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return 7;
+       }
+
+       @Override
+       protected Class<LocalTime> getTypeClass() {
+               return LocalTime.class;
+       }
+
+       @Override
+       protected LocalTime[] getTestData() {
+               return new LocalTime[] {
+                       LocalTime.of(0, 0, 0),
+                       LocalTime.of(2, 42, 25),
+                       LocalTime.of(14, 15, 59),
+                       LocalTime.of(18, 0, 45)
+               };
+       }
+}
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
index a4ec6ed..fbf61c8 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -157,6 +157,21 @@ object Types {
   val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
 
   /**
+    * Returns type information for [[java.time.LocalDate]]. Supports a null 
value.
+    */
+  val LOCAL_DATE: TypeInformation[java.time.LocalDate] = JTypes.LOCAL_DATE
+
+  /**
+    * Returns type information for [[java.time.LocalTime]]. Supports a null 
value.
+    */
+  val LOCAL_TIME: TypeInformation[java.time.LocalTime] = JTypes.LOCAL_TIME
+
+  /**
+    * Returns type information for [[java.time.LocalDateTime]]. Supports a 
null value.
+    */
+  val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime] = 
JTypes.LOCAL_DATE_TIME
+
+  /**
     * Returns type information for [[java.time.Instant]]. Supports a null 
value.
     */
   val INSTANT: TypeInformation[java.time.Instant] = JTypes.INSTANT

Reply via email to