This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d255bef5daca2aab238ed9e00ddf14a00193a5a4
Author: Timo Walther <[email protected]>
AuthorDate: Mon May 18 11:30:27 2020 +0200

    [FLINK-17541][table] Support inline structured types
    
    This enables inline structured types in the Blink planner. Inline
    structured types are extracted (e.g. in UDFs) and don't need to be
    registered in a catalog. This finalizes FLIP-65 for scalar and
    table functions because existing functions can be migrated with a
    replacement to the new type system.
    
    Structured type support should still be declared as experimental
    until we have more tests and can also deal with structured types
    in sources and sinks.
    
    This closes #12228.
---
 .../flink/table/types/logical/StructuredType.java  |  19 ++-
 .../types/logical/utils/LogicalTypeCasts.java      |   2 +-
 .../planner/plan/schema/StructuredRelDataType.java | 137 +++++++++++++++++++++
 .../table/planner/calcite/FlinkTypeFactory.scala   |  10 ++
 .../planner/runtime/stream/sql/FunctionITCase.java | 106 ++++++++++++++++
 5 files changed, 270 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index d8aa593..2c93438 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -44,15 +45,14 @@ import java.util.stream.Collectors;
  * by an {@link ObjectIdentifier} or anonymously defined, unregistered types 
(usually reflectively
  * extracted) that are identified by an implementation {@link Class}.
  *
+ * <h1>Logical properties</h1>
+ *
  * <p>A structured type can declare a super type and allows single inheritance 
for more complex type
  * hierarchies, similar to JVM-based languages.
  *
  * <p>A structured type can be declared {@code final} for preventing further 
inheritance (default
  * behavior) or {@code not final} for allowing subtypes.
  *
- * <p>A structured type must offer a default constructor with zero arguments 
or a full constructor
- * that assigns all attributes.
- *
  * <p>A structured type can be declared {@code not instantiable} if a more 
specific type is
  * required or {@code instantiable} if instances can be created from this type 
(default behavior).
  *
@@ -61,6 +61,19 @@ import java.util.stream.Collectors;
  *
  * <p>NOTE: Compared to the SQL standard, this class is incomplete. We might 
add new features such
  * as method declarations in the future. Also ordering is not supported yet.
