This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6ba70409d2ddcbd12e9958dbe8c9ba0f4c01542 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Fri Dec 13 09:40:25 2019 +0100 [hotfix][table-common] Add utility for expanding a composite type to a schema. --- .../flink/table/types/utils/DataTypeUtils.java | 98 +++++++++++++ .../flink/table/types/utils/DataTypeUtilsTest.java | 153 +++++++++++++++++++++ 2 files changed, 251 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java index 4975e6d..0c25db9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java @@ -19,6 +19,9 @@ package org.apache.flink.table.types.utils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.CollectionDataType; import org.apache.flink.table.types.DataType; @@ -34,9 +37,12 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.flink.util.Preconditions; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -73,6 +79,31 @@ public final class DataTypeUtils { return newType; } + /** + * Expands a composite {@link DataType} to 
a corresponding {@link TableSchema}. Useful for + * flattening a column or mapping a physical type to a logical type of a table source. + * + * <p>Throws an exception for a non-composite type. You can use + * {@link LogicalTypeChecks#isCompositeType(LogicalType)} to check that. + * + * <p>It does not expand an atomic type on purpose, because that operation depends on the + * context. E.g. in case of a {@code FLATTEN} function such operation is not allowed, whereas + * when mapping a physical type to a logical one, the field name should be derived from the logical schema. + * + * @param dataType Data type to expand. Must be a composite type. + * @return A corresponding table schema. + */ + public static TableSchema expandCompositeTypeToSchema(DataType dataType) { + if (dataType instanceof FieldsDataType) { + return expandCompositeType((FieldsDataType) dataType); + } else if (dataType.getLogicalType() instanceof LegacyTypeInformationType && + dataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.STRUCTURED_TYPE) { + return expandLegacyCompositeType(dataType); + } + + throw new IllegalArgumentException("Expected a composite type"); + } + private DataTypeUtils() { // no instantiation } @@ -152,4 +183,71 @@ public final class DataTypeUtils { return transformation.transform(new KeyValueDataType(newLogicalType, newKeyType, newValueType)); } } + + private static TableSchema expandCompositeType(FieldsDataType dataType) { + Map<String, DataType> fieldDataTypes = dataType.getFieldDataTypes(); + return dataType.getLogicalType().accept(new LogicalTypeDefaultVisitor<TableSchema>() { + @Override + public TableSchema visit(RowType rowType) { + return expandRowType(rowType, fieldDataTypes); + } + + @Override + public TableSchema visit(StructuredType structuredType) { + return expandStructuredType(structuredType, fieldDataTypes); + } + + @Override + public TableSchema visit(DistinctType distinctType) { + return distinctType.getSourceType().accept(this); + } + + @Override + protected 
TableSchema defaultMethod(LogicalType logicalType) { + throw new IllegalArgumentException("Expected a composite type"); + } + }); + } + + private static TableSchema expandLegacyCompositeType(DataType dataType) { + // legacy composite type + CompositeType<?> compositeType = (CompositeType<?>) ((LegacyTypeInformationType<?>) dataType.getLogicalType()) + .getTypeInformation(); + + String[] fieldNames = compositeType.getFieldNames(); + TypeInformation<?>[] fieldTypes = Arrays.stream(fieldNames) + .map(compositeType::getTypeAt) + .toArray(TypeInformation[]::new); + + return new TableSchema(fieldNames, fieldTypes); + } + + private static TableSchema expandStructuredType( + StructuredType structuredType, + Map<String, DataType> fieldDataTypes) { + String[] fieldNames = structuredType.getAttributes() + .stream() + .map(StructuredType.StructuredAttribute::getName) + .toArray(String[]::new); + DataType[] dataTypes = structuredType.getAttributes() + .stream() + .map(attr -> fieldDataTypes.get(attr.getName())) + .toArray(DataType[]::new); + return TableSchema.builder() + .fields(fieldNames, dataTypes) + .build(); + } + + private static TableSchema expandRowType( + RowType rowType, + Map<String, DataType> fieldDataTypes) { + String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + DataType[] dataTypes = rowType.getFields() + .stream() + .map(field -> fieldDataTypes.get(field.getName())) + .toArray(DataType[]::new); + return TableSchema.builder() + .fields(fieldNames, dataTypes) + .build(); + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java new file mode 100644 index 0000000..cc62c76 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.utils; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.StructuredType; + +import org.junit.Test; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DataTypeUtils}. 
+ */ +public class DataTypeUtilsTest { + @Test + public void testExpandRowType() { + DataType dataType = ROW( + FIELD("f0", INT()), + FIELD("f1", STRING()), + FIELD("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)), + FIELD("f3", TIMESTAMP(3))); + TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(dataType); + + assertThat( + schema, + equalTo( + TableSchema.builder() + .field("f0", INT()) + .field("f1", STRING()) + .field("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)) + .field("f3", TIMESTAMP(3).bridgedTo(LocalDateTime.class)) + .build())); + } + + @Test + public void testExpandLegacyCompositeType() { + DataType dataType = TypeConversions.fromLegacyInfoToDataType(new TupleTypeInfo<>( + Types.STRING, + Types.INT, + Types.SQL_TIMESTAMP)); + TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(dataType); + + assertThat( + schema, + equalTo( + TableSchema.builder() + .field("f0", STRING()) + .field("f1", INT()) + .field("f2", TIMESTAMP(3).bridgedTo(Timestamp.class)) + .build())); + } + + @Test + public void testExpandStructuredType() { + StructuredType logicalType = StructuredType.newBuilder(ObjectIdentifier.of("catalog", "database", "type")) + .attributes(Arrays.asList( + new StructuredType.StructuredAttribute("f0", DataTypes.INT().getLogicalType()), + new StructuredType.StructuredAttribute("f1", DataTypes.STRING().getLogicalType()), + new StructuredType.StructuredAttribute("f2", DataTypes.TIMESTAMP(5).getLogicalType()), + new StructuredType.StructuredAttribute("f3", DataTypes.TIMESTAMP(3).getLogicalType()) + )) + .build(); + + Map<String, DataType> dataTypes = new HashMap<>(); + dataTypes.put("f0", DataTypes.INT()); + dataTypes.put("f1", DataTypes.STRING()); + dataTypes.put("f2", DataTypes.TIMESTAMP(5).bridgedTo(Timestamp.class)); + dataTypes.put("f3", DataTypes.TIMESTAMP(3)); + FieldsDataType dataType = new FieldsDataType(logicalType, dataTypes); + + TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(dataType); + + assertThat( + 
schema, + equalTo( + TableSchema.builder() + .field("f0", INT()) + .field("f1", STRING()) + .field("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)) + .field("f3", TIMESTAMP(3).bridgedTo(LocalDateTime.class)) + .build())); + } + + @Test + public void testExpandDistinctType() { + FieldsDataType dataType = (FieldsDataType) ROW( + FIELD("f0", INT()), + FIELD("f1", STRING()), + FIELD("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)), + FIELD("f3", TIMESTAMP(3))); + + LogicalType originalLogicalType = dataType.getLogicalType(); + DistinctType distinctLogicalType = DistinctType.newBuilder( + ObjectIdentifier.of("catalog", "database", "type"), + originalLogicalType) + .build(); + DataType distinctDataType = new FieldsDataType(distinctLogicalType, dataType.getFieldDataTypes()); + + TableSchema schema = DataTypeUtils.expandCompositeTypeToSchema(distinctDataType); + + assertThat( + schema, + equalTo( + TableSchema.builder() + .field("f0", INT()) + .field("f1", STRING()) + .field("f2", TIMESTAMP(5).bridgedTo(Timestamp.class)) + .field("f3", TIMESTAMP(3).bridgedTo(LocalDateTime.class)) + .build())); + } + + @Test(expected = IllegalArgumentException.class) + public void testExpandThrowExceptionOnAtomicType() { + DataTypeUtils.expandCompositeTypeToSchema(DataTypes.TIMESTAMP()); + } +}
