This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 083e4d41afa5b6d960a578b1d91ba0d3a817e1b1 Author: slinkydeveloper <[email protected]> AuthorDate: Wed Sep 29 17:54:53 2021 +0200 [FLINK-24399][table-common] Add DataType methods to improve ergonomics when accessing row data type instances Signed-off-by: slinkydeveloper <[email protected]> --- .../org/apache/flink/table/types/DataType.java | 93 ++++++++++++++++++++++ .../org/apache/flink/table/types/DataTypeTest.java | 72 +++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java index 2d41583..23c0e85 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java @@ -24,14 +24,22 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType; /** * Describes the data type of a value in the table ecosystem. 
Instances of this class can be used to @@ -134,6 +142,91 @@ public abstract class DataType implements AbstractDataType<DataType>, Serializab // -------------------------------------------------------------------------------------------- /** + * Returns the first-level field names for the provided {@link DataType}. + * + * <p>Note: This method returns an empty list for every {@link DataType} that is not a composite + * type. + */ + public static List<String> getFieldNames(DataType dataType) { + final LogicalType type = dataType.getLogicalType(); + if (hasRoot(type, LogicalTypeRoot.DISTINCT_TYPE)) { + return getFieldNames(dataType.getChildren().get(0)); + } else if (isCompositeType(type)) { + return LogicalTypeChecks.getFieldNames(type); + } + return Collections.emptyList(); + } + + /** + * Returns the first-level field data types for the provided {@link DataType}. + * + * <p>Note: This method returns an empty list for every {@link DataType} that is not a composite + * type. + */ + public static List<DataType> getFieldDataTypes(DataType dataType) { + final LogicalType type = dataType.getLogicalType(); + if (hasRoot(type, LogicalTypeRoot.DISTINCT_TYPE)) { + return getFieldDataTypes(dataType.getChildren().get(0)); + } else if (isCompositeType(type)) { + return dataType.getChildren(); + } + return Collections.emptyList(); + } + + /** + * Returns the count of the first-level fields for the provided {@link DataType}. + * + * <p>Note: This method returns {@code 0} for every {@link DataType} that is not a composite + * type. + */ + public static int getFieldCount(DataType dataType) { + return getFieldDataTypes(dataType).size(); + } + + /** + * Projects a (possibly nested) row data type by returning a new data type that only includes + * fields of the given index paths. + * + * <p>Note: Index paths allow for arbitrary deep nesting. For example, {@code [[0, 2, 1], ...]} + * specifies to include the 2nd field of the 3rd field of the 1st field in the top-level row. 
+ * Sometimes, name conflicts might occur when extracting fields from a row. Considering the path + * is unique to extract fields, it makes sense to use the path to the fields with delimiter `_` + * as the new name of the field. For example, the new name of the field `b` in the row `a` is + * `a_b` rather than `b`. However, name conflicts are still possible in some cases, e.g. if the + * field name is `a_b` in the top level row. In this case, the method will use a postfix in the + * format '_$%d' to resolve the name conflicts. + */ + public static DataType projectFields(DataType dataType, int[][] indexPaths) { + return DataTypeUtils.projectRow(dataType, indexPaths); + } + + /** + * Projects a (possibly nested) row data type by returning a new data type that only includes + * fields of the given indices. + * + * <p>Note: This method only projects (possibly nested) fields in the top-level row. + */ + public static DataType projectFields(DataType dataType, int[] indexes) { + return DataTypeUtils.projectRow(dataType, indexes); + } + + /** + * Returns an ordered list of fields starting from the provided {@link DataType}. + * + * <p>Note: This method returns an empty list for every {@link DataType} that is not a composite + * type. + */ + public static List<DataTypes.Field> getFields(DataType dataType) { + final List<String> names = getFieldNames(dataType); + final List<DataType> dataTypes = getFieldDataTypes(dataType); + return IntStream.range(0, names.size()) + .mapToObj(i -> DataTypes.FIELD(names.get(i), dataTypes.get(i))) + .collect(Collectors.toList()); + } + + // -------------------------------------------------------------------------------------------- + + /** * This method should catch the most common errors. However, another validation is required in * deeper layers as we don't know whether the data type is used for input or output declaration. 
*/ diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java index 6f78ca5..f74991f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java @@ -36,6 +36,7 @@ import static org.apache.flink.table.api.DataTypes.ARRAY; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BOOLEAN; import static org.apache.flink.table.api.DataTypes.CHAR; +import static org.apache.flink.table.api.DataTypes.DOUBLE; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.INT; import static org.apache.flink.table.api.DataTypes.INTERVAL; @@ -44,6 +45,7 @@ import static org.apache.flink.table.api.DataTypes.MONTH; import static org.apache.flink.table.api.DataTypes.MULTISET; import static org.apache.flink.table.api.DataTypes.ROW; import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.STRUCTURED; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.DataTypes.YEAR; import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass; @@ -148,4 +150,74 @@ public class DataTypeTest { assertEquals(children, mapDataType.getChildren()); } + + @Test + public void getFieldNames() { + assertEquals( + Arrays.asList("c0", "c1", "c2"), + DataType.getFieldNames( + ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", INT())))); + assertEquals( + Arrays.asList("name", "count"), + DataType.getFieldNames( + STRUCTURED( + DataTypesTest.SimplePojo.class, + FIELD("name", STRING()), + FIELD("count", INT().notNull().bridgedTo(int.class))))); + assertEquals(Collections.emptyList(), 
DataType.getFieldNames(ARRAY(INT()))); + assertEquals(Collections.emptyList(), DataType.getFieldNames(INT())); + } + + @Test + public void getFieldDataTypes() { + assertEquals( + Arrays.asList(BOOLEAN(), DOUBLE(), INT()), + DataType.getFieldDataTypes( + ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", INT())))); + assertEquals( + Arrays.asList(STRING(), INT().notNull().bridgedTo(int.class)), + DataType.getFieldDataTypes( + STRUCTURED( + DataTypesTest.SimplePojo.class, + FIELD("name", STRING()), + FIELD("count", INT().notNull().bridgedTo(int.class))))); + assertEquals(Collections.emptyList(), DataType.getFieldDataTypes(ARRAY(INT()))); + assertEquals(Collections.emptyList(), DataType.getFieldDataTypes(INT())); + } + + @Test + public void getFieldCount() { + assertEquals( + 3, + DataType.getFieldCount( + ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", INT())))); + assertEquals( + 2, + DataType.getFieldCount( + STRUCTURED( + DataTypesTest.SimplePojo.class, + FIELD("name", STRING()), + FIELD("count", INT().notNull().bridgedTo(int.class))))); + assertEquals(0, DataType.getFieldCount(ARRAY(INT()))); + assertEquals(0, DataType.getFieldCount(INT())); + } + + @Test + public void getFields() { + assertEquals( + Arrays.asList(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", INT())), + DataType.getFields( + ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", INT())))); + assertEquals( + Arrays.asList( + FIELD("name", STRING()), + FIELD("count", INT().notNull().bridgedTo(int.class))), + DataType.getFields( + STRUCTURED( + DataTypesTest.SimplePojo.class, + FIELD("name", STRING()), + FIELD("count", INT().notNull().bridgedTo(int.class))))); + assertEquals(Collections.emptyList(), DataType.getFields(ARRAY(INT()))); + assertEquals(Collections.emptyList(), DataType.getFields(INT())); + } }
