kbendick commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r479710309



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());

Review comment:
       Should `location` still be placed in the table properties or will that 
cause some kind of conflict / error?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = getIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed 
columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark 
specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key 
is not supported yet.");
+    }
+  }
+
+  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, 
Schema icebergSchema) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+    partitionKeys.forEach(builder::identity);
+    return builder.build();
+  }
+
+  private static List<String> toPartitionKeys(PartitionSpec spec, Schema 
icebergSchema) {
+    List<String> partitionKeys = Lists.newArrayList();
+    for (PartitionField field : spec.fields()) {
+      if (field.transform().isIdentity()) {
+        partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+      } else {
+        // Not created by Flink SQL.
+        // For compatibility with iceberg tables, return empty.
+        // TODO modify this after Flink support partition transform.
+        return Collections.emptyList();

Review comment:
       To me, it seems like adding all of the identity fields (but not the 
transformed fields) would likely be incorrect. Although returning an empty list 
when the table is partitioned seems like a possible correctness bug to me too.
   
   Should we consider throwing an exception in this case instead until such a 
time that Flink supports partition transforms?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = getIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");

Review comment:
       `CatalogTable` is a subinterface that inherits from `CatalogBaseTable`. 
So definitely, yes.
   
   See the java docs on the current `CatalogBaseTable` in Flink:
   
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/catalog/CatalogBaseTable.html

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = getIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed 
columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark 
specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key 
is not supported yet.");
+    }
+  }
+
+  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, 
Schema icebergSchema) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+    partitionKeys.forEach(builder::identity);
+    return builder.build();
+  }
+
+  private static List<String> toPartitionKeys(PartitionSpec spec, Schema 
icebergSchema) {
+    List<String> partitionKeys = Lists.newArrayList();
+    for (PartitionField field : spec.fields()) {
+      if (field.transform().isIdentity()) {
+        partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+      } else {
+        // Not created by Flink SQL.
+        // For compatibility with iceberg tables, return empty.
+        // TODO modify this after Flink support partition transform.
+        return Collections.emptyList();
+      }
+    }
+    return partitionKeys;
+  }
+
+  private static void commitChanges(Table table, String setLocation, String 
setSnapshotId,
+                                    String pickSnapshotId, Map<String, String> 
setProperties) {
+    // don't allow setting the snapshot and picking a commit at the same time 
because order is ambiguous and choosing
+    // one order leads to different results
+    Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == 
null,
+        "Cannot set the current the current snapshot ID and cherry-pick 
snapshot changes");

Review comment:
       Nit: Duplication of the words `the current` in the Preconditions string. 
It currently reads `Cannot set the current the current`.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = getIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed 
columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark 
specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {

Review comment:
       I found this which might answer your question: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
   
   In particular, here are the _proposed_ changes:
   
   ```
   Proposed Changes
   We suggest to introduce the concept of primary key constraint as a hint for 
FLINK to leverage for optimizations.
   
   Primary key constraints tell that a column or a set of columns of a table or 
a view are unique and they do not contain null.
   Neither of columns in a primary can be nullable.
   Primary key therefore uniquely identify a row in a table.
   ```
   
   So it sounds just like an RDBMS primary key.
   
   Note however, that even in the FLIP (which is just the proposal and not 
necessarily the finished product), it does state that there's no planned 
enforcement on the PK. It's up to the user to ensure that the PK is non-null 
and unique.
   
   ```
   Primary key validity checks
   SQL standard specifies that a constraint can either be ENFORCED or NOT 
ENFORCED.
   This controls if the constraint checks are performed on the 
incoming/outgoing data.
   Flink does not own the data therefore the only mode we want to support is 
the NOT ENFORCED mode.
   Its up to the user to ensure that the query enforces key integrity.
   ```
   
   So I agree here that throwing might be the most useful option and that 
there's likely nothing on the iceberg side to be added to enforce this as Flink 
doesn't enforce it either. In an entirely streaming setting, ensuring unique 
keys would be rather difficult and so to me it somewhat sounds like the PK is 
just more metadata that could very well be in TBLPROPERTIES.
   
   But a more experienced Flink SQL user than myself might have more to say on 
the matter. I've never attempted to enforce a PK when using Flink SQL. Sounds 
like the work to do so would involve custom operators etc.
   
   TLDR: The Primary Key is just a constraint, which is currently part of 
Flink's Table spec but goes unenforced and is up to the user. It does not 
appear as though the PK info is supported in any UpsertSinks etc, though that 
may be discussed / planned in the future. Support in the DDL for Primary Key 
constraints is relatively new (Flink 1.11 / current, with support in the API 
coming in at Flink 1.10).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to