Initial-neko commented on a change in pull request #3614:
URL: https://github.com/apache/iceberg/pull/3614#discussion_r767217774



##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -422,7 +440,7 @@ public void alterTable(ObjectPath tablePath, 
CatalogBaseTable newTable, boolean
 
     // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by comparing
     // CatalogTable instances, unless the Flink schema contains Iceberg column 
IDs.
-    if (!table.getSchema().equals(newTable.getSchema())) {
+    if (!schemaCompare(table.getUnresolvedSchema(), 
newTable.getUnresolvedSchema())) {

Review comment:
       because I found getSchema Deprecated and the fields to be compared are 
same

##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -400,6 +400,24 @@ void createIcebergTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
     }
   }
 
+  // skip the AbstractConstraint comparison

Review comment:
       that's right, I'll write the corresponding explanation

##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -400,6 +400,24 @@ void createIcebergTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
     }
   }
 
+  // skip the AbstractConstraint comparison
+  public boolean schemaCompare(org.apache.flink.table.api.Schema rawSchema,
+                               org.apache.flink.table.api.Schema newSchema) {
+
+    org.apache.flink.table.api.Schema.UnresolvedPrimaryKey rawPK = 
rawSchema.getPrimaryKey().orElse(null);
+
+    org.apache.flink.table.api.Schema.UnresolvedPrimaryKey newPK = 
newSchema.getPrimaryKey().orElse(null);
+    if ((rawPK == null && newPK != null) || (rawPK != null && newPK == null)) {
+      return false;
+    }
+    boolean exp = rawSchema.getColumns().equals(newSchema.getColumns()) &&
+            
rawSchema.getWatermarkSpecs().equals(newSchema.getWatermarkSpecs());
+    boolean exp2 = rawPK != null && newPK != null ?
+            rawPK.getColumnNames().equals(newPK.getColumnNames())
+            : true;

Review comment:
       I also think it needs to be retained because it is easier to understand
   if(rawPK == null && newPK == null)   this situation always return true  
   and other situation needs to calculate 
rawPK.getColumnNames().equals(newPK.getColumnNames()) 
   via 
   ```
   if ((rawPK == null && newPK != null) || (rawPK != null && newPK == null)) {
         return false;
       }
   ```
   maybe compiler know that rawPK and newPK must be double null or not
   So When the code reaches here, if rawPK is null, newPK must be null
   exp2 can simplify like this
   ```
   exp2 = rawPK != null?
               rawPK.getColumnNames().equals(newPK.getColumnNames())
               : true;
   ```
   boolean exp2 = rawPK == null || 
rawPK.getColumnNames().equals(newPK.getColumnNames());
   this result is ok :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to