This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac203a7a26d78fe270bd9941bb721be53a893040
Author: slinkydeveloper <[email protected]>
AuthorDate: Wed Sep 29 17:49:17 2021 +0200

    [FLINK-24399][table-common] Add DynamicTableFactory.Context#getPrimaryKeyIndexes and DynamicTableFactory.Context#getPhysicalRowDataType
    
    Signed-off-by: slinkydeveloper <[email protected]>
    
    This closes #17381.
---
 .../flink/table/factories/DynamicTableFactory.java | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
index 3b87eff..7fb880a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.types.DataType;
 
 /**
  * Base interface for configuring a dynamic table connector for an external storage system from
@@ -85,5 +86,49 @@ public interface DynamicTableFactory extends Factory {
 
         /** Whether the table is temporary. */
         boolean isTemporary();
+
+        /**
+         * Returns the physical schema to use for encoding and decoding records. The returned row
+         * data type contains only physical columns. It does not include computed or metadata
+         * columns. A factory can use the returned data type to configure the table connector, and
+         * can manipulate it using the {@link DataType} static methods:
+         *
+         * <pre>{@code
+         * // Project some fields into a new data type
+         * DataType projectedDataType = DataType.projectRow(
+         *      context.getPhysicalRowDataType(), projectedIndexes);
+         *
+         * // Create key data type
+         * DataType keyDataType = DataType.projectRow(
+         *      context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
+         *
+         * // Create a new data type filtering columns of the original data type
+         * DataType myOwnDataType = DataTypes.ROW(
+         *      DataType.getFields(context.getPhysicalRowDataType())
+         *          .stream()
+         *          .filter(myFieldFilterPredicate)
+         *          .toArray(DataTypes.Field[]::new))
+         * }</pre>
+         *
+         * <p>Shortcut for {@code getCatalogTable().getResolvedSchema().toPhysicalRowDataType()}.
+         *
+         * @see ResolvedSchema#toPhysicalRowDataType()
+         */
+        default DataType getPhysicalRowDataType() {
+            return getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+        }
+
+        /**
+         * Returns the primary key indexes, if any, otherwise returns an empty array. A factory can
+         * use it to compute the schema projection of the key fields with {@link
+         * DataType#projectFields(DataType, int[])}.
+         *
+         * <p>Shortcut for {@code getCatalogTable().getResolvedSchema().getPrimaryKeyIndexes()}.
+         *
+         * @see ResolvedSchema#getPrimaryKeyIndexes()
+         */
+        default int[] getPrimaryKeyIndexes() {
+            return getCatalogTable().getResolvedSchema().getPrimaryKeyIndexes();
+        }
     }
 }

Reply via email to