This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 083e4d41afa5b6d960a578b1d91ba0d3a817e1b1
Author: slinkydeveloper <[email protected]>
AuthorDate: Wed Sep 29 17:54:53 2021 +0200

    [FLINK-24399][table-common] Add DataType methods to improve ergonomics when accessing row data type instances
    
    Signed-off-by: slinkydeveloper <[email protected]>
---
 .../org/apache/flink/table/types/DataType.java     | 93 ++++++++++++++++++++++
 .../org/apache/flink/table/types/DataTypeTest.java | 72 +++++++++++++++++
 2 files changed, 165 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
index 2d41583..23c0e85 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -24,14 +24,22 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType;
 
 /**
  * Describes the data type of a value in the table ecosystem. Instances of this class can be used to
@@ -134,6 +142,91 @@ public abstract class DataType implements AbstractDataType<DataType>, Serializab
     // --------------------------------------------------------------------------------------------
 
     /**
+     * Returns the first-level field names for the provided {@link DataType}.
+     *
+     * <p>Note: This method returns an empty list for every {@link DataType} that is not a composite
+     * type.
+     */
+    public static List<String> getFieldNames(DataType dataType) {
+        final LogicalType type = dataType.getLogicalType();
+        if (hasRoot(type, LogicalTypeRoot.DISTINCT_TYPE)) {
+            return getFieldNames(dataType.getChildren().get(0));
+        } else if (isCompositeType(type)) {
+            return LogicalTypeChecks.getFieldNames(type);
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Returns the first-level field data types for the provided {@link DataType}.
+     *
+     * <p>Note: This method returns an empty list for every {@link DataType} that is not a composite
+     * type.
+     */
+    public static List<DataType> getFieldDataTypes(DataType dataType) {
+        final LogicalType type = dataType.getLogicalType();
+        if (hasRoot(type, LogicalTypeRoot.DISTINCT_TYPE)) {
+            return getFieldDataTypes(dataType.getChildren().get(0));
+        } else if (isCompositeType(type)) {
+            return dataType.getChildren();
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Returns the count of the first-level fields for the provided {@link DataType}.
+     *
+     * <p>Note: This method returns {@code 0} for every {@link DataType} that is not a composite
+     * type.
+     */
+    public static int getFieldCount(DataType dataType) {
+        return getFieldDataTypes(dataType).size();
+    }
+
+    /**
+     * Projects a (possibly nested) row data type by returning a new data type that only includes
+     * fields of the given index paths.
+     *
+     * <p>Note: Index paths allow arbitrarily deep nesting. For example, {@code [[0, 2, 1], ...]}
+     * specifies to include the 2nd field of the 3rd field of the 1st field in the top-level row.
+     * Sometimes, name conflicts might occur when extracting fields from a row. Considering the path
+     * is unique to extract fields, it makes sense to use the path to the fields with delimiter `_`
+     * as the new name of the field. For example, the new name of the field `b` in the row `a` is
+     * `a_b` rather than `b`. However, name conflicts are still possible in some cases, e.g. if the
+     * field name is `a_b` in the top level row. In this case, the method will use a postfix in the
+     * format '_$%d' to resolve the name conflicts.
+     */
+    public static DataType projectFields(DataType dataType, int[][] 
indexPaths) {
+        return DataTypeUtils.projectRow(dataType, indexPaths);
+    }
+
+    /**
+     * Projects a (possibly nested) row data type by returning a new data type that only includes
+     * fields of the given indices.
+     *
+     * <p>Note: This method only projects (possibly nested) fields in the top-level row.
+     */
+    public static DataType projectFields(DataType dataType, int[] indexes) {
+        return DataTypeUtils.projectRow(dataType, indexes);
+    }
+
+    /**
+     * Returns an ordered list of fields starting from the provided {@link DataType}.
+     *
+     * <p>Note: This method returns an empty list for every {@link DataType} that is not a composite
+     * type.
+     */
+    public static List<DataTypes.Field> getFields(DataType dataType) {
+        final List<String> names = getFieldNames(dataType);
+        final List<DataType> dataTypes = getFieldDataTypes(dataType);
+        return IntStream.range(0, names.size())
+                .mapToObj(i -> DataTypes.FIELD(names.get(i), dataTypes.get(i)))
+                .collect(Collectors.toList());
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
      * This method should catch the most common errors. However, another validation is required in
      * deeper layers as we don't know whether the data type is used for input or output declaration.
      */
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index 6f78ca5..f74991f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -36,6 +36,7 @@ import static org.apache.flink.table.api.DataTypes.ARRAY;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.BOOLEAN;
 import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.INTERVAL;
@@ -44,6 +45,7 @@ import static org.apache.flink.table.api.DataTypes.MONTH;
 import static org.apache.flink.table.api.DataTypes.MULTISET;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.STRUCTURED;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.api.DataTypes.YEAR;
 import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
@@ -148,4 +150,74 @@ public class DataTypeTest {
 
         assertEquals(children, mapDataType.getChildren());
     }
+
+    @Test
+    public void getFieldNames() {
+        assertEquals(
+                Arrays.asList("c0", "c1", "c2"),
+                DataType.getFieldNames(
+                        ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), 
FIELD("c2", INT()))));
+        assertEquals(
+                Arrays.asList("name", "count"),
+                DataType.getFieldNames(
+                        STRUCTURED(
+                                DataTypesTest.SimplePojo.class,
+                                FIELD("name", STRING()),
+                                FIELD("count", 
INT().notNull().bridgedTo(int.class)))));
+        assertEquals(Collections.emptyList(), 
DataType.getFieldNames(ARRAY(INT())));
+        assertEquals(Collections.emptyList(), DataType.getFieldNames(INT()));
+    }
+
+    @Test
+    public void getFieldDataTypes() {
+        assertEquals(
+                Arrays.asList(BOOLEAN(), DOUBLE(), INT()),
+                DataType.getFieldDataTypes(
+                        ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), 
FIELD("c2", INT()))));
+        assertEquals(
+                Arrays.asList(STRING(), INT().notNull().bridgedTo(int.class)),
+                DataType.getFieldDataTypes(
+                        STRUCTURED(
+                                DataTypesTest.SimplePojo.class,
+                                FIELD("name", STRING()),
+                                FIELD("count", 
INT().notNull().bridgedTo(int.class)))));
+        assertEquals(Collections.emptyList(), 
DataType.getFieldDataTypes(ARRAY(INT())));
+        assertEquals(Collections.emptyList(), 
DataType.getFieldDataTypes(INT()));
+    }
+
+    @Test
+    public void getFieldCount() {
+        assertEquals(
+                3,
+                DataType.getFieldCount(
+                        ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), 
FIELD("c2", INT()))));
+        assertEquals(
+                2,
+                DataType.getFieldCount(
+                        STRUCTURED(
+                                DataTypesTest.SimplePojo.class,
+                                FIELD("name", STRING()),
+                                FIELD("count", 
INT().notNull().bridgedTo(int.class)))));
+        assertEquals(0, DataType.getFieldCount(ARRAY(INT())));
+        assertEquals(0, DataType.getFieldCount(INT()));
+    }
+
+    @Test
+    public void getFields() {
+        assertEquals(
+                Arrays.asList(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), 
FIELD("c2", INT())),
+                DataType.getFields(
+                        ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), 
FIELD("c2", INT()))));
+        assertEquals(
+                Arrays.asList(
+                        FIELD("name", STRING()),
+                        FIELD("count", INT().notNull().bridgedTo(int.class))),
+                DataType.getFields(
+                        STRUCTURED(
+                                DataTypesTest.SimplePojo.class,
+                                FIELD("name", STRING()),
+                                FIELD("count", 
INT().notNull().bridgedTo(int.class)))));
+        assertEquals(Collections.emptyList(), 
DataType.getFields(ARRAY(INT())));
+        assertEquals(Collections.emptyList(), DataType.getFields(INT()));
+    }
 }

Reply via email to