openinx commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r479985290



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,20 +287,30 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
   }
 
   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+    Table table = loadIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }
 
-      // NOTE: We can not create a IcebergCatalogTable, because Flink 
optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink 
operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table loadIcebergTable(ObjectPath tablePath) throws 
TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
   }
 
+  private CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));

Review comment:
       nit: how about making the iceberg schema -> flink `TableSchema` 
conversion to be a static method inside FlinkSchemaUtil ?  The table sink pr 
https://github.com/apache/iceberg/pull/1348 will also depend on this static 
method 
(https://github.com/apache/iceberg/pull/1348/files#diff-0ad7dfff9cfa32fbb760796d976fd650R50).
 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +340,169 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = loadIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by comparing
+    // CatalogTable instances, unless the Flink schema contains Iceberg column 
IDs.
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed 
columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark 
specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key 
is not supported yet.");
+    }
+  }
+

Review comment:
       Do we need to add a TODO indicating that we flink only support 
`identity` partition now but will support hidden column future  ? 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +340,169 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = loadIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by comparing
+    // CatalogTable instances, unless the Flink schema contains Iceberg column 
IDs.
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed 
columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark 
specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key 
is not supported yet.");
+    }
+  }
+
+  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, 
Schema icebergSchema) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+    partitionKeys.forEach(builder::identity);
+    return builder.build();
+  }
+
+  private static List<String> toPartitionKeys(PartitionSpec spec, Schema 
icebergSchema) {
+    List<String> partitionKeys = Lists.newArrayList();
+    for (PartitionField field : spec.fields()) {
+      if (field.transform().isIdentity()) {
+        partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+      } else {
+        // Not created by Flink SQL.
+        // For compatibility with iceberg tables, return empty.
+        // TODO modify this after Flink support partition transform.
+        return Collections.emptyList();
+      }
+    }
+    return partitionKeys;
+  }
+
+  private static void commitChanges(Table table, String setLocation, String 
setSnapshotId,
+                                    String pickSnapshotId, Map<String, String> 
setProperties) {
+    // don't allow setting the snapshot and picking a commit at the same time 
because order is ambiguous and choosing
+    // one order leads to different results
+    Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == 
null,
+        "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+    if (setSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(setSnapshotId);
+      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+    }
+
+    // if updating the table snapshot, perform that update first in case it 
fails
+    if (pickSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(pickSnapshotId);
+      table.manageSnapshots().cherrypick(newSnapshotId).commit();
+    }
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation()
+          .setLocation(setLocation)
+          .commit();
+    }
+
+    if (!setProperties.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      setProperties.forEach((k, v) -> {
+        if (v == null) {

Review comment:
       The `v` should never be null in `HashMap` ? 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,20 +286,29 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
   }
 
   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+    Table table = getIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }
 
-      // NOTE: We can not create a IcebergCatalogTable, because Flink 
optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink 
operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table getIcebergTable(ObjectPath tablePath) throws 
TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
   }
 
+  private CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+    // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer 
may use CatalogTableImpl to copy a new

Review comment:
       Got it,  maybe could write this comment more clear. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +338,158 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+    Map<String, String> options = Maps.newHashMap(table.getOptions());
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          options.get("location"),
+          options);
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = getIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);

Review comment:
       `setProperties.put(k, null)` ?  The javadoc from Map said : 
   
   ```java
    * @throws NullPointerException if the specified key or value is null
        *         and this map does not permit null keys or values
        * @throws IllegalArgumentException if some property of the specified key
        *         or value prevents it from being stored in this map
        */
       V put(K key, V value);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to