This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 463cedd [FLINK-12850][core] Introduce
LocalDate/LocalTime/LocalDateTime TypeInfo
463cedd is described below
commit 463cedddec1bd55545f6e3e39ada04229ef5ceb5
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jun 17 14:00:07 2019 +0800
[FLINK-12850][core] Introduce LocalDate/LocalTime/LocalDateTime TypeInfo
This closes #8757
---
.../api/common/typeinfo/LocalTimeTypeInfo.java | 171 +++++++++++++++++++
.../apache/flink/api/common/typeinfo/Types.java | 17 ++
.../common/typeutils/base/LocalDateComparator.java | 187 +++++++++++++++++++++
.../common/typeutils/base/LocalDateSerializer.java | 114 +++++++++++++
.../typeutils/base/LocalDateTimeComparator.java | 153 +++++++++++++++++
.../typeutils/base/LocalDateTimeSerializer.java | 118 +++++++++++++
.../common/typeutils/base/LocalTimeComparator.java | 125 ++++++++++++++
.../common/typeutils/base/LocalTimeSerializer.java | 117 +++++++++++++
.../api/common/typeinfo/LocalTimeTypeInfoTest.java | 36 ++++
.../typeutils/base/LocalDateComparatorTest.java | 49 ++++++
.../typeutils/base/LocalDateSerializerTest.java | 56 ++++++
.../base/LocalDateTimeComparatorTest.java | 51 ++++++
.../base/LocalDateTimeSerializerTest.java | 58 +++++++
.../typeutils/base/LocalTimeComparatorTest.java | 48 ++++++
.../typeutils/base/LocalTimeSerializerTest.java | 55 ++++++
.../apache/flink/api/scala/typeutils/Types.scala | 15 ++
16 files changed, 1370 insertions(+)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfo.java
new file mode 100644
index 0000000..baaa260
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateComparator;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeComparator;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeComparator;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
+
+import java.lang.reflect.Constructor;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.Temporal;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for Java LocalDate/LocalTime/LocalDateTime.
+ */
+@PublicEvolving
+public class LocalTimeTypeInfo<T extends Temporal> extends TypeInformation<T>
implements AtomicType<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final LocalTimeTypeInfo<LocalDate> LOCAL_DATE = new
LocalTimeTypeInfo<>(
+ LocalDate.class, LocalDateSerializer.INSTANCE,
LocalDateComparator.class);
+
+ public static final LocalTimeTypeInfo<LocalTime> LOCAL_TIME = new
LocalTimeTypeInfo<>(
+ LocalTime.class, LocalTimeSerializer.INSTANCE,
LocalTimeComparator.class);
+
+ public static final LocalTimeTypeInfo<LocalDateTime> LOCAL_DATE_TIME =
new LocalTimeTypeInfo<>(
+ LocalDateTime.class, LocalDateTimeSerializer.INSTANCE,
LocalDateTimeComparator.class);
+
+ //
--------------------------------------------------------------------------------------------
+
+ private final Class<T> clazz;
+
+ private final TypeSerializer<T> serializer;
+
+ private final Class<? extends TypeComparator<T>> comparatorClass;
+
+ protected LocalTimeTypeInfo(Class<T> clazz, TypeSerializer<T>
serializer, Class<? extends TypeComparator<T>> comparatorClass) {
+ this.clazz = checkNotNull(clazz);
+ this.serializer = checkNotNull(serializer);
+ this.comparatorClass = checkNotNull(comparatorClass);
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<T> getTypeClass() {
+ return clazz;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return true;
+ }
+
+ @Override
+ public TypeSerializer<T> createSerializer(ExecutionConfig
executionConfig) {
+ return serializer;
+ }
+
+ @Override
+ public TypeComparator<T> createComparator(boolean sortOrderAscending,
ExecutionConfig executionConfig) {
+ return instantiateComparator(comparatorClass,
sortOrderAscending);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clazz, serializer, comparatorClass);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof LocalTimeTypeInfo;
+ }
+
+ /**
+  * Two instances are equal when the other side agrees via {@code canEqual} and both
+  * describe the same class, use an equal serializer and the same comparator class.
+  */
+ @Override
+ public boolean equals(Object obj) {
+     if (!(obj instanceof LocalTimeTypeInfo)) {
+         return false;
+     }
+
+     @SuppressWarnings("unchecked")
+     LocalTimeTypeInfo<T> that = (LocalTimeTypeInfo<T>) obj;
+
+     return that.canEqual(this)
+         && this.clazz == that.clazz
+         && serializer.equals(that.serializer)
+         && this.comparatorClass == that.comparatorClass;
+ }
+
+ @Override
+ public String toString() {
+ return clazz.getSimpleName();
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+  * Reflectively creates a comparator through its public single-{@code boolean} constructor.
+  *
+  * @param comparatorClass comparator implementation to instantiate
+  * @param ascendingOrder value handed to the comparator's constructor
+  * @return the newly created comparator
+  * @throws RuntimeException wrapping any failure to look up or invoke the constructor
+  */
+ private static <X> TypeComparator<X> instantiateComparator(
+         Class<? extends TypeComparator<X>> comparatorClass, boolean ascendingOrder) {
+     try {
+         Constructor<? extends TypeComparator<X>> ctor = comparatorClass.getConstructor(boolean.class);
+         return ctor.newInstance(ascendingOrder);
+     } catch (Exception e) {
+         throw new RuntimeException("Could not initialize comparator " + comparatorClass.getName(), e);
+     }
+ }
+
+ public static LocalTimeTypeInfo getInfoFor(Class type) {
+ checkNotNull(type);
+
+ if (type == LocalDate.class) {
+ return LOCAL_DATE;
+ }
+ else if (type == LocalTime.class) {
+ return LOCAL_TIME;
+ }
+ else if (type == LocalDateTime.class) {
+ return LOCAL_DATE_TIME;
+ }
+ return null;
+ }
+
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index 8e7538a..ede4396 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -44,6 +44,9 @@ import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -151,6 +154,20 @@ public class Types {
*/
public static final TypeInformation<Timestamp> SQL_TIMESTAMP =
SqlTimeTypeInfo.TIMESTAMP;
+ /**
+ * Returns type information for {@link java.time.LocalDate}. Supports a
null value.
+ */
+ public static final TypeInformation<LocalDate> LOCAL_DATE =
LocalTimeTypeInfo.LOCAL_DATE;
+
+ /**
+ * Returns type information for {@link java.time.LocalTime}. Supports a
null value.
+ */
+ public static final TypeInformation<LocalTime> LOCAL_TIME =
LocalTimeTypeInfo.LOCAL_TIME;
+
+ /**
+ * Returns type information for {@link java.time.LocalDateTime}.
Supports a null value.
+ */
+ public static final TypeInformation<LocalDateTime> LOCAL_DATE_TIME =
LocalTimeTypeInfo.LOCAL_DATE_TIME;
/**
* Returns type infomation for {@link java.time.Instant}. Supports a
null value.
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
new file mode 100644
index 0000000..aee0053
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDate;
+
+/**
+ * This class cannot extend {@link BasicTypeComparator} because {@code LocalDate} implements
+ * {@code Comparable<ChronoLocalDate>} rather than {@code Comparable<LocalDate>}.
+ */
+@Internal
+public final class LocalDateComparator extends TypeComparator<LocalDate>
implements Serializable {
+
+ private transient LocalDate reference;
+
+ protected final boolean ascendingComparison;
+
+ // For use by getComparators
+ @SuppressWarnings("rawtypes")
+ private final LocalDateComparator[] comparators = new
LocalDateComparator[] {this};
+
+ public LocalDateComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(LocalDate value) {
+ return value.hashCode();
+ }
+
+ @Override
+ public void setReference(LocalDate toCompare) {
+ this.reference = toCompare;
+ }
+
+ @Override
+ public boolean equalToReference(LocalDate candidate) {
+ return candidate.equals(reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<LocalDate>
referencedComparator) {
+ int comp = ((LocalDateComparator)
referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(LocalDate first, LocalDate second) {
+ int cmp = first.compareTo(second);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(LocalDate record, DataOutputView
target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ return comparators;
+ }
+
+ @Override
+ public LocalDate readWithKeyDenormalization(LocalDate reuse,
DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView
secondSource) throws IOException {
+ return compareSerializedLocalDate(firstSource, secondSource,
ascendingComparison);
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return 6;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(LocalDate record, MemorySegment target,
int offset, int numBytes) {
+ putNormalizedKeyLocalDate(record, target, offset, numBytes);
+ }
+
+ @Override
+ public LocalDateComparator duplicate() {
+ return new LocalDateComparator(ascendingComparison);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Static Helpers for Date Comparison
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+  * Compares two serialized dates field by field: an {@code int} year followed by one
+  * byte each for month and day.
+  *
+  * <p>Reading stops at the first field that differs, so in that case both views are left
+  * positioned partway through the record.
+  *
+  * @param firstSource view positioned at the first serialized date
+  * @param secondSource view positioned at the second serialized date
+  * @param ascendingComparison {@code false} to invert the sign of the result
+  * @return negative, zero or positive as the first date sorts before, equal to or after
+  *         the second in the requested order
+  * @throws IOException if reading from either view fails
+  */
+ public static int compareSerializedLocalDate(DataInputView firstSource, DataInputView secondSource,
+         boolean ascendingComparison) throws IOException {
+     // Integer.compare rather than subtraction: the serializer encodes a null date with
+     // year == Integer.MIN_VALUE, and MIN_VALUE - positiveYear overflows to a positive value.
+     int cmp = Integer.compare(firstSource.readInt(), secondSource.readInt());
+     if (cmp == 0) {
+         // Byte differences always fit in an int, so plain subtraction is safe here.
+         cmp = firstSource.readByte() - secondSource.readByte();
+         if (cmp == 0) {
+             cmp = firstSource.readByte() - secondSource.readByte();
+         }
+     }
+     return ascendingComparison ? cmp : -cmp;
+ }
+
+ public static void putNormalizedKeyLocalDate(LocalDate record,
MemorySegment target, int offset, int numBytes) {
+ int year = record.getYear();
+ int unsignedYear = year - Integer.MIN_VALUE;
+ if (numBytes >= 4) {
+ target.putIntBigEndian(offset, unsignedYear);
+ numBytes -= 4;
+ offset += 4;
+ } else if (numBytes > 0) {
+ for (int i = 0; numBytes > 0; numBytes--, i++) {
+ target.put(offset + i, (byte) (unsignedYear >>>
((3 - i) << 3)));
+ }
+ return;
+ }
+
+ int month = record.getMonthValue();
+ if (numBytes > 0) {
+ target.put(offset, (byte) (month & 0xff -
Byte.MIN_VALUE));
+ numBytes -= 1;
+ offset += 1;
+ }
+
+ int day = record.getDayOfMonth();
+ if (numBytes > 0) {
+ target.put(offset, (byte) (day & 0xff -
Byte.MIN_VALUE));
+ numBytes -= 1;
+ offset += 1;
+ }
+
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte) 0);
+ }
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializer.java
new file mode 100644
index 0000000..9c7225d
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalDate;
+
+@Internal
+public final class LocalDateSerializer extends
TypeSerializerSingleton<LocalDate> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final LocalDateSerializer INSTANCE = new
LocalDateSerializer();
+ private static final LocalDate EPOCH = LocalDate.ofEpochDay(0L);
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public LocalDate createInstance() {
+ return EPOCH;
+ }
+
+ @Override
+ public LocalDate copy(LocalDate from) {
+ return from;
+ }
+
+ @Override
+ public LocalDate copy(LocalDate from, LocalDate reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return 6;
+ }
+
+ /**
+  * Writes the date as an {@code int} year followed by one byte each for month and day.
+  * A {@code null} record is written as {@code Integer.MIN_VALUE} followed by
+  * {@code Short.MIN_VALUE}, which occupies the same number of bytes.
+  */
+ @Override
+ public void serialize(LocalDate record, DataOutputView target) throws IOException {
+     if (record != null) {
+         target.writeInt(record.getYear());
+         target.writeByte(record.getMonthValue());
+         target.writeByte(record.getDayOfMonth());
+         return;
+     }
+
+     // Null marker.
+     target.writeInt(Integer.MIN_VALUE);
+     target.writeShort(Short.MIN_VALUE);
+ }
+
+ /**
+  * Reads an {@code int} year and two further bytes. A year of {@code Integer.MIN_VALUE}
+  * marks a {@code null} date; the two trailing bytes are still consumed in that case.
+  */
+ @Override
+ public LocalDate deserialize(DataInputView source) throws IOException {
+     final int year = source.readInt();
+     if (year != Integer.MIN_VALUE) {
+         final byte month = source.readByte();
+         final byte day = source.readByte();
+         return LocalDate.of(year, month, day);
+     }
+
+     // Null marker: skip the remaining two bytes of the record.
+     source.readShort();
+     return null;
+ }
+
+ @Override
+ public LocalDate deserialize(LocalDate reuse, DataInputView source)
throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ target.writeInt(source.readInt());
+ target.writeShort(source.readShort());
+ }
+
+ @Override
+ public TypeSerializerSnapshot<LocalDate> snapshotConfiguration() {
+ return new LocalDateSerializerSnapshot();
+ }
+
+ //
------------------------------------------------------------------------
+
+ /**
+ * Serializer configuration snapshot for compatibility and format
evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class LocalDateSerializerSnapshot extends
SimpleTypeSerializerSnapshot<LocalDate> {
+
+ public LocalDateSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparator.java
new file mode 100644
index 0000000..5364ab7
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * This class cannot extend {@link BasicTypeComparator} because {@code LocalDateTime} implements
+ * {@code Comparable<ChronoLocalDateTime<?>>} rather than {@code Comparable<LocalDateTime>}.
+ */
+@Internal
+public final class LocalDateTimeComparator extends
TypeComparator<LocalDateTime> implements Serializable {
+
+ private transient LocalDateTime reference;
+
+ protected final boolean ascendingComparison;
+ protected final LocalDateComparator dateComparator;
+ protected final LocalTimeComparator timeComparator;
+
+ // For use by getComparators
+ @SuppressWarnings("rawtypes")
+ private final LocalDateTimeComparator[] comparators = new
LocalDateTimeComparator[] {this};
+
+ public LocalDateTimeComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ this.dateComparator = new LocalDateComparator(ascending);
+ this.timeComparator = new LocalTimeComparator(ascending);
+ }
+
+ @Override
+ public int hash(LocalDateTime value) {
+ return value.hashCode();
+ }
+
+ @Override
+ public void setReference(LocalDateTime toCompare) {
+ this.reference = toCompare;
+ }
+
+ @Override
+ public boolean equalToReference(LocalDateTime candidate) {
+ return candidate.equals(reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<LocalDateTime>
referencedComparator) {
+ int comp = ((LocalDateTimeComparator)
referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(LocalDateTime first, LocalDateTime second) {
+ int cmp = first.compareTo(second);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(LocalDateTime record,
DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ return comparators;
+ }
+
+ @Override
+ public LocalDateTime readWithKeyDenormalization(LocalDateTime reuse,
DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Compares the serialized date parts first and consults the time parts only when the
+  * dates are equal. Both delegates were created with this comparator's sort direction.
+  */
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+     final int dateCmp = dateComparator.compareSerialized(firstSource, secondSource);
+     if (dateCmp != 0) {
+         return dateCmp;
+     }
+     return timeComparator.compareSerialized(firstSource, secondSource);
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return dateComparator.getNormalizeKeyLen() +
timeComparator.getNormalizeKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ /**
+  * Writes the date's normalized key first and, if more bytes were requested than the
+  * date key occupies, the time's normalized key directly after it.
+  */
+ @Override
+ public void putNormalizedKey(LocalDateTime record, MemorySegment target, int offset, int numBytes) {
+     final int dateKeyLen = dateComparator.getNormalizeKeyLen();
+
+     // The date part receives at most its own key length.
+     dateComparator.putNormalizedKey(
+         record.toLocalDate(), target, offset, Math.min(numBytes, dateKeyLen));
+
+     if (numBytes > dateKeyLen) {
+         timeComparator.putNormalizedKey(
+             record.toLocalTime(), target, offset + dateKeyLen, numBytes - dateKeyLen);
+     }
+ }
+
+ @Override
+ public LocalDateTimeComparator duplicate() {
+ return new LocalDateTimeComparator(ascendingComparison);
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
new file mode 100644
index 0000000..c8bd16c
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+@Internal
+public final class LocalDateTimeSerializer extends
TypeSerializerSingleton<LocalDateTime> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final LocalDateTimeSerializer INSTANCE = new
LocalDateTimeSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public LocalDateTime createInstance() {
+ return LocalDateTime.of(
+ LocalDateSerializer.INSTANCE.createInstance(),
+ LocalTimeSerializer.INSTANCE.createInstance());
+ }
+
+ @Override
+ public LocalDateTime copy(LocalDateTime from) {
+ return from;
+ }
+
+ @Override
+ public LocalDateTime copy(LocalDateTime from, LocalDateTime reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return LocalDateSerializer.INSTANCE.getLength() +
LocalTimeSerializer.INSTANCE.getLength();
+ }
+
+ @Override
+ public void serialize(LocalDateTime record, DataOutputView target)
throws IOException {
+ if (record == null) {
+ LocalDateSerializer.INSTANCE.serialize(null, target);
+ LocalTimeSerializer.INSTANCE.serialize(null, target);
+ } else {
+
LocalDateSerializer.INSTANCE.serialize(record.toLocalDate(), target);
+
LocalTimeSerializer.INSTANCE.serialize(record.toLocalTime(), target);
+ }
+ }
+
+ /**
+  * Reads the date part and then the time part. Returns {@code null} when both parts are
+  * {@code null}.
+  *
+  * @throws IOException if reading fails, or if only one of the two parts is {@code null}
+  */
+ @Override
+ public LocalDateTime deserialize(DataInputView source) throws IOException {
+     final LocalDate date = LocalDateSerializer.INSTANCE.deserialize(source);
+     final LocalTime time = LocalTimeSerializer.INSTANCE.deserialize(source);
+
+     if (date != null && time != null) {
+         return LocalDateTime.of(date, time);
+     }
+     if (date == null && time == null) {
+         return null;
+     }
+     throw new IOException("Exactly one of LocalDate and LocalTime is null.");
+ }
+
+ @Override
+ public LocalDateTime deserialize(LocalDateTime reuse, DataInputView
source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ LocalDateSerializer.INSTANCE.copy(source, target);
+ LocalTimeSerializer.INSTANCE.copy(source, target);
+ }
+
+ @Override
+ public TypeSerializerSnapshot<LocalDateTime> snapshotConfiguration() {
+ return new LocalDateTimeSerializerSnapshot();
+ }
+
+ //
------------------------------------------------------------------------
+
+ /**
+ * Serializer configuration snapshot for compatibility and format
evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class LocalDateTimeSerializerSnapshot extends
SimpleTypeSerializerSnapshot<LocalDateTime> {
+
+ public LocalDateTimeSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparator.java
new file mode 100644
index 0000000..2a92a3c
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.time.LocalTime;
+
+@Internal
+public final class LocalTimeComparator extends BasicTypeComparator<LocalTime> {
+
+ private static final long serialVersionUID = 1L;
+
+ public LocalTimeComparator(boolean ascending) {
+ super(ascending);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView
secondSource) throws IOException {
+ return compareSerializedLocalTime(firstSource, secondSource,
ascendingComparison);
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return 7;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(LocalTime record, MemorySegment target,
int offset, int numBytes) {
+ putNormalizedKeyLocalTime(record, target, offset, numBytes);
+ }
+
+ @Override
+ public LocalTimeComparator duplicate() {
+ return new LocalTimeComparator(ascendingComparison);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Static Helpers for Time Comparison
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+  * Compares two serialized times field by field: three single bytes followed by an
+  * {@code int}. Reading stops at the first field that differs.
+  *
+  * @param firstSource view positioned at the first serialized time
+  * @param secondSource view positioned at the second serialized time
+  * @param ascendingComparison {@code false} to invert the sign of the result
+  * @return the difference of the first differing field, negated for descending order,
+  *         or zero if all fields match
+  * @throws IOException if reading from either view fails
+  */
+ public static int compareSerializedLocalTime(DataInputView firstSource, DataInputView secondSource,
+         boolean ascendingComparison) throws IOException {
+     int diff = firstSource.readByte() - secondSource.readByte();
+     if (diff == 0) {
+         diff = firstSource.readByte() - secondSource.readByte();
+     }
+     if (diff == 0) {
+         diff = firstSource.readByte() - secondSource.readByte();
+     }
+     if (diff == 0) {
+         diff = firstSource.readInt() - secondSource.readInt();
+     }
+     return ascendingComparison ? diff : -diff;
+ }
+
+ public static void putNormalizedKeyLocalTime(LocalTime record,
MemorySegment target, int offset, int numBytes) {
+ int hour = record.getHour();
+ if (numBytes > 0) {
+ target.put(offset, (byte) (hour & 0xff -
Byte.MIN_VALUE));
+ numBytes -= 1;
+ offset += 1;
+ }
+
+ int minute = record.getMinute();
+ if (numBytes > 0) {
+ target.put(offset, (byte) (minute & 0xff -
Byte.MIN_VALUE));
+ numBytes -= 1;
+ offset += 1;
+ }
+
+ int second = record.getSecond();
+ if (numBytes > 0) {
+ target.put(offset, (byte) (second & 0xff -
Byte.MIN_VALUE));
+ numBytes -= 1;
+ offset += 1;
+ }
+
+ int nano = record.getNano();
+ int unsignedNano = nano - Integer.MIN_VALUE;
+ if (numBytes >= 4) {
+ target.putIntBigEndian(offset, unsignedNano);
+ numBytes -= 4;
+ offset += 4;
+ } else if (numBytes > 0) {
+ for (int i = 0; numBytes > 0; numBytes--, i++) {
+ target.put(offset + i, (byte) (unsignedNano >>>
((3 - i) << 3)));
+ }
+ return;
+ }
+
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte) 0);
+ }
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializer.java
new file mode 100644
index 0000000..366d051
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalTime;
+
+/**
+ * Serializer for {@link LocalTime}.
+ *
+ * <p>Every value takes seven bytes: hour, minute and second as one byte each, followed by the
+ * nanosecond as a four-byte int. A {@code null} value is written as {@code Byte.MIN_VALUE},
+ * {@code Short.MIN_VALUE} and {@code Integer.MIN_VALUE}, and is recognized on read by the
+ * leading {@code Byte.MIN_VALUE}.
+ */
+@Internal
+public final class LocalTimeSerializer extends TypeSerializerSingleton<LocalTime> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final LocalTimeSerializer INSTANCE = new LocalTimeSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return true;
+	}
+
+	@Override
+	public LocalTime createInstance() {
+		return LocalTime.MIDNIGHT;
+	}
+
+	/** Returns {@code from} itself; no copy is made. */
+	@Override
+	public LocalTime copy(LocalTime from) {
+		return from;
+	}
+
+	/** Returns {@code from} itself; {@code reuse} is ignored. */
+	@Override
+	public LocalTime copy(LocalTime from, LocalTime reuse) {
+		return from;
+	}
+
+	@Override
+	public int getLength() {
+		return 7;
+	}
+
+	@Override
+	public void serialize(LocalTime record, DataOutputView target) throws IOException {
+		if (record != null) {
+			target.writeByte(record.getHour());
+			target.writeByte(record.getMinute());
+			target.writeByte(record.getSecond());
+			target.writeInt(record.getNano());
+			return;
+		}
+
+		// Null marker: same seven bytes in total, led by Byte.MIN_VALUE.
+		target.writeByte(Byte.MIN_VALUE);
+		target.writeShort(Short.MIN_VALUE);
+		target.writeInt(Integer.MIN_VALUE);
+	}
+
+	@Override
+	public LocalTime deserialize(DataInputView source) throws IOException {
+		final byte hour = source.readByte();
+		if (hour != Byte.MIN_VALUE) {
+			final byte minute = source.readByte();
+			final byte second = source.readByte();
+			final int nano = source.readInt();
+			return LocalTime.of(hour, minute, second, nano);
+		}
+
+		// Null marker: consume the remaining six bytes of the record.
+		source.readShort();
+		source.readInt();
+		return null;
+	}
+
+	@Override
+	public LocalTime deserialize(LocalTime reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// Move the seven bytes across as byte + short + int.
+		final byte first = source.readByte();
+		target.writeByte(first);
+		final short middle = source.readShort();
+		target.writeShort(middle);
+		final int last = source.readInt();
+		target.writeInt(last);
+	}
+
+	@Override
+	public TypeSerializerSnapshot<LocalTime> snapshotConfiguration() {
+		return new LocalTimeSerializerSnapshot();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class LocalTimeSerializerSnapshot extends SimpleTypeSerializerSnapshot<LocalTime> {
+
+		public LocalTimeSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfoTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfoTest.java
new file mode 100644
index 0000000..4602deb
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/LocalTimeTypeInfoTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/**
+ * Test for {@link LocalTimeTypeInfo}.
+ */
+public class LocalTimeTypeInfoTest extends TypeInformationTestBase<LocalTimeTypeInfo<?>> {
+
+	@Override
+	protected LocalTimeTypeInfo<?>[] getTestData() {
+		// The three type info constants checked by the shared test base.
+		final LocalTimeTypeInfo<?>[] typeInfos = {
+			LocalTimeTypeInfo.LOCAL_DATE,
+			LocalTimeTypeInfo.LOCAL_TIME,
+			LocalTimeTypeInfo.LOCAL_DATE_TIME
+		};
+		return typeInfos;
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateComparatorTest.java
new file mode 100644
index 0000000..fd0fee7
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateComparatorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDate;
+
+/**
+ * A test for the {@link LocalDateComparator}.
+ */
+public class LocalDateComparatorTest extends ComparatorTestBase<LocalDate> {
+
+	@Override
+	protected TypeComparator<LocalDate> createComparator(boolean ascending) {
+		return new LocalDateComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<LocalDate> createSerializer() {
+		return new LocalDateSerializer();
+	}
+
+	@Override
+	protected LocalDate[] getSortedTestData() {
+		// Listed in ascending order.
+		final LocalDate[] ascendingDates = {
+			LocalDate.of(0, 1, 1),
+			LocalDate.of(1970, 1, 1),
+			LocalDate.of(1990, 10, 14),
+			LocalDate.of(2013, 8, 12),
+			LocalDate.of(2040, 5, 12)
+		};
+		return ascendingDates;
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializerTest.java
new file mode 100644
index 0000000..bbafeb8
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDate;
+
+/**
+ * A test for the {@link LocalDateSerializer}.
+ */
+public class LocalDateSerializerTest extends SerializerTestBase<LocalDate> {
+
+	@Override
+	protected TypeSerializer<LocalDate> createSerializer() {
+		return new LocalDateSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		// Length the test base expects the serializer to report.
+		return 6;
+	}
+
+	@Override
+	protected Class<LocalDate> getTypeClass() {
+		return LocalDate.class;
+	}
+
+	@Override
+	protected LocalDate[] getTestData() {
+		final LocalDate[] samples = {
+			LocalDate.of(0, 1, 1),
+			LocalDate.of(1970, 1, 1),
+			LocalDate.of(1990, 10, 14),
+			LocalDate.of(2013, 8, 12),
+			LocalDate.of(2040, 5, 12)
+		};
+		return samples;
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparatorTest.java
new file mode 100644
index 0000000..2c455ef
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDateTime;
+
+/**
+ * A test for the {@link LocalDateTimeComparator}.
+ */
+public class LocalDateTimeComparatorTest extends ComparatorTestBase<LocalDateTime> {
+
+	@Override
+	protected TypeComparator<LocalDateTime> createComparator(boolean ascending) {
+		return new LocalDateTimeComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<LocalDateTime> createSerializer() {
+		return new LocalDateTimeSerializer();
+	}
+
+	@Override
+	protected LocalDateTime[] getSortedTestData() {
+		// Listed in ascending order; some neighbours differ by a single nanosecond.
+		final LocalDateTime[] ascendingDateTimes = {
+			LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0),
+			LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_000),
+			LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_001),
+			LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_002),
+			LocalDateTime.of(2013, 8, 12, 14, 15, 59, 478_000_000),
+			LocalDateTime.of(2013, 8, 12, 14, 15, 59, 479_000_000),
+			LocalDateTime.of(2040, 5, 12, 18, 0, 45, 999_000_000)
+		};
+		return ascendingDateTimes;
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializerTest.java
new file mode 100644
index 0000000..fbf1ab8
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalDateTime;
+
+/**
+ * A test for the {@link LocalDateTimeSerializer}.
+ */
+public class LocalDateTimeSerializerTest extends SerializerTestBase<LocalDateTime> {
+
+	@Override
+	protected TypeSerializer<LocalDateTime> createSerializer() {
+		return new LocalDateTimeSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		// Length the test base expects the serializer to report.
+		return 13;
+	}
+
+	@Override
+	protected Class<LocalDateTime> getTypeClass() {
+		return LocalDateTime.class;
+	}
+
+	@Override
+	protected LocalDateTime[] getTestData() {
+		final LocalDateTime[] samples = {
+			LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0),
+			LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_000),
+			LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_001),
+			LocalDateTime.of(1990, 10, 14, 2, 42, 25, 123_000_002),
+			LocalDateTime.of(2013, 8, 12, 14, 15, 59, 478_000_000),
+			LocalDateTime.of(2013, 8, 12, 14, 15, 59, 479_000_000),
+			LocalDateTime.of(2040, 5, 12, 18, 0, 45, 999_000_000)
+		};
+		return samples;
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparatorTest.java
new file mode 100644
index 0000000..c0599a9
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeComparatorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalTime;
+
+/**
+ * A test for the {@link LocalTimeComparator}.
+ */
+public class LocalTimeComparatorTest extends ComparatorTestBase<LocalTime> {
+
+	@Override
+	protected TypeComparator<LocalTime> createComparator(boolean ascending) {
+		return new LocalTimeComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<LocalTime> createSerializer() {
+		return new LocalTimeSerializer();
+	}
+
+	@Override
+	protected LocalTime[] getSortedTestData() {
+		// Ascending order. Several neighbours share hour, minute and second and differ only
+		// in the nanosecond, so the nanosecond part of the comparison is exercised as well.
+		return new LocalTime[] {
+			LocalTime.of(0, 0, 0),
+			LocalTime.of(2, 42, 25),
+			LocalTime.of(2, 42, 25, 123_000_000),
+			LocalTime.of(2, 42, 25, 123_000_001),
+			LocalTime.of(14, 15, 59),
+			LocalTime.of(14, 15, 59, 478_000_000),
+			LocalTime.of(18, 0, 45),
+			LocalTime.of(23, 59, 59, 999_999_999)
+		};
+	}
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializerTest.java
new file mode 100644
index 0000000..5187952
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LocalTimeSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.LocalTime;
+
+/**
+ * A test for the {@link LocalTimeSerializer}.
+ */
+public class LocalTimeSerializerTest extends SerializerTestBase<LocalTime> {
+
+	@Override
+	protected TypeSerializer<LocalTime> createSerializer() {
+		return new LocalTimeSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		// Matches LocalTimeSerializer#getLength().
+		return 7;
+	}
+
+	@Override
+	protected Class<LocalTime> getTypeClass() {
+		return LocalTime.class;
+	}
+
+	@Override
+	protected LocalTime[] getTestData() {
+		// Includes values with a non-zero nanosecond and the latest representable time so
+		// that the four-byte nanosecond field is covered, not just hour/minute/second.
+		return new LocalTime[] {
+			LocalTime.of(0, 0, 0),
+			LocalTime.of(2, 42, 25),
+			LocalTime.of(2, 42, 25, 123_000_001),
+			LocalTime.of(14, 15, 59),
+			LocalTime.of(14, 15, 59, 478_000_000),
+			LocalTime.of(18, 0, 45),
+			LocalTime.of(23, 59, 59, 999_999_999)
+		};
+	}
+}
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
index a4ec6ed..fbf61c8 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -157,6 +157,21 @@ object Types {
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
/**
+ * Returns type information for [[java.time.LocalDate]]. Supports a null
value.
+ */
+ val LOCAL_DATE: TypeInformation[java.time.LocalDate] = JTypes.LOCAL_DATE
+
+ /**
+ * Returns type information for [[java.time.LocalTime]]. Supports a null
value.
+ */
+ val LOCAL_TIME: TypeInformation[java.time.LocalTime] = JTypes.LOCAL_TIME
+
+ /**
+ * Returns type information for [[java.time.LocalDateTime]]. Supports a
null value.
+ */
+ val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime] =
JTypes.LOCAL_DATE_TIME
+
+ /**
* Returns type information for [[java.time.Instant]]. Supports a null
value.
*/
val INSTANT: TypeInformation[java.time.Instant] = JTypes.INSTANT