Airblader commented on a change in pull request #17381:
URL: https://github.com/apache/flink/pull/17381#discussion_r719088932
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
##########
@@ -163,6 +165,18 @@ public int getColumnCount() {
return Optional.ofNullable(primaryKey);
}
+ /** Returns the primary key indexes, if any, otherwise returns an empty list. * */
Review comment:
```suggestion
/** Returns the primary key indexes, if any, otherwise returns an empty array. * */
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
##########
@@ -115,6 +149,167 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------
+ /**
+ * Returns the first-level field names for the provided {{@link DataType}}.
+ *
+ * <p>Note: this method returns an empty list for every {{@link DataType}} not instance of
+ * {{@link FieldsDataType}}.
Review comment:
```suggestion
* <p>Note: this method returns an empty list for every {{@link DataType}} that is not an instance of {{@link FieldsDataType}}.
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
##########
@@ -163,6 +165,18 @@ public int getColumnCount() {
return Optional.ofNullable(primaryKey);
}
+ /** Returns the primary key indexes, if any, otherwise returns an empty list. * */
+ public int[] getPrimaryKeyIndexes() {
+ return getPrimaryKey()
+ .map(UniqueConstraint::getColumns)
+ .map(
+ l -> {
+ List<String> columns = getColumnNames();
+ return l.stream().mapToInt(columns::indexOf).toArray();
+ })
+ .orElseGet(() -> new int[] {});
Review comment:
The column names don't need to be computed inside the lambda; computing them once up front keeps the chain simpler.
```suggestion
final List<String> columns = getColumnNames();
return getPrimaryKey()
.map(UniqueConstraint::getColumns)
.map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray())
.orElseGet(() -> new int[] {});
```
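For reference, a self-contained sketch of the suggested shape. `SimpleSchema` is a hypothetical stand-in for `ResolvedSchema` (the primary key is modeled as a plain list of column names rather than a `UniqueConstraint`); it shows the column names being fetched once before the `Optional` chain and the empty-array fallback when no primary key is defined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for ResolvedSchema; the primary key is just a list of column names. */
final class SimpleSchema {
    private final List<String> columnNames;
    private final List<String> primaryKeyColumns; // null when no primary key is defined

    SimpleSchema(List<String> columnNames, List<String> primaryKeyColumns) {
        this.columnNames = columnNames;
        this.primaryKeyColumns = primaryKeyColumns;
    }

    /** Returns a fresh copy of the column names on every call. */
    List<String> getColumnNames() {
        return new ArrayList<>(columnNames);
    }

    Optional<List<String>> getPrimaryKey() {
        return Optional.ofNullable(primaryKeyColumns);
    }

    /** Returns the primary key indexes, if any, otherwise returns an empty array. */
    int[] getPrimaryKeyIndexes() {
        // Fetch the column names once, outside the lambda.
        final List<String> columns = getColumnNames();
        return getPrimaryKey()
                .map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray())
                .orElseGet(() -> new int[] {});
    }
}
```

With columns `id, name, ts` and primary key `(ts, id)`, this returns `[2, 0]`; without a primary key it returns an empty array.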
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -85,5 +86,24 @@
/** Whether the table is temporary. */
boolean isTemporary();
+
+ /**
+ * Shortcut for {@code getCatalogTable().getResolvedSchema().toPhysicalRowDataType()}.
Review comment:
I think we should add instructional information here on what this is used for and how to use it in the context of a connector, since we offer a dedicated shortcut for it. Basically every connector implementation will call this method.
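To illustrate the kind of guidance meant, a minimal sketch with hypothetical stand-in interfaces. `ContextModel`, `CatalogTableModel`, `SchemaModel` and the shortcut name `getPhysicalRowDataType` are all assumptions (the hunk cuts off before the method signature), and data types are modeled as plain strings.

```java
/** Hypothetical stand-in for a resolved schema; data types are modeled as strings. */
interface SchemaModel {
    String toPhysicalRowDataType();
}

/** Hypothetical stand-in for a resolved catalog table. */
interface CatalogTableModel {
    SchemaModel getResolvedSchema();
}

/** Hypothetical stand-in for the factory context. */
interface ContextModel {
    CatalogTableModel getCatalogTable();

    /**
     * Returns the physical row data type of the table: a ROW of its physical columns only,
     * without computed or metadata columns.
     *
     * <p>Shortcut for {@code getCatalogTable().getResolvedSchema().toPhysicalRowDataType()}.
     * A connector would typically use it, for example, when configuring a format for the
     * physical columns it reads or writes.
     */
    default String getPhysicalRowDataType() {
        // Delegates to the full call chain so that connectors don't repeat it.
        return getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
    }
}
```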
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
##########
@@ -115,6 +149,167 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------
+ /**
+ * Returns the first-level field names for the provided {{@link DataType}}.
+ *
+ * <p>Note: this method returns an empty list for every {{@link DataType}} not instance of
+ * {{@link FieldsDataType}}.
+ */
+ public static List<String> getFieldNames(DataType dataType) {
+ return dataType.logicalType.accept(
+ new LogicalTypeDefaultVisitor<List<String>>() {
+
+ @Override
+ public List<String> visit(StructuredType structuredType) {
+ return structuredType.getAttributes().stream()
+ .map(StructuredType.StructuredAttribute::getName)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<String> visit(RowType rowType) {
+ return rowType.getFieldNames();
+ }
+
+ @Override
+ public List<String> visit(DistinctType distinctType) {
+ return distinctType.getSourceType().accept(this);
+ }
+
+ @Override
+ protected List<String> defaultMethod(LogicalType logicalType) {
+ return Collections.emptyList();
+ }
+ });
+ }
+
+ /**
+ * Returns the first-level field data types for the provided {{@link DataType}}.
+ *
+ * <p>Note: this method returns an empty list for every {{@link DataType}} not instance of
+ * {{@link FieldsDataType}}.
+ */
+ public static List<DataType> getFieldDataTypes(DataType dataType) {
+ return dataType.accept(
+ new DataTypeDefaultVisitor<List<DataType>>() {
+ @Override
+ public List<DataType> visit(FieldsDataType fieldsDataType) {
+ return fieldsDataType.getChildren();
+ }
+
+ @Override
+ protected List<DataType> defaultMethod(DataType dataType) {
+ return Collections.emptyList();
+ }
+ });
+ }
+
+ /**
+ * Returns the count of the first-level fields for the provided {{@link DataType}}.
+ *
+ * <p>Note: this method returns {{@code 0}} for every {{@link DataType}} not instance of {{@link
+ * FieldsDataType}}.
+ */
+ public static int getFieldCount(DataType dataType) {
+ return getFieldDataTypes(dataType).size();
+ }
+
+ /**
+ * Projects a (possibly nested) row data type by returning a new data type that only includes
+ * fields of the given index paths.
+ *
+ * <p>Note: Index paths allow for arbitrary deep nesting. For example, {@code [[0, 2, 1], ...]}
+ * specifies to include the 2nd field of the 3rd field of the 1st field in the top-level row.
+ * Sometimes, it may get name conflicts when extract fields from the row field. Considering the
+ * the path is unique to extract fields, it makes sense to use the path to the fields with
+ * delimiter `_` as the new name of the field. For example, the new name of the field `b` in the
+ * row `a` is `a_b` rather than `b`. But it may still gets name conflicts in some situation,
+ * such as the field `a_b` in the top level schema. In such situation, it will use the postfix
+ * in the format '_$%d' to resolve the name conflicts.
+ */
+ public static DataType projectFields(DataType dataType, int[][] indexPaths) {
+ return DataTypeUtils.projectRow(dataType, indexPaths);
+ }
+
+ /**
+ * Projects a (possibly nested) row data type by returning a new data type that only includes
+ * fields of the given indices.
+ *
+ * <p>Note: This method only projects (possibly nested) fields in the top-level row.
+ */
+ public static DataType projectFields(DataType dataType, int[] indexes) {
+ return DataTypeUtils.projectRow(dataType, indexes);
+ }
+
+ /**
+ * Returns an ordered list of fields starting from the provided {{@link DataType}}.
+ *
+ * <p>Note: this method returns an empty list if the provided data type is not an instance of
+ * {{@link FieldsDataType}} or its logical type is not an instance of {{@link RowType}}.
+ */
+ public static List<DataTypes.Field> getFields(DataType dataType) {
Review comment:
I think this method deserves some refactoring to reduce its complexity. It could even live in a utility class that is simply called from here. Right now it's a very long method built around large top-level `if` blocks.
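Separately, the naming scheme described in the `projectFields` Javadoc of this hunk can be sketched as follows. `NestedField` and `ProjectedNames` are hypothetical; the exact behavior of `DataTypeUtils.projectRow` is not shown here, so the counter start (0), its increment until the name is free, and the order in which conflicts are resolved (later paths get the postfix) are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical nested-field model; not Flink's RowType. */
final class NestedField {
    final String name;
    final List<NestedField> children;

    NestedField(String name, NestedField... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

final class ProjectedNames {
    private ProjectedNames() {}

    /**
     * Derives a name per index path by joining the field names along the path with '_'.
     * On a clash, appends "_$<n>" (assumed to start at 0) until the name is unique.
     */
    static List<String> derive(List<NestedField> topLevel, int[][] indexPaths) {
        List<String> result = new ArrayList<>();
        Set<String> used = new HashSet<>();
        for (int[] path : indexPaths) {
            // Walk the path, collecting the name of every field on the way down.
            List<NestedField> level = topLevel;
            List<String> segments = new ArrayList<>();
            for (int index : path) {
                NestedField field = level.get(index);
                segments.add(field.name);
                level = field.children;
            }
            String candidate = String.join("_", segments);
            String name = candidate;
            int counter = 0;
            // Set.add returns false if the name was already taken.
            while (!used.add(name)) {
                name = String.format("%s_$%d", candidate, counter++);
            }
            result.add(name);
        }
        return result;
    }
}
```

With a nested field `a.b` and a top-level field `a_b`, projecting `[[0, 0], [1]]` yields `a_b` and `a_b_$0` in this sketch.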
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
##########
@@ -115,6 +149,167 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------
+ /**
+ * Returns the first-level field names for the provided {{@link DataType}}.
+ *
+ * <p>Note: this method returns an empty list for every {{@link DataType}} not instance of
+ * {{@link FieldsDataType}}.
Review comment:
Same for the methods below.
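Precise wording matters here because the quoted `getFieldNames` dispatches on the logical type rather than on `FieldsDataType`: ROW types return their field names, structured types their attribute names, DISTINCT types are unwrapped to their source type, and everything else yields an empty list. A simplified model of that dispatch, using hypothetical stand-in types rather than Flink's `LogicalType` hierarchy:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins for the logical types handled by getFieldNames.
abstract class SimpleType {}

final class SimpleRow extends SimpleType {
    final List<String> fieldNames;
    SimpleRow(List<String> fieldNames) { this.fieldNames = fieldNames; }
}

final class SimpleStructured extends SimpleType {
    final List<String> attributeNames;
    SimpleStructured(List<String> attributeNames) { this.attributeNames = attributeNames; }
}

final class SimpleDistinct extends SimpleType {
    final SimpleType sourceType;
    SimpleDistinct(SimpleType sourceType) { this.sourceType = sourceType; }
}

/** Any non-composite type, e.g. INT or STRING. */
final class SimpleAtomic extends SimpleType {}

final class FieldNames {
    private FieldNames() {}

    /** Mirrors the visitor in the quoted getFieldNames. */
    static List<String> of(SimpleType type) {
        if (type instanceof SimpleRow) {
            return ((SimpleRow) type).fieldNames;
        }
        if (type instanceof SimpleStructured) {
            return ((SimpleStructured) type).attributeNames;
        }
        if (type instanceof SimpleDistinct) {
            // DISTINCT types are resolved against their source type.
            return of(((SimpleDistinct) type).sourceType);
        }
        // Default case: no first-level fields.
        return Collections.emptyList();
    }
}
```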
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
##########
@@ -149,6 +150,14 @@
*/
<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
+ /**
+ * Creates type information describing the internal data structures of the given {@link
+ * LogicalType}.
+ */
+ default <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType) {
+ return createTypeInformation(DataTypes.of(producedLogicalType));
Review comment:
```suggestion
default <T> TypeInformation<T> createTypeInformation(LogicalType consumedLogicalType) {
return createTypeInformation(DataTypes.of(consumedLogicalType));
```
--
This is an automated message from the Apache Git Service.