[
https://issues.apache.org/jira/browse/FLINK-10127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588084#comment-16588084
]
ASF GitHub Bot commented on FLINK-10127:
----------------------------------------
asfgit closed pull request #6549: [FLINK-10127] Add Instant to basic types
URL: https://github.com/apache/flink/pull/6549
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request
(one opened from a fork) once it has been merged, the diff is
reproduced below for the sake of provenance:
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index f19865ea0f0..00c4c311fc9 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -40,6 +40,8 @@
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatComparator;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.InstantComparator;
+import org.apache.flink.api.common.typeutils.base.InstantSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
@@ -53,6 +55,7 @@
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
@@ -83,6 +86,8 @@
public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new
BasicTypeInfo<>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
public static final BasicTypeInfo<BigInteger> BIG_INT_TYPE_INFO = new
BasicTypeInfo<>(BigInteger.class, new Class<?>[]{}, BigIntSerializer.INSTANCE,
BigIntComparator.class);
public static final BasicTypeInfo<BigDecimal> BIG_DEC_TYPE_INFO = new
BasicTypeInfo<>(BigDecimal.class, new Class<?>[]{}, BigDecSerializer.INSTANCE,
BigDecComparator.class);
+ public static final BasicTypeInfo<Instant> INSTANT_TYPE_INFO = new
BasicTypeInfo<>(Instant.class, new Class<?>[]{}, InstantSerializer.INSTANCE,
InstantComparator.class);
+
//
--------------------------------------------------------------------------------------------
@@ -250,5 +255,6 @@ public String toString() {
TYPES.put(void.class, VOID_TYPE_INFO);
TYPES.put(BigInteger.class, BIG_INT_TYPE_INFO);
TYPES.put(BigDecimal.class, BIG_DEC_TYPE_INFO);
+ TYPES.put(Instant.class, INSTANT_TYPE_INFO);
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index 0140e9c8049..8e7538a97ed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -43,6 +43,7 @@
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -150,6 +151,12 @@
*/
public static final TypeInformation<Timestamp> SQL_TIMESTAMP =
SqlTimeTypeInfo.TIMESTAMP;
+
+ /**
+ * Returns type information for {@link java.time.Instant}. Supports a
null value.
+ */
+ public static final TypeInformation<Instant> INSTANT =
BasicTypeInfo.INSTANT_TYPE_INFO;
+
//CHECKSTYLE.OFF: MethodName
/**
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
new file mode 100644
index 00000000000..28a7be0eaca
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Comparator for comparing Java Instant.
+ */
+@Internal
+public final class InstantComparator extends BasicTypeComparator<Instant>{
+
+ private static final long serialVersionUID = 1L;
+ private static final long SECONDS_MIN_VALUE =
Instant.MIN.getEpochSecond();
+
+ public InstantComparator(boolean ascending) {
+ super(ascending);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView
secondSource) throws IOException {
+ final long lSeconds = firstSource.readLong();
+ final long rSeconds = secondSource.readLong();
+ final int comp;
+ if (lSeconds == rSeconds) {
+ final int lNanos = firstSource.readInt();
+ final int rNanos = secondSource.readInt();
+ comp = (lNanos < rNanos ? -1 : (lNanos == rNanos ? 0 :
1));
+ } else {
+ comp = lSeconds < rSeconds ? -1 : 1;
+ }
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return InstantSerializer.SECONDS_BYTES +
InstantSerializer.NANOS_BYTES;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(Instant record, MemorySegment target, int
offset, int numBytes) {
+ final int secondsBytes = InstantSerializer.SECONDS_BYTES;
+ final long normalizedSeconds = record.getEpochSecond() -
SECONDS_MIN_VALUE;
+ if (numBytes >= secondsBytes) {
+ target.putLongBigEndian(offset, normalizedSeconds);
+ offset += secondsBytes;
+ numBytes -= secondsBytes;
+
+ final int nanosBytes = InstantSerializer.NANOS_BYTES;
+ if (numBytes >= nanosBytes) {
+ target.putIntBigEndian(offset,
record.getNano());
+ offset += nanosBytes;
+ numBytes -= nanosBytes;
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte) 0);
+ }
+ } else {
+ final int nanos = record.getNano();
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte) (nanos
>>> ((3 - i) << 3)));
+ }
+ }
+ } else {
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte)
(normalizedSeconds >>> ((7 - i) << 3)));
+ }
+ }
+ }
+
+ @Override
+ public TypeComparator<Instant> duplicate() {
+ return new InstantComparator(ascendingComparison);
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
new file mode 100644
index 00000000000..2476ef6665b
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Serializer for serializing/deserializing Instant values including null
values.
+ */
+@Internal
+public final class InstantSerializer extends TypeSerializerSingleton<Instant> {
+ static final int SECONDS_BYTES = Long.BYTES;
+ static final int NANOS_BYTES = Integer.BYTES;
+
+ private static final long NULL_SECONDS = Long.MIN_VALUE;
+ //Nanos of normal Instant is between 0 and 999,999,999,
+ //therefore we can use Integer.MIN_VALUE to represent NULL Instant
+ //regardless of the supported range of seconds
+ private static final int NULL_NANOS = Integer.MIN_VALUE;
+
+ public static final InstantSerializer INSTANCE = new
InstantSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public Instant createInstance() {
+ return Instant.EPOCH;
+ }
+
+ @Override
+ public Instant copy(Instant from) {
+ return from;
+ }
+
+ @Override
+ public Instant copy(Instant from, Instant reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return SECONDS_BYTES + NANOS_BYTES;
+ }
+
+ @Override
+ public void serialize(Instant record, DataOutputView target) throws
IOException {
+ if (record == null) {
+ target.writeLong(NULL_SECONDS);
+ target.writeInt(NULL_NANOS);
+ } else {
+ target.writeLong(record.getEpochSecond());
+ target.writeInt(record.getNano());
+ }
+ }
+
+ @Override
+ public Instant deserialize(DataInputView source) throws IOException {
+ final long seconds = source.readLong();
+ final int nanos = source.readInt();
+ if (seconds == NULL_SECONDS && nanos == NULL_NANOS) {
+ return null;
+ }
+ return Instant.ofEpochSecond(seconds, nanos);
+ }
+
+ @Override
+ public Instant deserialize(Instant reuse, DataInputView source) throws
IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ target.writeLong(source.readLong());
+ target.writeInt(source.readInt());
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof InstantSerializer;
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
new file mode 100644
index 00000000000..010df046edf
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.Instant;
+
+/**
+ * A test for the {@link InstantComparator}.
+ */
+public class InstantComparatorTest extends ComparatorTestBase<Instant> {
+
+ @Override
+ protected TypeComparator<Instant> createComparator(boolean ascending) {
+ return new InstantComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<Instant> createSerializer() {
+ return new InstantSerializer();
+ }
+
+ @Override
+ protected Instant[] getSortedTestData() {
+ return new Instant[] {
+ Instant.EPOCH,
+ Instant.parse("1970-01-01T00:00:00.001Z"),
+ Instant.parse("1990-10-14T02:42:25.123Z"),
+ Instant.parse("1990-10-14T02:42:25.123000001Z"),
+ Instant.parse("1990-10-14T02:42:25.123000002Z"),
+ Instant.parse("2013-08-12T14:15:59.478Z"),
+ Instant.parse("2013-08-12T14:15:59.479Z"),
+ Instant.parse("2040-05-12T18:00:45.999Z"),
+ Instant.MAX
+ };
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
new file mode 100644
index 00000000000..7c14c6988b0
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * A test for the {@link InstantSerializer}.
+ */
+public class InstantSerializerTest extends SerializerTestBase<Instant> {
+ @Override
+ protected TypeSerializer<Instant> createSerializer() {
+ return new InstantSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 12;
+ }
+
+ @Override
+ protected Class<Instant> getTypeClass() {
+ return Instant.class;
+ }
+
+
+ private static long rndSeconds(Random rnd) {
+ return (long) (Instant.MIN.getEpochSecond()
+ + rnd.nextDouble() * (Instant.MAX.getEpochSecond() -
Instant.MIN.getEpochSecond()));
+ }
+
+ private static int rndNanos(Random rnd) {
+ return (int) (rnd.nextDouble() * 999999999);
+ }
+
+ @Override
+ protected Instant[] getTestData() {
+ final Random rnd = new Random(874597969123412341L);
+
+ return new Instant[] {
+ Instant.EPOCH, Instant.MIN, Instant.MAX,
+ Instant.ofEpochSecond(rndSeconds(rnd), rndNanos(rnd)),
+ Instant.ofEpochSecond(1534135584,949495),
+ Instant.ofEpochSecond(56090783)
+ };
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
index 5707701e070..bbd75d71307 100644
--- a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
@@ -24,6 +24,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.time.Instant;
import java.util.Date;
import static org.junit.Assert.assertEquals;
@@ -33,7 +34,7 @@
static Class<?>[] classes = {String.class, Integer.class,
Boolean.class, Byte.class,
Short.class, Long.class, Float.class, Double.class,
Character.class, Date.class,
- Void.class, BigInteger.class, BigDecimal.class};
+ Void.class, BigInteger.class, BigDecimal.class, Instant.class};
@Test
public void testBasicTypeInfoEquality() {
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
index 4ce9b0f09df..a4ec6edb537 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -156,6 +156,11 @@ object Types {
*/
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
+ /**
+ * Returns type information for [[java.time.Instant]]. Supports a null
value.
+ */
+ val INSTANT: TypeInformation[java.time.Instant] = JTypes.INSTANT
+
/**
* Returns type information for [[org.apache.flink.types.Row]] with fields
of the given types.
* A row itself must not be null.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add TypeInformation and serializers for JDK8 Instant
> -----------------------------------------------------
>
> Key: FLINK-10127
> URL: https://issues.apache.org/jira/browse/FLINK-10127
> Project: Flink
> Issue Type: Improvement
> Reporter: Alexey Trenikhin
> Priority: Minor
> Labels: pull-request-available
>
> Currently Flink's basic types include all Java primitives and their boxed
> form, plus {{void}}, {{String}}, {{Date}}, {{BigDecimal}}, and
> {{BigInteger}}. The new JDK8 {{Instant}} type should be added as well.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)