+ *
+ * <h1>Physical properties</h1>
+ *
+ * <p>A structured type can be defined fully logically (e.g. by using a {@code 
CREATE TYPE} DDL). The
+ * implementation class is optional and only used at the edges of the table 
ecosystem (e.g. when bridging
+ * to a function or connector). Serialization and equality ({@code 
hashCode/equals}) are handled by
+ * the runtime based on the logical type. In other words: {@code 
hashCode/equals} of an implementation
+ * class are not used. Custom equality, casting logic, and further overloaded 
operators will be supported
+ * once we allow defining methods on structured types.
+ *
+ * <p>An implementation class must offer a default constructor with zero 
arguments or a full constructor
+ * that assigns all attributes. Other physical properties such as the 
conversion classes of attributes
+ * are defined by a {@link DataType} when a structured type is used.
  */
 @PublicEvolving
 public final class StructuredType extends UserDefinedType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index f22f9be..ea63e0a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -305,7 +305,7 @@ public final class LogicalTypeCasts {
                } else if (targetRoot == DISTINCT_TYPE) {
                        return supportsCasting(sourceType, ((DistinctType) 
targetType).getSourceType(), allowExplicit);
                } else if (sourceRoot == STRUCTURED_TYPE || targetRoot == 
STRUCTURED_TYPE) {
-                       // TODO structured types are not supported yet
+                       // inheritance is not supported yet, so structured type 
must be fully equal
                        return false;
                } else if (sourceRoot == NULL) {
                        // null can be cast to an arbitrary type
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
new file mode 100644
index 0000000..4593a28
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.schema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeComparability;
+import org.apache.calcite.rel.type.RelDataTypeFamily;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.ObjectSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link RelDataType} representation of a {@link StructuredType}.
+ *
+ * <p>It extends {@link ObjectSqlType} for preserving the original logical 
type (including an optional
+ * implementation class) and supporting anonymous/unregistered structured 
types from Table API.
+ */
+@Internal
+public final class StructuredRelDataType extends ObjectSqlType {
+
+       private final StructuredType structuredType;
+
+       private StructuredRelDataType(StructuredType structuredType, 
List<RelDataTypeField> fields) {
+               super(
+                       SqlTypeName.STRUCTURED,
+                       createSqlIdentifier(structuredType),
+                       structuredType.isNullable(),
+                       fields,
+                       createRelDataTypeComparability(structuredType));
+               this.structuredType = structuredType;
+               computeDigest(); // recompute digest now that structuredType is assigned; generateTypeString() appends nothing while it is still null
+       }
+
+       public static StructuredRelDataType create(FlinkTypeFactory factory, 
StructuredType structuredType) {
+               final List<RelDataTypeField> fields = new ArrayList<>();
+               for (int i = 0; i < structuredType.getAttributes().size(); i++) 
{
+                       final StructuredAttribute attribute = 
structuredType.getAttributes().get(i);
+                       final RelDataTypeField field = new RelDataTypeFieldImpl(
+                               attribute.getName(),
+                               i, // field index is the attribute's position in the structured type
+                               
factory.createFieldTypeFromLogicalType(attribute.getType()));
+                       fields.add(field);
+               }
+               return new StructuredRelDataType(structuredType, fields); // nullability is taken from structuredType by the constructor
+       }
+
+       public StructuredType getStructuredType() {
+               return structuredType; // the logical type instance this RelDataType was created from
+       }
+
+       public StructuredRelDataType createWithNullability(boolean nullable) {
+               if (nullable == isNullable()) {
+                       return this; // already has the requested nullability, no copy needed
+               }
+               return new StructuredRelDataType((StructuredType) 
structuredType.copy(nullable), fieldList); // reuses the existing field list, only the logical type is copied
+       }
+
+       @Override
+       public RelDataTypeFamily getFamily() {
+               return this; // every user-defined type is its own family, so the type itself is returned
+       }
+
+       @Override
+       protected void generateTypeString(StringBuilder sb, boolean withDetail) {
+               // appends this type's string form to sb; also called by the super constructor, i.e. before structuredType is assigned
+               if (structuredType == null) {
+                       return;
+               }
+               if (withDetail) {
+                       if (structuredType.getObjectIdentifier().isPresent()) {
+                               sb.append(structuredType.asSerializableString());
+                       }
+                       // in case of inline structured type we are using a temporary identifier
+                       else {
+                               sb.append(structuredType.asSummaryString());
+                               if (!structuredType.isNullable()) { // the suffix belongs on non-nullable types only
+                                       sb.append(" NOT NULL");
+                               }
+                       }
+               } else {
+                       sb.append(structuredType.asSummaryString()); // short form: summary string without the nullability suffix
+               }
+       }
+
+       @Override
+       protected void computeDigest() {
+               final StringBuilder sb = new StringBuilder();
+               generateTypeString(sb, true); // the digest always uses the detailed form
+               digest = sb.toString();
+       }
+
+       private static SqlIdentifier createSqlIdentifier(StructuredType 
structuredType) {
+               return structuredType.getObjectIdentifier()
+                       .map(i -> new SqlIdentifier(i.toList(), 
SqlParserPos.ZERO)) // type with an object identifier: use its name parts
+                       .orElseGet(() -> new 
SqlIdentifier(structuredType.asSummaryString(), SqlParserPos.ZERO)); // no identifier: fall back to the summary string as a single-part name
+       }
+
+       private static RelDataTypeComparability 
createRelDataTypeComparability(StructuredType structuredType) {
+               switch (structuredType.getComparision()) { // translate the structured type's comparison kind into the RelDataTypeComparability constant
+                       case EQUALS:
+                               return RelDataTypeComparability.UNORDERED;
+                       case FULL:
+                               return RelDataTypeComparability.ALL;
+                       case NONE:
+                               return RelDataTypeComparability.NONE;
+                       default: // any comparison kind not listed above is rejected
+                               throw new IllegalArgumentException("Unsupported 
structured type comparision.");
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 61a0d0e..57d64f8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -107,6 +107,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
           // fields are not expanded in "SELECT *"
           StructKind.PEEK_FIELDS_NO_EXPAND)
 
+      case LogicalTypeRoot.STRUCTURED_TYPE =>
+        val structuredType = t.asInstanceOf[StructuredType]
+        StructuredRelDataType.create(this, structuredType)
+
       case LogicalTypeRoot.ARRAY =>
         val arrayType = t.asInstanceOf[ArrayType]
         
createArrayType(createFieldTypeFromLogicalType(arrayType.getElementType), -1)
@@ -326,6 +330,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
       case raw: RawRelDataType =>
         raw.createWithNullability(isNullable)
 
+      case structured: StructuredRelDataType =>
+        structured.createWithNullability(isNullable)
+
       case generic: GenericRelDataType =>
         new GenericRelDataType(generic.genericType, isNullable, typeSystem)
 
@@ -521,6 +528,9 @@ object FlinkTypeFactory {
       case ROW if relDataType.isInstanceOf[RelRecordType] =>
         toLogicalRowType(relDataType)
 
+      case STRUCTURED if relDataType.isInstanceOf[StructuredRelDataType] =>
+        relDataType.asInstanceOf[StructuredRelDataType].getStructuredType
+
       case MULTISET => new 
MultisetType(toLogicalType(relDataType.getComponentType))
 
       case ARRAY => new ArrayType(toLogicalType(relDataType.getComponentType))
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 319e942..c8929ac 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -697,6 +697,37 @@ public class FunctionITCase extends StreamingTestBase {
        }
 
        @Test
+       public void testStructuredScalarFunction() {
+               final List<Row> sourceData = Arrays.asList(
+                       Row.of("Bob", 42),
+                       Row.of("Alice", 12),
+                       Row.of(null, 0)
+               );
+
+               final List<Row> sinkData = Arrays.asList(
+                       Row.of("Bob 42", "Tyler"),
+                       Row.of("Alice 12", "Tyler"),
+                       Row.of("<<null>>", "Tyler") // null name: the inner call returns null and the outer call renders it as "<<null>>"
+               );
+
+               TestCollectionTableFactory.reset();
+               TestCollectionTableFactory.initData(sourceData);
+
+               tEnv().executeSql("CREATE TABLE SourceTable(s STRING, i INT NOT 
NULL) WITH ('connector' = 'COLLECTION')");
+               tEnv().executeSql("CREATE TABLE SinkTable(s1 STRING, s2 STRING) 
WITH ('connector' = 'COLLECTION')");
+
+               
tEnv().createTemporarySystemFunction("StructuredScalarFunction", 
StructuredScalarFunction.class);
+               execInsertSqlAndWaitResult(
+                       "INSERT INTO SinkTable " +
+                       "SELECT " +
+                       "  StructuredScalarFunction(StructuredScalarFunction(s, 
i)), " +
+                       "  StructuredScalarFunction('Tyler', 27).name " +
+                       "FROM SourceTable"); // uses both eval overloads (nested call) and accesses the name field of a returned structured value
+
+               assertThat(TestCollectionTableFactory.getResult(), 
equalTo(sinkData));
+       }
+
+       @Test
        public void testInvalidCustomScalarFunction() {
                tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH 
('connector' = 'COLLECTION')");
 
@@ -745,6 +776,32 @@ public class FunctionITCase extends StreamingTestBase {
        }
 
        @Test
+       public void testStructuredTableFunction() {
+               final List<Row> sourceData = Arrays.asList(
+                       Row.of("Bob", 42),
+                       Row.of("Alice", 12),
+                       Row.of(null, 0)
+               );
+
+               final List<Row> sinkData = Arrays.asList(
+                       Row.of("Bob", 42),
+                       Row.of("Alice", 12),
+                       Row.of(null, 0) // NOTE(review): for a null name the function calls collect(null) before collecting the user; only this row is expected - confirm how the null record is handled
+               );
+
+               TestCollectionTableFactory.reset();
+               TestCollectionTableFactory.initData(sourceData);
+
+               tEnv().executeSql("CREATE TABLE SourceTable(s STRING, i INT NOT 
NULL) WITH ('connector' = 'COLLECTION')");
+               tEnv().executeSql("CREATE TABLE SinkTable(s STRING, i INT NOT 
NULL) WITH ('connector' = 'COLLECTION')");
+
+               tEnv().createTemporarySystemFunction("StructuredTableFunction", 
StructuredTableFunction.class);
+               execInsertSqlAndWaitResult("INSERT INTO SinkTable SELECT 
t.name, t.age FROM SourceTable, LATERAL TABLE(StructuredTableFunction(s, i)) 
t"); // reads the name and age fields of each structured record emitted by the table function
+
+               assertThat(TestCollectionTableFactory.getResult(), 
equalTo(sinkData));
+       }
+
+       @Test
        public void testDynamicTableFunction() throws Exception {
                final Row[] sinkData = new Row[]{
                        Row.of("Test is a string"),
@@ -1008,4 +1065,53 @@ public class FunctionITCase extends StreamingTestBase {
                                .build();
                }
        }
+
+       /**
+        * Function that creates and consumes structured types: one overload builds a {@link StructuredUser}, the other renders one as a string.
+        */
+       public static class StructuredScalarFunction extends ScalarFunction {
+               public StructuredUser eval(String name, int age) {
+                       if (name == null) {
+                               return null; // a null name yields a null structured value
+                       }
+                       return new StructuredUser(name, age);
+               }
+
+               public String eval(StructuredUser user) {
+                       if (user == null) {
+                               return "<<null>>"; // marker string so that a null input stays visible in the result
+                       }
+                       return user.toString();
+               }
+       }
+
+       /**
+        * Table function that returns a structured type; each call collects one {@link StructuredUser} built from the arguments.
+        */
+       public static class StructuredTableFunction extends 
TableFunction<StructuredUser> {
+               public void eval(String name, int age) {
+                       if (name == null) {
+                               collect(null); // NOTE(review): no return here, so the collect below also runs for a null name - confirm this is intended
+                       }
+                       collect(new StructuredUser(name, age));
+               }
+       }
+
+       /**
+        * Example POJO for structured type: two public final attributes that are assigned by a full constructor.
+        */
+       public static class StructuredUser {
+               public final String name;
+               public final int age;
+
+               public StructuredUser(String name, int age) {
+                       this.name = name;
+                       this.age = age;
+               }
+
+               @Override
+               public String toString() {
+                       return name + " " + age; // e.g. "Bob 42"
+               }
+       }
 }

Reply via email to