This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 51e4ee2 [FLINK-23306][table] Reduce usages of legacy TableSchema
51e4ee2 is described below
commit 51e4ee24a48b475ce6e19b6c06c43c12d5dce42d
Author: Timo Walther <[email protected]>
AuthorDate: Thu Jul 8 10:49:53 2021 +0200
[FLINK-23306][table] Reduce usages of legacy TableSchema
This closes #16425.
---
docs/content.zh/docs/dev/table/sourcesSinks.md | 3 ++-
docs/content/docs/dev/table/sourcesSinks.md | 3 ++-
.../table/examples/java/connectors/SocketDynamicTableFactory.java | 2 +-
.../java/org/apache/flink/table/factories/PrintTableSinkFactory.java | 2 +-
.../flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala | 2 +-
.../planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala | 2 +-
.../plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala | 2 +-
7 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md b/docs/content.zh/docs/dev/table/sourcesSinks.md
index b2932f5..666fa1b 100644
--- a/docs/content.zh/docs/dev/table/sourcesSinks.md
+++ b/docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -452,7 +452,8 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// derive the produced data type (excluding computed columns) from the catalog table
- final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ final DataType producedDataType =
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
diff --git a/docs/content/docs/dev/table/sourcesSinks.md b/docs/content/docs/dev/table/sourcesSinks.md
index b2932f5..666fa1b 100644
--- a/docs/content/docs/dev/table/sourcesSinks.md
+++ b/docs/content/docs/dev/table/sourcesSinks.md
@@ -452,7 +452,8 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// derive the produced data type (excluding computed columns) from the catalog table
- final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ final DataType producedDataType =
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
index 0341466..f7c5603 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
@@ -95,7 +95,7 @@ public final class SocketDynamicTableFactory implements DynamicTableSourceFactor
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
- context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
index e1393c5..01e1f52 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
@@ -95,7 +95,7 @@ public class PrintTableSinkFactory implements DynamicTableSinkFactory {
helper.validate();
ReadableConfig options = helper.getOptions();
return new PrintSink(
- context.getCatalogTable().getSchema().toPhysicalRowDataType(),
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
options.get(PRINT_IDENTIFIER),
options.get(STANDARD_ERROR),
options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null));
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 08939ee..8ebb7fa 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -67,7 +67,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
case act: CatalogTable =>
val builder = ImmutableSet.builder[ImmutableBitSet]()
- val schema = act.getSchema
+ val schema = act.getResolvedSchema
if (schema.getPrimaryKey.isPresent) {
// use relOptTable's type which may be projected based on original schema
val columns = relOptTable.getRowType.getFieldNames
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
index b2b85e3..8b67547 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
@@ -59,7 +59,7 @@ class StreamPhysicalSinkRule extends ConverterRule(
val dynamicPartFields = sink.catalogTable.getPartitionKeys
.filter(!sink.staticPartitions.contains(_))
val fieldNames = sink.catalogTable
- .getSchema
+ .getResolvedSchema
.toPhysicalRowDataType
.getLogicalType.asInstanceOf[RowType]
.getFieldNames
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
index 56a8ec7..8472e31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
@@ -76,7 +76,7 @@ class StreamPhysicalTableSourceScanRule
isSourceChangeEventsDuplicate(table.catalogTable, table.tableSource, config)) {
// generate changelog normalize node
// primary key has been validated in CatalogSourceTable
- val primaryKey = table.catalogTable.getSchema.getPrimaryKey.get()
+ val primaryKey = table.catalogTable.getResolvedSchema.getPrimaryKey.get()
val keyFields = primaryKey.getColumns
val inputFieldNames = newScan.getRowType.getFieldNames
val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)