This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d11607  [FLINK-12253][table-common] Introduce a logical type skeleton
1d11607 is described below

commit 1d11607c732b5cc6cdf40b76e28e9812bec0fab6
Author: Timo Walther <[email protected]>
AuthorDate: Tue Apr 23 13:15:34 2019 +0200

    [FLINK-12253][table-common] Introduce a logical type skeleton
---
 .../flink/table/types/logical/LogicalType.java     | 212 +++++++++++++++++++++
 .../table/types/logical/LogicalTypeFamily.java     |  60 ++++++
 .../flink/table/types/logical/LogicalTypeRoot.java | 167 ++++++++++++++++
 .../table/types/logical/LogicalTypeVisitor.java    |  33 ++++
 .../apache/flink/table/types/LogicalTypesTest.java | 125 ++++++++++++
 5 files changed, 597 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
new file mode 100644
index 0000000..6ea827c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A logical type that describes the data type of a value. It does not imply a 
concrete physical
+ * representation for transmission or storage but defines the boundaries 
between JVM-based languages
+ * and the table ecosystem.
+ *
+ * <p>The definition of a logical type is similar to the SQL standard's "data 
type" terminology but
+ * also contains information about the nullability of a value for efficient 
handling of scalar
+ * expressions.
+ *
+ * <p>Subclasses of this class define characteristics of built-in or 
user-defined types.
+ *
+ * <p>Instances of this class describe the fully parameterized, immutable type 
with additional
+ * information such as numeric precision or expected length.
+ *
+ * <p>NOTE: A logical type is just a description of a type; a planner or runtime might not
+ * support every type in every logical precision yet!
+ */
+@PublicEvolving
+public abstract class LogicalType implements Serializable {
+
+       // Pinned explicitly so the serialized form does not depend on the compiler-derived default.
+       private static final long serialVersionUID = 1L;
+
+       private final boolean isNullable;
+
+       private final LogicalTypeRoot typeRoot;
+
+       /**
+        * Creates a logical type with the given nullability and root.
+        *
+        * @param isNullable whether a value of this type can be {@code null}
+        * @param typeRoot root of this type, must not be {@code null}
+        */
+       public LogicalType(boolean isNullable, LogicalTypeRoot typeRoot) {
+               this.isNullable = isNullable;
+               this.typeRoot = Preconditions.checkNotNull(typeRoot, "Type root must not be null.");
+       }
+
+       /**
+        * Returns whether a value of this type can be {@code null}.
+        */
+       public boolean isNullable() {
+               return isNullable;
+       }
+
+       /**
+        * Returns the root of this type. It is an essential description without additional parameters.
+        */
+       public LogicalTypeRoot getTypeRoot() {
+               return typeRoot;
+       }
+
+       /**
+        * Returns a deep copy of this type with possibly different nullability.
+        *
+        * @param isNullable the intended nullability of the copied type
+        * @return a deep copy
+        */
+       public abstract LogicalType copy(boolean isNullable);
+
+       /**
+        * Returns a deep copy of this type. It requires an implementation of {@link #copy(boolean)}.
+        *
+        * @return a deep copy
+        */
+       public final LogicalType copy() {
+               return copy(isNullable);
+       }
+
+       /**
+        * Returns a string that fully serializes this instance. The serialized string can be used for
+        * transmitting or persisting a type.
+        *
+        * @return detailed string for transmission or persistence
+        */
+       public abstract String asSerializableString();
+
+       /**
+        * Returns a string that summarizes this type for printing to a console. An implementation
+        * might shorten long names or skip very specific properties.
+        *
+        * <p>Use {@link #asSerializableString()} for a type string that fully serializes
+        * this instance.
+        *
+        * <p>The default implementation returns {@link #asSerializableString()} unchanged.
+        *
+        * @return summary string of this type for debugging purposes
+        */
+       public String asSummaryString() {
+               return asSerializableString();
+       }
+
+       /**
+        * Returns whether an instance of the given class can be represented as a value of this
+        * logical type when entering the table ecosystem. This method helps with the interoperability
+        * between JVM-based languages and the relational type system.
+        *
+        * <p>A supported conversion directly maps an input class to a logical type without loss of
+        * precision or type widening.
+        *
+        * <p>For example, {@code java.lang.Long} or {@code long} can be used as input for
+        * {@code BIGINT} independent of the set nullability.
+        *
+        * @param clazz input class to be converted into this logical type
+        * @return flag that indicates if instances of this class can be used as input into the table
+        * ecosystem
+        * @see #getDefaultConversion()
+        */
+       public abstract boolean supportsInputConversion(Class<?> clazz);
+
+       /**
+        * Returns whether a value of this logical type can be represented as an instance of the given
+        * class when leaving the table ecosystem. This method helps with the interoperability between
+        * JVM-based languages and the relational type system.
+        *
+        * <p>A supported conversion directly maps a logical type to an output class without loss of
+        * precision or type widening.
+        *
+        * <p>For example, {@code java.lang.Long} or {@code long} can be used as output for
+        * {@code BIGINT} if the type is not nullable. If the type is nullable, only
+        * {@code java.lang.Long} can represent this.
+        *
+        * @param clazz output class to be converted from this logical type
+        * @return flag that indicates if instances of this class can be used as output from the table
+        * ecosystem
+        * @see #getDefaultConversion()
+        */
+       public abstract boolean supportsOutputConversion(Class<?> clazz);
+
+       /**
+        * Returns the default conversion class. A value of this logical type is expected to be an
+        * instance of the given class when entering or is represented as an instance of the given
+        * class when leaving the table ecosystem if no other conversion has been specified.
+        *
+        * <p>For example, {@code java.lang.Long} is the default input and output for {@code BIGINT}.
+        *
+        * @return default class to represent values of this logical type
+        * @see #supportsInputConversion(Class)
+        * @see #supportsOutputConversion(Class)
+        */
+       public abstract Class<?> getDefaultConversion();
+
+       /**
+        * Returns the child types of this type as a list.
+        *
+        * <p>NOTE(review): presumably these are the types nested inside a constructed type and the
+        * list is empty for atomic types; confirm against the implementing classes.
+        *
+        * @return list of child types
+        */
+       public abstract List<LogicalType> getChildren();
+
+       /**
+        * Hands this type to the given visitor and returns the value produced by it.
+        *
+        * @param visitor visitor that transforms this type
+        * @param <R> result type of the visitor
+        * @return result of the visitor
+        */
+       public abstract <R> R accept(LogicalTypeVisitor<R> visitor);
+
+       @Override
+       public String toString() {
+               return asSummaryString();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               // Exact class match: instances of different subclasses are never equal.
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               LogicalType that = (LogicalType) o;
+               return isNullable == that.isNullable && typeRoot == that.typeRoot;
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(isNullable, typeRoot);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Formats the given pattern with {@link String#format(String, Object...)} and appends
+        * {@code " NOT NULL"} to the pattern if this type is not nullable.
+        *
+        * @param format format string as accepted by {@link String#format(String, Object...)}
+        * @param params arguments referenced by the format string
+        * @return formatted string, suffixed with {@code " NOT NULL"} for non-nullable types
+        */
+       protected String withNullability(String format, Object... params) {
+               if (!isNullable) {
+                       return String.format(format + " NOT NULL", params);
+               }
+               return String.format(format, params);
+       }
+
+       /**
+        * Creates a new {@link HashSet} that contains the given elements.
+        *
+        * <p>NOTE(review): presumably the elements are names of supported conversion classes; confirm
+        * against the implementing classes.
+        */
+       protected static Set<String> conversionSet(String... elements) {
+               return new HashSet<>(Arrays.asList(elements));
+       }
+
+       /**
+        * Doubles every backtick in the given string.
+        */
+       protected static String escapeBackticks(String s) {
+               return s.replace("`", "``");
+       }
+
+       /**
+        * Doubles every single quote in the given string.
+        */
+       protected static String escapeSingleQuotes(String s) {
+               return s.replace("'", "''");
+       }
+
+       /**
+        * Escapes the backticks of the given string and surrounds the result with backticks.
+        */
+       protected static String escapeIdentifier(String s) {
+               return "`" + escapeBackticks(s) + "`";
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
new file mode 100644
index 0000000..0e817b4
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration of logical type families for clustering {@link LogicalTypeRoot}s into categories.
+ *
+ * <p>The enumeration is very close to the SQL standard in terms of naming and completeness.
+ * However, it reflects just a subset of the evolving standard and contains some extensions
+ * (indicated by {@code EXTENSION}).
+ *
+ * <p>Families are not mutually exclusive: every {@link LogicalTypeRoot} declares the set of
+ * families it belongs to and may be a member of several at once. For example, the root
+ * {@code DECIMAL} is declared as {@code PREDEFINED}, {@code NUMERIC}, and {@code EXACT_NUMERIC}.
+ */
+@PublicEvolving
+public enum LogicalTypeFamily {
+
+       PREDEFINED,
+
+       CONSTRUCTED,
+
+       USER_DEFINED,
+
+       CHARACTER_STRING,
+
+       BINARY_STRING,
+
+       NUMERIC,
+
+       EXACT_NUMERIC,
+
+       APPROXIMATE_NUMERIC,
+
+       DATETIME,
+
+       TIME,
+
+       TIMESTAMP,
+
+       INTERVAL,
+
+       COLLECTION,
+
+       EXTENSION
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
new file mode 100644
index 0000000..d17443b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * An enumeration of logical type roots containing static information about 
logical data types.
+ *
+ * <p>A root is an essential description of a {@link LogicalType} without 
additional parameters. For
+ * example, a parameterized logical type {@code DECIMAL(12,3)} possesses all 
characteristics of its
+ * root {@code DECIMAL}. Additionally, a logical type root enables efficient comparison during the
+ * evaluation of types.
+ *
+ * <p>The enumeration is very close to the SQL standard in terms of naming and 
completeness. However,
+ * it reflects just a subset of the evolving standard and contains some 
extensions (such as {@code NULL}
+ * or {@code ANY}).
+ *
+ * <p>See the type-implementing classes for a more detailed description of 
each type.
+ */
+@PublicEvolving
+public enum LogicalTypeRoot {
+
+       CHAR(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.CHARACTER_STRING),
+
+       VARCHAR(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.CHARACTER_STRING),
+
+       BOOLEAN(
+               LogicalTypeFamily.PREDEFINED),
+
+       BINARY(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.BINARY_STRING),
+
+       VARBINARY(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.BINARY_STRING),
+
+       DECIMAL(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.EXACT_NUMERIC),
+
+       TINYINT(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.EXACT_NUMERIC),
+
+       SMALLINT(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.EXACT_NUMERIC),
+
+       INTEGER(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.EXACT_NUMERIC),
+
+       BIGINT(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.EXACT_NUMERIC),
+
+       FLOAT(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+       DOUBLE(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.NUMERIC,
+               LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+       DATE(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.DATETIME),
+
+       // Tagged with TIME in the same way the timestamp roots are tagged with TIMESTAMP; without
+       // this entry no root would belong to the TIME family at all.
+       TIME_WITHOUT_TIME_ZONE(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.DATETIME,
+               LogicalTypeFamily.TIME),
+
+       TIMESTAMP_WITHOUT_TIME_ZONE(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.DATETIME,
+               LogicalTypeFamily.TIMESTAMP),
+
+       TIMESTAMP_WITH_TIME_ZONE(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.DATETIME,
+               LogicalTypeFamily.TIMESTAMP),
+
+       TIMESTAMP_WITH_LOCAL_TIME_ZONE(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.DATETIME,
+               LogicalTypeFamily.TIMESTAMP,
+               LogicalTypeFamily.EXTENSION),
+
+       INTERVAL_YEAR_MONTH(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.INTERVAL),
+
+       INTERVAL_DAY_TIME(
+               LogicalTypeFamily.PREDEFINED,
+               LogicalTypeFamily.INTERVAL),
+
+       ARRAY(
+               LogicalTypeFamily.CONSTRUCTED,
+               LogicalTypeFamily.COLLECTION),
+
+       MULTISET(
+               LogicalTypeFamily.CONSTRUCTED,
+               LogicalTypeFamily.COLLECTION),
+
+       MAP(
+               LogicalTypeFamily.CONSTRUCTED,
+               LogicalTypeFamily.COLLECTION,
+               LogicalTypeFamily.EXTENSION),
+
+       ROW(
+               LogicalTypeFamily.CONSTRUCTED),
+
+       DISTINCT_TYPE(
+               LogicalTypeFamily.USER_DEFINED),
+
+       STRUCTURED_TYPE(
+               LogicalTypeFamily.USER_DEFINED),
+
+       NULL(
+               LogicalTypeFamily.EXTENSION),
+
+       ANY(
+               LogicalTypeFamily.EXTENSION);
+
+       private final Set<LogicalTypeFamily> families;
+
+       /**
+        * Creates a root that belongs to the given families.
+        *
+        * @param firstFamily first family of this root; a separate parameter so that every root
+        *                    names at least one family
+        * @param otherFamilies further families of this root, may be empty
+        */
+       LogicalTypeRoot(LogicalTypeFamily firstFamily, LogicalTypeFamily... otherFamilies) {
+               this.families = Collections.unmodifiableSet(EnumSet.of(firstFamily, otherFamilies));
+       }
+
+       /**
+        * Returns the families this root belongs to.
+        *
+        * @return unmodifiable set of families, never empty
+        */
+       public Set<LogicalTypeFamily> getFamilies() {
+               return families;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
new file mode 100644
index 0000000..272ea91
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
+ * instances of {@code R}.
+ *
+ * <p>A visitor is applied by passing it to {@link LogicalType#accept(LogicalTypeVisitor)}.
+ * {@link #visit(LogicalType)} is the only callback declared here; it receives the visited type and
+ * returns the transformation result.
+ *
+ * <p>NOTE(review): the parameter name {@code other} suggests this callback is meant as the fallback
+ * for types without a dedicated callback; confirm once type-specific callbacks exist.
+ *
+ * @param <R> result type
+ */
+@PublicEvolving
+public interface LogicalTypeVisitor<R> {
+
+       R visit(LogicalType other);
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
new file mode 100644
index 0000000..83a515f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for subclasses of {@link 
org.apache.flink.table.types.logical.LogicalType}.
+ */
+public class LogicalTypesTest {
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static void testAll(
+               LogicalType nullableType,
+               String serializableString,
+               String summaryString,
+               Class[] supportedInputClasses,
+               Class[] supportedOutputClasses,
+               LogicalType[] children,
+               LogicalType otherType) {
+
+               testEquality(nullableType, otherType);
+
+               testNullability(nullableType);
+
+               testJavaSerializability(nullableType);
+
+               testStringSerializability(nullableType, serializableString);
+
+               testStringSummary(nullableType, summaryString);
+
+               testConversions(nullableType, supportedInputClasses, 
supportedOutputClasses);
+
+               testChildren(nullableType, children);
+       }
+
+       private static void testEquality(LogicalType nullableType, LogicalType 
otherType) {
+               assertTrue(nullableType.isNullable());
+
+               assertEquals(nullableType, nullableType);
+               assertEquals(nullableType.hashCode(), nullableType.hashCode());
+
+               assertEquals(nullableType, nullableType.copy());
+
+               assertNotEquals(nullableType, otherType);
+               assertNotEquals(nullableType.hashCode(), otherType.hashCode());
+       }
+
+       private static void testNullability(LogicalType nullableType) {
+               final LogicalType notNullInstance = nullableType.copy(false);
+
+               assertNotEquals(nullableType, notNullInstance);
+
+               assertFalse(notNullInstance.isNullable());
+       }
+
+       private static void testJavaSerializability(LogicalType 
serializableType) {
+               try {
+                       final LogicalType deserializedInstance = 
InstantiationUtil.deserializeObject(
+                               
InstantiationUtil.serializeObject(serializableType),
+                               LogicalTypesTest.class.getClassLoader());
+
+                       assertEquals(serializableType, deserializedInstance);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static void testStringSerializability(LogicalType 
serializableType, String serializableString) {
+               Assert.assertEquals(serializableString, 
serializableType.asSerializableString());
+       }
+
+       private static void testStringSummary(LogicalType type, String 
summaryString) {
+               Assert.assertEquals(summaryString, type.asSummaryString());
+       }
+
+       private static void testConversions(LogicalType type, Class[] inputs, 
Class[] outputs) {
+               for (Class<?> clazz : inputs) {
+                       assertTrue(type.supportsInputConversion(clazz));
+               }
+
+               for (Class<?> clazz : outputs) {
+                       assertTrue(type.supportsOutputConversion(clazz));
+               }
+
+               
assertTrue(type.supportsInputConversion(type.getDefaultConversion()));
+
+               
assertTrue(type.supportsOutputConversion(type.getDefaultConversion()));
+
+               
assertFalse(type.supportsOutputConversion(LogicalTypesTest.class));
+
+               
assertFalse(type.supportsInputConversion(LogicalTypesTest.class));
+       }
+
+       private static void testChildren(LogicalType type, LogicalType[] 
children) {
+               assertEquals(Arrays.asList(children), type.getChildren());
+       }
+}

Reply via email to