This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23a4782bdd0ef938562cba1d657f90dcee686e79
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Dec 13 09:43:03 2019 +0100

    [hotfix][table-common] Add utility for mapping logical fields to physical indices.
---
 .../apache/flink/table/utils/TypeMappingUtils.java | 295 ++++++++++++++++
 .../flink/table/utils/TypeMappingUtilsTest.java    | 373 +++++++++++++++++++++
 2 files changed, 668 insertions(+)
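
For context, a minimal usage sketch of the new utility, pieced together from the tests added below. The class name TypeMappingExample and the main() scaffold are illustrative only; the API calls follow the code in this patch:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TypeMappingUtils;

    import java.util.function.Function;

    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.ROW;

    public class TypeMappingExample {
        public static void main(String[] args) {
            // Logical schema lists f1 before f0; the physical row type orders them f0, f1.
            int[] indices = TypeMappingUtils.computePhysicalIndices(
                TableSchema.builder()
                    .field("f1", DataTypes.BIGINT())
                    .field("f0", DataTypes.STRING())
                    .build().getTableColumns(),
                ROW(FIELD("f0", DataTypes.STRING()), FIELD("f1", DataTypes.BIGINT())),
                Function.identity());

            // Each logical column is resolved to its position in the physical type: [1, 0].
            System.out.println(java.util.Arrays.toString(indices));
        }
    }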

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
new file mode 100644
index 0000000..3dc60ff
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.DefinedProctimeAttribute;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+
+/**
+ * Utility methods for dealing with {@link org.apache.flink.table.sources.TableSource}.
+ */
+@Internal
+public final class TypeMappingUtils {
+
+       /**
+        * Computes indices of physical fields corresponding to the selected logical fields of a {@link TableSchema}.
+        *
+        * @param logicalColumns         Logical columns that describe the physical type.
+        * @param physicalType           Physical type to retrieve indices from.
+        * @param nameRemapping          Additional remapping of a logical to a physical field name.
+        *                               TimestampExtractor works with logical names, but accesses physical
+        *                               fields
+        * @return Physical indices of the given logical fields.
+        */
+       public static int[] computePhysicalIndices(
+                       List<TableColumn> logicalColumns,
+                       DataType physicalType,
+                       Function<String, String> nameRemapping) {
+
+               Map<TableColumn, Integer> physicalIndexLookup = computePhysicalIndices(
+                       logicalColumns.stream(),
+                       physicalType,
+                       nameRemapping);
+
+               return logicalColumns.stream().mapToInt(physicalIndexLookup::get).toArray();
+       }
+
+       /**
+        * Computes indices of physical fields corresponding to the selected logical fields of a {@link TableSchema}.
+        *
+        * <p>It puts markers (idx < 0) for time attributes extracted from {@link DefinedProctimeAttribute}
+        * and {@link DefinedRowtimeAttributes}.
+        *
+        * <p>{@link TypeMappingUtils#computePhysicalIndices(List, DataType, Function)} should be preferred. The
+        * time attribute markers should not be used anymore.
+        *
+        * @param tableSource     Used to extract {@link DefinedRowtimeAttributes}, {@link DefinedProctimeAttribute}
+        *                        and {@link TableSource#getProducedDataType()}.
+        * @param logicalColumns  Logical columns that describe the physical type.
+        * @param streamMarkers   If true, puts stream markers; otherwise puts batch markers.
+        * @param nameRemapping   Additional remapping of a logical to a physical field name.
+        *                        TimestampExtractor works with logical names, but accesses physical
+        *                        fields
+        * @return Physical indices of the given logical fields, with time attributes replaced by markers.
+        */
+       public static int[] computePhysicalIndicesOrTimeAttributeMarkers(
+                       TableSource<?> tableSource,
+                       List<TableColumn> logicalColumns,
+                       boolean streamMarkers,
+                       Function<String, String> nameRemapping) {
+               Optional<String> proctimeAttribute = getProctimeAttribute(tableSource);
+               List<String> rowtimeAttributes = getRowtimeAttributes(tableSource);
+
+               List<TableColumn> columnsWithoutTimeAttributes = logicalColumns.stream().filter(col ->
+                       !rowtimeAttributes.contains(col.getName())
+                               && proctimeAttribute.map(attr -> !attr.equals(col.getName())).orElse(true))
+                       .collect(Collectors.toList());
+
+               Map<TableColumn, Integer> columnsToPhysicalIndices = TypeMappingUtils.computePhysicalIndices(
+                       columnsWithoutTimeAttributes.stream(),
+                       tableSource.getProducedDataType(),
+                       nameRemapping
+               );
+
+               return logicalColumns.stream().mapToInt(logicalColumn -> {
+                       if (proctimeAttribute.map(attr -> attr.equals(logicalColumn.getName())).orElse(false)) {
+                               verifyTimeAttributeType(logicalColumn, "Proctime");
+
+                               if (streamMarkers) {
+                                       return TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
+                               } else {
+                                       return TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER;
+                               }
+                       } else if (rowtimeAttributes.contains(logicalColumn.getName())) {
+                               verifyTimeAttributeType(logicalColumn, "Rowtime");
+
+                               if (streamMarkers) {
+                                       return TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
+                               } else {
+                                       return TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER;
+                               }
+                       } else {
+                               return columnsToPhysicalIndices.get(logicalColumn);
+                       }
+               }).toArray();
+       }
+
+       private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) {
+               if (!hasFamily(logicalColumn.getType().getLogicalType(), LogicalTypeFamily.TIMESTAMP)) {
+                       throw new ValidationException(String.format(
+                               "%s field '%s' has invalid type %s. %s attributes must be of a Timestamp family.",
+                               rowtimeOrProctime,
+                               logicalColumn.getName(),
+                               logicalColumn.getType(),
+                               rowtimeOrProctime));
+               }
+       }
+
+       private static Map<TableColumn, Integer> computePhysicalIndices(
+                       Stream<TableColumn> columns,
+                       DataType physicalType,
+                       Function<String, String> nameRemappingFunction) {
+               if (LogicalTypeChecks.isCompositeType(physicalType.getLogicalType())) {
+                       TableSchema physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(physicalType);
+                       return computeInCompositeType(columns, physicalSchema, wrapWithNotNullCheck(nameRemappingFunction));
+               } else {
+                       return computeInSimpleType(columns, physicalType);
+               }
+       }
+
+       private static Function<String, String> wrapWithNotNullCheck(Function<String, String> nameRemapping) {
+               return name -> {
+                       String resolvedFieldName = nameRemapping.apply(name);
+                       if (resolvedFieldName == null) {
+                               throw new ValidationException(String.format(
+                                       "Field '%s' could not be resolved by the field mapping.",
+                                       name));
+                       }
+                       return resolvedFieldName;
+               };
+       }
+
+       private static Map<TableColumn, Integer> computeInCompositeType(
+                       Stream<TableColumn> columns,
+                       TableSchema physicalSchema,
+                       Function<String, String> nameRemappingFunction) {
+               return columns.collect(
+                       Collectors.toMap(
+                               Function.identity(),
+                               column -> {
+                                       String remappedName = nameRemappingFunction.apply(column.getName());
+
+                                       int idx = IntStream.range(0, physicalSchema.getFieldCount())
+                                               .filter(i -> physicalSchema.getFieldName(i).get().equals(remappedName))
+                                               .findFirst()
+                                               .orElseThrow(() -> new ValidationException(String.format(
+                                                       "Could not map %s column to the underlying physical type %s. No such field.",
+                                                       column.getName(),
+                                                       physicalSchema
+                                               )));
+
+                                       LogicalType physicalFieldType = physicalSchema.getFieldDataType(idx).get().getLogicalType();
+                                       LogicalType logicalFieldType = column.getType().getLogicalType();
+
+                                       checkIfCompatible(
+                                               physicalFieldType,
+                                               logicalFieldType,
+                                               (cause) -> new ValidationException(
+                                                       String.format(
+                                                               "Type %s of table field '%s' does not match with " +
+                                                                       "the physical type %s of the '%s' field of the TableSource return type.",
+                                                               logicalFieldType,
+                                                               column.getName(),
+                                                               physicalFieldType,
+                                                               remappedName),
+                                                       cause));
+
+                                       return idx;
+                               }
+                       )
+               );
+       }
+
+       private static void checkIfCompatible(
+                       LogicalType physicalFieldType,
+                       LogicalType logicalFieldType,
+                       Function<Throwable, ValidationException> exceptionSupplier) {
+               if (LogicalTypeChecks.areTypesCompatible(physicalFieldType, logicalFieldType)) {
+                       return;
+               }
+
+               physicalFieldType.accept(new LogicalTypeDefaultVisitor<Void>() {
+                       @Override
+                       public Void visit(LogicalType other) {
+                               if (other instanceof LegacyTypeInformationType && other.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
+                                       if (!(logicalFieldType instanceof DecimalType)) {
+                                               throw exceptionSupplier.apply(null);
+                                       }
+
+                                       DecimalType logicalDecimalType = (DecimalType) logicalFieldType;
+                                       if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION ||
+                                                       logicalDecimalType.getScale() != 18) {
+                                               throw exceptionSupplier.apply(new ValidationException(
+                                                       "Legacy decimal type can only be mapped to DECIMAL(38, 18)."));
+                                       }
+
+                                       return null;
+                               }
+
+                               return defaultMethod(other);
+                       }
+
+                       @Override
+                       protected Void defaultMethod(LogicalType logicalType) {
+                               throw exceptionSupplier.apply(null);
+               });
+       }
+
+       private static Map<TableColumn, Integer> computeInSimpleType(
+                       Stream<TableColumn> columns,
+                       DataType physicalType) {
+
+               Map<TableColumn, Integer> indices = columns.collect(
+                       Collectors.toMap(
+                               Function.identity(),
+                               col -> 0
+                       )
+               );
+
+               if (indices.keySet().size() > 1) {
+                       throw new ValidationException(String.format(
+                               "More than one table field matched to atomic input type %s.",
+                               physicalType));
+               }
+
+               return indices;
+       }
+
+       /** Returns a list with all rowtime attribute names of the {@link TableSource}. */
+       private static List<String> getRowtimeAttributes(TableSource<?> tableSource) {
+               if (tableSource instanceof DefinedRowtimeAttributes) {
+                       return ((DefinedRowtimeAttributes) tableSource).getRowtimeAttributeDescriptors()
+                               .stream()
+                               .map(RowtimeAttributeDescriptor::getAttributeName)
+                               .collect(Collectors.toList());
+               } else {
+                       return Collections.emptyList();
+               }
+       }
+
+       /** Returns the proctime attribute of the {@link TableSource} if it is defined. */
+       private static Optional<String> getProctimeAttribute(TableSource<?> tableSource) {
+               if (tableSource instanceof DefinedProctimeAttribute) {
+                       return Optional.ofNullable(((DefinedProctimeAttribute) tableSource).getProctimeAttribute());
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       private TypeMappingUtils() {
+       }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
new file mode 100644
index 0000000..daadd66
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.DefinedProctimeAttribute;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+
+/**
+ * Tests for {@link TypeMappingUtils}.
+ */
+public class TypeMappingUtilsTest {
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void testFieldMappingReordered() {
+               int[] indices = TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f1", DataTypes.BIGINT())
+                               .field("f0", DataTypes.STRING())
+                               .build().getTableColumns(),
+                       ROW(FIELD("f0", DataTypes.STRING()), FIELD("f1", DataTypes.BIGINT())),
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[] {1, 0}));
+       }
+
+       @Test
+       public void testFieldMappingNonMatchingTypes() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage("Type TIMESTAMP(3) of table field 'f0' does not match with the physical type STRING of " +
+                       "the 'f0' field of the TableSource return type.");
+               TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f1", DataTypes.BIGINT())
+                               .field("f0", DataTypes.TIMESTAMP(3))
+                               .build().getTableColumns(),
+                       ROW(FIELD("f0", DataTypes.STRING()), FIELD("f1", DataTypes.BIGINT())),
+                       Function.identity()
+               );
+       }
+
+       @Test
+       public void testFieldMappingNonMatchingPrecision() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage("Type TIMESTAMP(9) of table field 'f0' does not match with the physical type " +
+                       "TIMESTAMP(3) of the 'f0' field of the TableSource return type.");
+               TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f0", DataTypes.TIMESTAMP(9))
+                               .build().getTableColumns(),
+                       ROW(FIELD("f0", DataTypes.TIMESTAMP(3))),
+                       Function.identity()
+               );
+       }
+
+       @Test
+       public void testNameMappingDoesNotExist() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage("Field 'f0' could not be resolved by the field mapping.");
+               TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f0", DataTypes.BIGINT())
+                               .build().getTableColumns(),
+                       ROW(FIELD("f0", DataTypes.BIGINT())),
+                       str -> null
+               );
+       }
+
+       @Test
+       public void testFieldMappingLegacyDecimalType() {
+               int[] indices = TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f0", DECIMAL(38, 18))
+                               .build().getTableColumns(),
+                       ROW(FIELD("f0", TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC))),
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[] {0}));
+       }
+
+       @Test
+       public void testFieldMappingLegacyDecimalTypeNotMatchingPrecision() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage("Type DECIMAL(38, 10) of table field 'f0' does not match with the physical type" +
+                       " LEGACY('DECIMAL', 'DECIMAL') of the 'f0' field of the TableSource return type.");
+               thrown.expectCause(allOf(
+                       instanceOf(ValidationException.class),
+                       hasMessage(equalTo("Legacy decimal type can only be mapped to DECIMAL(38, 18)."))));
+
+               int[] indices = TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f0", DECIMAL(38, 10))
+                               .build().getTableColumns(),
+                       ROW(FIELD("f0", TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC))),
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[] {0}));
+       }
+
+       @Test
+       public void testFieldMappingRowTypeNotMatchingNamesInNestedType() {
+               int[] indices = TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f0", DECIMAL(38, 18))
+                               .field("f1", ROW(FIELD("logical_f1_0", BIGINT()), FIELD("logical_f1_1", STRING())))
+                               .build().getTableColumns(),
+                       ROW(
+                               FIELD("f0", DECIMAL(38, 18)),
+                               FIELD("f1", ROW(FIELD("physical_f1_0", BIGINT()), FIELD("physical_f1_1", STRING())))
+                       ),
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[] {0, 1}));
+       }
+
+       @Test
+       public void testFieldMappingRowTypeNotMatchingTypesInNestedType() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage("Type ROW<`f1_0` BIGINT, `f1_1` STRING> of table field 'f1' does not match with the " +
+                       "physical type ROW<`f1_0` STRING, `f1_1` STRING> of the 'f1' field of the TableSource return type.");
+
+               TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f0", DECIMAL(38, 18))
+                               .field("f1", ROW(FIELD("f1_0", BIGINT()), FIELD("f1_1", STRING())))
+                               .build().getTableColumns(),
+                       ROW(
+                               FIELD("f0", DECIMAL(38, 18)),
+                               FIELD("f1", ROW(FIELD("f1_0", STRING()), FIELD("f1_1", STRING())))
+                       ),
+                       Function.identity()
+               );
+       }
+
+       @Test
+       public void testFieldMappingLegacyCompositeType() {
+               int[] indices = TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("f1", DataTypes.BIGINT())
+                               .field("f0", DataTypes.STRING())
+                               .build().getTableColumns(),
+                       TypeConversions.fromLegacyInfoToDataType(Types.TUPLE(Types.STRING, Types.LONG)),
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[] {1, 0}));
+       }
+
+       @Test
+       public void testFieldMappingLegacyCompositeTypeWithRenaming() {
+               int[] indices = TypeMappingUtils.computePhysicalIndices(
+                       TableSchema.builder()
+                               .field("a", DataTypes.BIGINT())
+                               .field("b", DataTypes.STRING())
+                               .build().getTableColumns(),
+                       TypeConversions.fromLegacyInfoToDataType(Types.TUPLE(Types.STRING, Types.LONG)),
+                       str -> {
+                               switch (str) {
+                                       case "a":
+                                               return "f1";
+                                       case "b":
+                                               return "f0";
+                                       default:
+                                               throw new AssertionError();
+                               }
+                       }
+               );
+
+               assertThat(indices, equalTo(new int[]{1, 0}));
+       }
+
+       @Test
+       public void testMappingWithBatchTimeAttributes() {
+               TestTableSource tableSource = new TestTableSource(
+                       DataTypes.BIGINT(),
+                       Collections.singletonList("rowtime"),
+                       "proctime"
+               );
+               int[] indices = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+                       tableSource,
+                       TableSchema.builder()
+                               .field("a", Types.LONG)
+                               .field("rowtime", Types.SQL_TIMESTAMP)
+                               .field("proctime", Types.SQL_TIMESTAMP)
+                               .build().getTableColumns(),
+                       false,
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[]{0, -3, -4}));
+       }
+
+       @Test
+       public void testMappingWithStreamTimeAttributes() {
+               TestTableSource tableSource = new TestTableSource(
+                       DataTypes.BIGINT(),
+                       Collections.singletonList("rowtime"),
+                       "proctime"
+               );
+               int[] indices = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+                       tableSource,
+                       TableSchema.builder()
+                               .field("a", Types.LONG)
+                               .field("rowtime", Types.SQL_TIMESTAMP)
+                               .field("proctime", Types.SQL_TIMESTAMP)
+                               .build().getTableColumns(),
+                       true,
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[]{0, -1, -2}));
+       }
+
+       @Test
+       public void testMappingWithStreamTimeAttributesFromCompositeType() {
+               TestTableSource tableSource = new TestTableSource(
+                       ROW(FIELD("b", TIME()), FIELD("a", DataTypes.BIGINT())),
+                       Collections.singletonList("rowtime"),
+                       "proctime"
+               );
+               int[] indices = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+                       tableSource,
+                       TableSchema.builder()
+                               .field("a", Types.LONG)
+                               .field("rowtime", Types.SQL_TIMESTAMP)
+                               .field("proctime", Types.SQL_TIMESTAMP)
+                               .build().getTableColumns(),
+                       true,
+                       Function.identity()
+               );
+
+               assertThat(indices, equalTo(new int[]{1, -1, -2}));
+       }
+
+       @Test
+       public void testWrongLogicalTypeForRowtimeAttribute() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "Rowtime field 'rowtime' has invalid type TIME(0). Rowtime attributes must be of a Timestamp family.");
+
+               TestTableSource tableSource = new TestTableSource(
+                       DataTypes.BIGINT(),
+                       Collections.singletonList("rowtime"),
+                       "proctime"
+               );
+               TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+                       tableSource,
+                       TableSchema.builder()
+                               .field("a", Types.LONG)
+                               .field("rowtime", Types.SQL_TIME)
+                               .field("proctime", Types.SQL_TIMESTAMP)
+                               .build().getTableColumns(),
+                       false,
+                       Function.identity()
+               );
+       }
+
+       @Test
+       public void testWrongLogicalTypeForProctimeAttribute() {
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "Proctime field 'proctime' has invalid type TIME(0). Proctime attributes must be of a Timestamp family.");
+
+               TestTableSource tableSource = new TestTableSource(
+                       DataTypes.BIGINT(),
+                       Collections.singletonList("rowtime"),
+                       "proctime"
+               );
+               TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+                       tableSource,
+                       TableSchema.builder()
+                               .field("a", Types.LONG)
+                               .field("rowtime", Types.SQL_TIMESTAMP)
+                               .field("proctime", Types.SQL_TIME)
+                               .build().getTableColumns(),
+                       false,
+                       Function.identity()
+               );
+       }
+
+       private static class TestTableSource
+               implements TableSource<Object>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+
+               private final DataType producedDataType;
+               private final List<String> rowtimeAttributes;
+               private final String proctimeAttribute;
+
+               private TestTableSource(
+                               DataType producedDataType,
+                               List<String> rowtimeAttributes,
+                               String proctimeAttribute) {
+                       this.producedDataType = producedDataType;
+                       this.rowtimeAttributes = rowtimeAttributes;
+                       this.proctimeAttribute = proctimeAttribute;
+               }
+
+               @Nullable
+               @Override
+               public String getProctimeAttribute() {
+                       return proctimeAttribute;
+               }
+
+               @Override
+               public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
+                       return rowtimeAttributes.stream()
+                               .map(attr -> new RowtimeAttributeDescriptor(attr, null, null))
+                               .collect(Collectors.toList());
+               }
+
+               @Override
+               public DataType getProducedDataType() {
+                       return producedDataType;
+               }
+
+               @Override
+               public TableSchema getTableSchema() {
+                       throw new UnsupportedOperationException("Should not be called");
+               }
+       }
+}
