twalthr commented on a change in pull request #17381:
URL: https://github.com/apache/flink/pull/17381#discussion_r719422899
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
##########
@@ -96,6 +97,15 @@
@PublicEvolving
public final class DataTypes {
+ /**
+ * Create {@link DataType} from a {@link LogicalType}.
Review comment:
```
Creates a {@link DataType} from a {@link LogicalType} with default conversion class.
@see LogicalType#getDefaultConversion()
```
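To make the suggested JavaDoc concrete, a minimal, self-contained sketch of what such a factory does, pairing a logical type with its default conversion class. `LogicalType`, `DataType`, and the `of` factory below are simplified stand-ins for illustration, not Flink's real classes.

```java
// Hedged sketch of a DataTypes.of(LogicalType)-style factory. All classes here
// are simplified stand-ins, NOT Flink's real API.
public class DataTypesOfSketch {

    static class LogicalType {
        final String name;
        final Class<?> defaultConversion;
        LogicalType(String name, Class<?> defaultConversion) {
            this.name = name;
            this.defaultConversion = defaultConversion;
        }
        // Mirrors the spirit of LogicalType#getDefaultConversion()
        Class<?> getDefaultConversion() { return defaultConversion; }
    }

    // A data type couples a logical type with a conversion class.
    static class DataType {
        final LogicalType logicalType;
        final Class<?> conversionClass;
        DataType(LogicalType logicalType, Class<?> conversionClass) {
            this.logicalType = logicalType;
            this.conversionClass = conversionClass;
        }
    }

    // The factory idea under review: create a DataType from a LogicalType,
    // picking up the logical type's default conversion class.
    static DataType of(LogicalType logicalType) {
        return new DataType(logicalType, logicalType.getDefaultConversion());
    }

    public static void main(String[] args) {
        DataType dt = of(new LogicalType("INT", Integer.class));
        System.out.println(dt.conversionClass.getSimpleName()); // prints "Integer"
    }
}
```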
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
##########
@@ -133,6 +143,92 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------
+ /**
+ * Returns the first-level field names for the provided {{@link DataType}}.
Review comment:
`{{` -> `{` here and below
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
##########
@@ -94,10 +94,18 @@
* Creates type information describing the internal data structures of the given {@link
* DataType}.
*
- * @see TableSchema#toPhysicalRowDataType()
+ * @see org.apache.flink.table.catalog.ResolvedSchema#toPhysicalRowDataType()
Review comment:
nit for the future: can we import the classes to keep the JavaDocs less bloated? Same for the `DataType` commit.
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -85,5 +86,49 @@
/** Whether the table is temporary. */
boolean isTemporary();
+
+ /**
+ * Returns the physical schema to use for encoding and decoding records. The returned row
+ * data type contains only physical columns. It does not include computed or metadata
+ * columns. A factory can use the returned data type to configure the table connector, and
+ * can manipulate it using the {{@link DataType}} static methods:
+ *
+ * <pre>{@code
+ * // Project some fields into a new data type
+ * DataType projectedDataType = DataType.projectRow(
+ *     context.getPhysicalRowDataType(), projectedIndexes);
+ *
+ * // Create key data type
+ * DataType keyDataType = DataType.projectRow(
+ *     context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
+ *
+ * // Create a new data type filtering columns of the original data type
+ * DataType myOwnDataType = DataTypes.ROW(
+ *     DataType.getFields(context.getPhysicalRowDataType())
+ *         .stream()
+ *         .filter(myFieldFilterPredicate)
+ *         .toArray(DataTypes.Field[]::new))
+ * }</pre>
+ *
+ * <p>Shortcut for {@code getCatalogTable().getResolvedSchema().toPhysicalRowDataType()}.
Review comment:
nice JavaDocs 🥰
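For readers skimming the JavaDoc above, the projection it describes can be sketched in a runnable, self-contained form. `Field` and `projectRow` below are simplified stand-ins that only illustrate index-based row projection; they are not Flink's actual `DataTypes.Field` or `DataType.projectRow` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in sketch of index-based row projection, as used for e.g. extracting
// primary-key fields from the physical row data type. NOT Flink's real API.
public class ProjectRowSketch {

    static class Field {
        final String name;
        final String type;
        Field(String name, String type) { this.name = name; this.type = type; }
        @Override public String toString() { return name + ":" + type; }
    }

    // Keeps only the fields at the given positions, in the given order,
    // mirroring what a projectRow(rowDataType, indexes) call does conceptually.
    static List<Field> projectRow(List<Field> fields, int[] indexes) {
        List<Field> projected = new ArrayList<>();
        for (int i : indexes) {
            projected.add(fields.get(i));
        }
        return projected;
    }

    public static void main(String[] args) {
        List<Field> physical = Arrays.asList(
                new Field("id", "BIGINT"),
                new Field("name", "STRING"),
                new Field("ts", "TIMESTAMP"));
        // e.g. a primary key spanning fields 0 and 2
        System.out.println(projectRow(physical, new int[] {0, 2}));
        // prints "[id:BIGINT, ts:TIMESTAMP]"
    }
}
```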
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
##########
@@ -149,6 +150,14 @@
*/
<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
+ /**
+ * Creates type information describing the internal data structures of the given {@link
+ * LogicalType}.
+ */
+ default <T> TypeInformation<T> createTypeInformation(LogicalType consumedLogicalType) {
Review comment:
let's actually implement this method; that way we avoid creating a data type at all. If you check the underlying implementation, you will see that a logical type is required anyway.
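The delegation direction the reviewer suggests can be sketched as follows. The classes here are simplified stand-ins for Flink's `LogicalType`, `DataType`, and `TypeInformation`, not the real API: the point is only that the `LogicalType` overload carries the implementation and the `DataType` overload unwraps and delegates.

```java
// Self-contained sketch of the suggested delegation. LogicalType, DataType,
// and TypeInformation are simplified stand-ins, NOT Flink's real classes.
public class DelegationSketch {

    static class LogicalType {
        final String name;
        LogicalType(String name) { this.name = name; }
    }

    // A data type is essentially a logical type plus a conversion class; here
    // we keep only the wrapped logical type.
    static class DataType {
        final LogicalType logicalType;
        DataType(LogicalType logicalType) { this.logicalType = logicalType; }
        LogicalType getLogicalType() { return logicalType; }
    }

    static class TypeInformation<T> {
        final String description;
        TypeInformation(String description) { this.description = description; }
    }

    interface Context {
        // The overload with the actual implementation: a logical type is all
        // the underlying code needs.
        <T> TypeInformation<T> createTypeInformation(LogicalType consumedLogicalType);

        // The DataType overload merely unwraps and delegates, so no
        // intermediate DataType has to be constructed by callers that already
        // hold a LogicalType.
        default <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
            return createTypeInformation(consumedDataType.getLogicalType());
        }
    }

    static String demo() {
        Context ctx = new Context() {
            @Override
            public <T> TypeInformation<T> createTypeInformation(LogicalType t) {
                return new TypeInformation<>("info:" + t.name);
            }
        };
        // Calling the DataType overload routes through the LogicalType one.
        return ctx.createTypeInformation(new DataType(new LogicalType("ROW"))).description;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "info:ROW"
    }
}
```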
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]