This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 51e4ee2  [FLINK-23306][table] Reduce usages of legacy TableSchema
51e4ee2 is described below

commit 51e4ee24a48b475ce6e19b6c06c43c12d5dce42d
Author: Timo Walther <[email protected]>
AuthorDate: Thu Jul 8 10:49:53 2021 +0200

    [FLINK-23306][table] Reduce usages of legacy TableSchema
    
    This closes #16425.
---
 docs/content.zh/docs/dev/table/sourcesSinks.md                         | 3 ++-
 docs/content/docs/dev/table/sourcesSinks.md                            | 3 ++-
 .../table/examples/java/connectors/SocketDynamicTableFactory.java      | 2 +-
 .../java/org/apache/flink/table/factories/PrintTableSinkFactory.java   | 2 +-
 .../flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala       | 2 +-
 .../planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala    | 2 +-
 .../plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala | 2 +-
 7 files changed, 9 insertions(+), 7 deletions(-)
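
The pattern applied throughout is the same one-line swap from the deprecated
TableSchema accessor to ResolvedSchema. A minimal sketch of that swap, assuming
the DynamicTableFactory.Context handed to a factory method (the helper class
name is illustrative, not part of this commit):

    import org.apache.flink.table.factories.DynamicTableFactory;
    import org.apache.flink.table.types.DataType;

    final class SchemaMigrationSketch {
        static DataType producedDataType(DynamicTableFactory.Context context) {
            // Legacy (deprecated):
            //   context.getCatalogTable().getSchema().toPhysicalRowDataType();
            // Replacement: ResolvedSchema likewise excludes computed columns here.
            return context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
        }
    }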

diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md b/docs/content.zh/docs/dev/table/sourcesSinks.md
index b2932f5..666fa1b 100644
--- a/docs/content.zh/docs/dev/table/sourcesSinks.md
+++ b/docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -452,7 +452,8 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
     final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
 
     // derive the produced data type (excluding computed columns) from the catalog table
-    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+    final DataType producedDataType =
+            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
 
     // create and return dynamic table source
     return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
diff --git a/docs/content/docs/dev/table/sourcesSinks.md b/docs/content/docs/dev/table/sourcesSinks.md
index b2932f5..666fa1b 100644
--- a/docs/content/docs/dev/table/sourcesSinks.md
+++ b/docs/content/docs/dev/table/sourcesSinks.md
@@ -452,7 +452,8 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
     final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
 
     // derive the produced data type (excluding computed columns) from the catalog table
-    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+    final DataType producedDataType =
+            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
 
     // create and return dynamic table source
     return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
index 0341466..f7c5603 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
@@ -95,7 +95,7 @@ public final class SocketDynamicTableFactory implements DynamicTableSourceFactor
 
         // derive the produced data type (excluding computed columns) from the catalog table
         final DataType producedDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
 
         // create and return dynamic table source
         return new SocketDynamicTableSource(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
index e1393c5..01e1f52 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
@@ -95,7 +95,7 @@ public class PrintTableSinkFactory implements DynamicTableSinkFactory {
         helper.validate();
         ReadableConfig options = helper.getOptions();
         return new PrintSink(
-                context.getCatalogTable().getSchema().toPhysicalRowDataType(),
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
                 options.get(PRINT_IDENTIFIER),
                 options.get(STANDARD_ERROR),
                 options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null));
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 08939ee..8ebb7fa 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -67,7 +67,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
           case act: CatalogTable =>
             val builder = ImmutableSet.builder[ImmutableBitSet]()
 
-            val schema = act.getSchema
+            val schema = act.getResolvedSchema
             if (schema.getPrimaryKey.isPresent) {
               // use relOptTable's type which may be projected based on original schema
               val columns = relOptTable.getRowType.getFieldNames
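
ResolvedSchema exposes the primary key as an Optional, mirroring the legacy
TableSchema call the Scala code above replaces. A hedged sketch of that lookup
in Java (the class name is illustrative, not part of this commit):

    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.UniqueConstraint;

    final class PrimaryKeySketch {
        // Returns the primary-key column names, or an empty list if no key is defined.
        static List<String> primaryKeyColumns(ResolvedSchema schema) {
            Optional<UniqueConstraint> pk = schema.getPrimaryKey();
            return pk.map(UniqueConstraint::getColumns).orElse(Collections.emptyList());
        }
    }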
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
index b2b85e3..8b67547 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
@@ -59,7 +59,7 @@ class StreamPhysicalSinkRule extends ConverterRule(
           val dynamicPartFields = sink.catalogTable.getPartitionKeys
               .filter(!sink.staticPartitions.contains(_))
           val fieldNames = sink.catalogTable
-            .getSchema
+            .getResolvedSchema
             .toPhysicalRowDataType
             .getLogicalType.asInstanceOf[RowType]
             .getFieldNames
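
The Scala chain above resolves the physical field names of the sink row; the
same call sequence sketched in Java, outside the planner (illustrative only):

    import java.util.List;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.types.logical.RowType;

    final class FieldNamesSketch {
        // The physical row type excludes computed columns, so the cast to RowType is safe.
        static List<String> physicalFieldNames(ResolvedSchema schema) {
            RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
            return rowType.getFieldNames();
        }
    }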
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
index 56a8ec7..8472e31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
@@ -76,7 +76,7 @@ class StreamPhysicalTableSourceScanRule
         isSourceChangeEventsDuplicate(table.catalogTable, table.tableSource, config)) {
       // generate changelog normalize node
       // primary key has been validated in CatalogSourceTable
-      val primaryKey = table.catalogTable.getSchema.getPrimaryKey.get()
+      val primaryKey = table.catalogTable.getResolvedSchema.getPrimaryKey.get()
       val keyFields = primaryKey.getColumns
       val inputFieldNames = newScan.getRowType.getFieldNames
       val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
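
ScanUtil.getPrimaryKeyIndices is planner-internal; in spirit it maps each key
column to its position among the scan's field names. A hypothetical stand-in,
not the actual implementation:

    import java.util.List;

    final class KeyIndicesSketch {
        // One index per key column, in key order; indexOf yields -1 for a missing column.
        static int[] primaryKeyIndices(List<String> fieldNames, List<String> keyFields) {
            return keyFields.stream().mapToInt(fieldNames::indexOf).toArray();
        }
    }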
