openinx commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r478301144



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,20 +286,29 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
   }
 
   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+    Table table = getIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }
 
-      // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table getIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
   }
 
+  private CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+    // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new

Review comment:
nit: we may need to change this comment. I could not find any class named `IcebergCatalogTable`. Or did I misunderstand something?






