Airblader commented on a change in pull request #17350:
URL: https://github.com/apache/flink/pull/17350#discussion_r715632847



##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -49,7 +54,11 @@
         this.tableIdentifier = context.getObjectIdentifier();
         this.tableOptions = new Configuration();
         
context.getCatalogTable().getOptions().forEach(tableOptions::setString);
-        this.schema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        this.schema = context.getCatalogTable().getResolvedSchema();
+        this.physicalColumns =

Review comment:
       This seems redundant, since `schema` contains this entirely.

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
##########
@@ -73,31 +73,32 @@
 
     public DeserializationSchemaAdapter(
             DeserializationSchema<RowData> deserializationSchema,
-            TableSchema schema,
+            List<Column> physicalColumns,

Review comment:
       We should just pass the (physical) `DataType` here and keep that. It 
contains both names and types.

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -59,15 +68,19 @@ ReadableConfig formatOptions(String identifier) {
         return new DelegatingConfiguration(tableOptions, identifier + ".");
     }
 
-    DataType getFormatDataType() {
-        TableSchema.Builder builder = TableSchema.builder();
-        schema.getTableColumns()
-                .forEach(
-                        column -> {
-                            if (!partitionKeys.contains(column.getName())) {
-                                builder.add(column);
-                            }
-                        });
-        return builder.build().toRowDataType();
+    DataType getRowDataTypeWithoutPartitionColumns() {

Review comment:
       This can be entirely derived from `getRowDataType` by using 
`getRowDataType().getLogicalType()` which is always `RowType` and has all the 
information.

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -164,7 +167,7 @@ private SourceProvider 
createSourceProvider(BulkFormat<RowData, FileSourceSplit>
 
                     @Override
                     public TableSchema getSchema() {
-                        return schema;
+                        return TableSchema.fromResolvedSchema(schema);

Review comment:
       So migrating this isn't needed to fix the issue you had encountered?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to