This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6ba70409d2ddcbd12e9958dbe8c9ba0f4c01542
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Dec 13 09:40:25 2019 +0100

    [hotfix][table-common] Add utility for expanding a composite type to a schema.
---
 .../flink/table/types/utils/DataTypeUtils.java     |  98 +++++++++++++
 .../flink/table/types/utils/DataTypeUtilsTest.java | 153 +++++++++++++++++++++
 2 files changed, 251 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 4975e6d..0c25db9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.types.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
@@ -34,9 +37,12 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +79,31 @@ public final class DataTypeUtils {
                return newType;
        }
 
+       /**
+        * Expands a composite {@link DataType} to a corresponding {@link TableSchema}. Useful for
+        * flattening a column or for mapping the physical type of a table source to its logical type.
+        *
+        * <p>Throws an exception for a non-composite type. You can use
+        * {@link LogicalTypeChecks#isCompositeType(LogicalType)} to check that beforehand.
+        *
+        * <p>It does not expand an atomic type on purpose, because that operation depends on the
+        * context. E.g. in case of a {@code FLATTEN} function such an operation is not allowed, whereas
+        * when mapping a physical type to a logical one, the field name should be derived from the logical schema.
+        *
+        * @param dataType Data type to expand. Must be a composite type.
+        * @return A corresponding table schema.
+        */
+       public static TableSchema expandCompositeTypeToSchema(DataType dataType) {
+               if (dataType instanceof FieldsDataType) {
+                       return expandCompositeType((FieldsDataType) dataType);
+               } else if (dataType.getLogicalType() instanceof LegacyTypeInformationType &&
+                       dataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.STRUCTURED_TYPE) {
+                       return expandLegacyCompositeType(dataType);
+               }
+
+               throw new IllegalArgumentException("Expected a composite type");
+       }
+
        private DataTypeUtils() {
                // no instantiation
        }
@@ -152,4 +183,71 @@ public final class DataTypeUtils {
                        return transformation.transform(new 
KeyValueDataType(newLogicalType, newKeyType, newValueType));
                }
        }
+
+       private static TableSchema expandCompositeType(FieldsDataType dataType) {
+               Map<String, DataType> fieldDataTypes = dataType.getFieldDataTypes();
+               return dataType.getLogicalType().accept(new LogicalTypeDefaultVisitor<TableSchema>() {
+                       @Override
+                       public TableSchema visit(RowType rowType) {
+                               return expandRowType(rowType, fieldDataTypes);
+                       }
+
+                       @Override
+                       public TableSchema visit(StructuredType structuredType) {
+                               return expandStructuredType(structuredType, fieldDataTypes);
+                       }
+
+                       @Override
+                       public TableSchema visit(DistinctType distinctType) {
+                               return distinctType.getSourceType().accept(this);
+                       }
+
+                       @Override
+                       protected TableSchema defaultMethod(LogicalType logicalType) {
+                               throw new IllegalArgumentException("Expected a composite type");
+                       }
+               });
+       }
+
+       private static TableSchema expandLegacyCompositeType(DataType dataType) {
+               // legacy composite type
+               CompositeType<?> compositeType = (CompositeType<?>) ((LegacyTypeInformationType<?>) dataType.getLogicalType())
+                       .getTypeInformation();
+
+               String[] fieldNames = compositeType.getFieldNames();
+               TypeInformation<?>[] fieldTypes = Arrays.stream(fieldNames)
+                       .map(compositeType::getTypeAt)
+                       .toArray(TypeInformation[]::new);
+
+               return new TableSchema(fieldNames, fieldTypes);
+       }
+
+       private static TableSchema expandStructuredType(
+               StructuredType structuredType,
+               Map<String, DataType> fieldDataTypes) {
+               String[] fieldNames = structuredType.getAttributes()
+                       .stream()
+                       .map(StructuredType.StructuredAttribute::getName)
+                       .toArray(String[]::new);
+               DataType[] dataTypes = structuredType.getAttributes()
+                       .stream()
+                       .map(attr -> fieldDataTypes.get(attr.getName()))
+                       .toArray(DataType[]::new);
+               return TableSchema.builder()
+                       .fields(fieldNames, dataTypes)
+                       .build();
+       }
+
+       private static TableSchema expandRowType(
+               RowType rowType,
+               Map<String, DataType> fieldDataTypes) {
+               String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+               DataType[] dataTypes = rowType.getFields()
+                       .stream()
+                       .map(field -> fieldDataTypes.get(field.getName()))
+                       .toArray(DataType[]::new);
+               return TableSchema.builder()
+                       .fields(fieldNames, dataTypes)
+                       .build();
+       }
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
new file mode 100644
index 0000000..cc62c76
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DataTypeUtils}.
+ */
+public class DataTypeUtilsTest {
+       @Test
+       public void testExpandRowType() {
+               DataType dataType = ROW(
+                       FIELD("f0", INT()),
+                       FIELD("f1", STRING()),
+                       FIELD("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)),
+                       FIELD("f3", TIMESTAMP(3)));
+               TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(dataType);
+
+               assertThat(
+                       schema,
+                       equalTo(
+                               TableSchema.builder()
+                                       .field("f0", INT())
+                                       .field("f1", STRING())
+                                       .field("f2", TIMESTAMP(5).bridgedTo(Timestamp.class))
+                                       .field("f3", TIMESTAMP(3).bridgedTo(LocalDateTime.class))
+                                       .build()));
+       }
+
+       @Test
+       public void testExpandLegacyCompositeType() {
+               DataType dataType = TypeConversions.fromLegacyInfoToDataType(new TupleTypeInfo<>(
+                       Types.STRING,
+                       Types.INT,
+                       Types.SQL_TIMESTAMP));
+               TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(dataType);
+
+               assertThat(
+                       schema,
+                       equalTo(
+                               TableSchema.builder()
+                                       .field("f0", STRING())
+                                       .field("f1", INT())
+                                       .field("f2", TIMESTAMP(3).bridgedTo(Timestamp.class))
+                                       .build()));
+       }
+
+       @Test
+       public void testExpandStructuredType() {
+               StructuredType logicalType = StructuredType.newBuilder(ObjectIdentifier.of("catalog", "database", "type"))
+                       .attributes(Arrays.asList(
+                               new StructuredType.StructuredAttribute("f0", DataTypes.INT().getLogicalType()),
+                               new StructuredType.StructuredAttribute("f1", DataTypes.STRING().getLogicalType()),
+                               new StructuredType.StructuredAttribute("f2", DataTypes.TIMESTAMP(5).getLogicalType()),
+                               new StructuredType.StructuredAttribute("f3", DataTypes.TIMESTAMP(3).getLogicalType())
+                       ))
+                       .build();
+
+               Map<String, DataType> dataTypes = new HashMap<>();
+               dataTypes.put("f0", DataTypes.INT());
+               dataTypes.put("f1", DataTypes.STRING());
+               dataTypes.put("f2", DataTypes.TIMESTAMP(5).bridgedTo(Timestamp.class));
+               dataTypes.put("f3", DataTypes.TIMESTAMP(3));
+               FieldsDataType dataType = new FieldsDataType(logicalType, dataTypes);
+
+               TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(dataType);
+
+               assertThat(
+                       schema,
+                       equalTo(
+                               TableSchema.builder()
+                                       .field("f0", INT())
+                                       .field("f1", STRING())
+                                       .field("f2", TIMESTAMP(5).bridgedTo(Timestamp.class))
+                                       .field("f3", TIMESTAMP(3).bridgedTo(LocalDateTime.class))
+                                       .build()));
+       }
+
+       @Test
+       public void testExpandDistinctType() {
+               FieldsDataType dataType = (FieldsDataType) ROW(
+                       FIELD("f0", INT()),
+                       FIELD("f1", STRING()),
+                       FIELD("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)),
+                       FIELD("f3", TIMESTAMP(3)));
+
+               LogicalType originalLogicalType = dataType.getLogicalType();
+               DistinctType distinctLogicalType = DistinctType.newBuilder(
+                       ObjectIdentifier.of("catalog", "database", "type"),
+                       originalLogicalType)
+                       .build();
+               DataType distinctDataType = new FieldsDataType(distinctLogicalType, dataType.getFieldDataTypes());
+
+               TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(distinctDataType);
+
+               assertThat(
+                       schema,
+                       equalTo(
+                               TableSchema.builder()
+                                       .field("f0", INT())
+                                       .field("f1", STRING())
+                                       .field("f2", TIMESTAMP(5).bridgedTo(Timestamp.class))
+                                       .field("f3", TIMESTAMP(3).bridgedTo(LocalDateTime.class))
+                                       .build()));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testExpandThrowExceptionOnAtomicType() {
+               DataTypeUtils.expandCompositeTypeToSchema(DataTypes.TIMESTAMP());
+       }
+}
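
For reviewers who want to try the new utility outside the test suite, here is a minimal usage sketch. The class name ExpandSchemaExample is made up for illustration; the calls assume the flink-table-common API as of this commit.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
    import org.apache.flink.table.types.utils.DataTypeUtils;

    public class ExpandSchemaExample {
        public static void main(String[] args) {
            // A composite (ROW) data type, e.g. the physical type of a table source.
            DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING()));

            // Guard against non-composite types first, as the Javadoc recommends.
            if (LogicalTypeChecks.isCompositeType(rowType.getLogicalType())) {
                TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(rowType);
                // Prints a schema with two columns: id (INT) and name (STRING).
                System.out.println(schema);
            }
        }
    }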
