kbendick commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r479710309
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
}
}
- /**
- * TODO Add partitioning to the Flink DDL parser.
- */
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support createTable now.");
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
Review comment:
Should `location` still be placed in the table properties or will that
cause some kind of conflict / error?
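For discussion, the routing the diff performs can be sketched standalone (plain `java.util` types here, not the Flink/Iceberg APIs), showing that `location` is deliberately kept out of the properties map so there is a single source of truth for it:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LocationRoutingSketch {
  String location;                                          // routed to createTable's location argument
  Map<String, String> properties = new LinkedHashMap<>();   // everything else becomes a table property

  // Mirrors the loop in the diff: "location" (case-insensitive) is pulled out
  // and NOT copied into the table properties, so a duplicate "location"
  // property is never written alongside the location argument.
  LocationRoutingSketch(Map<String, String> options) {
    for (Map.Entry<String, String> entry : options.entrySet()) {
      if ("location".equalsIgnoreCase(entry.getKey())) {
        location = entry.getValue();
      } else {
        properties.put(entry.getKey(), entry.getValue());
      }
    }
  }
}
```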
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
}
}
- /**
- * TODO Add partitioning to the Flink DDL parser.
- */
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support createTable now.");
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support alterTable now.");
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+ Table icebergTable = getIcebergTable(tablePath);
+ CatalogTable table = toCatalogTable(icebergTable);
+
+ // Currently, Flink SQL only support altering table properties.
+
+ if (!table.getSchema().equals(newTable.getSchema())) {
+ throw new UnsupportedOperationException("Altering schema is not supported yet.");
+ }
+
+ if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+ }
+
+ Map<String, String> oldOptions = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldOptions.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldOptions.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+ TableSchema schema = table.getSchema();
+ schema.getTableColumns().forEach(column -> {
+ if (column.isGenerated()) {
+ throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+ }
+ });
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+ }
+
+ if (schema.getPrimaryKey().isPresent()) {
+ throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
+ }
+ }
+
+ private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+ partitionKeys.forEach(builder::identity);
+ return builder.build();
+ }
+
+ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+ List<String> partitionKeys = Lists.newArrayList();
+ for (PartitionField field : spec.fields()) {
+ if (field.transform().isIdentity()) {
+ partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+ } else {
+ // Not created by Flink SQL.
+ // For compatibility with iceberg tables, return empty.
+ // TODO modify this after Flink support partition transform.
+ return Collections.emptyList();
Review comment:
To me, adding all of the identity fields (but not the transformed fields) would likely be incorrect, though returning an empty list when the table is partitioned seems like a possible correctness bug too. Should we consider throwing an exception in this case instead, until Flink supports partition transforms?
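The alternative could be sketched roughly like this (standalone; the transform-name strings and list-based signature are stand-ins for Iceberg's `PartitionField`/`Transform` API, not the real one):

```java
import java.util.ArrayList;
import java.util.List;

class PartitionKeysSketch {
  // Alternative to silently returning an empty list: keep the identity-partition
  // column names, but fail fast on any transform that Flink SQL cannot yet
  // represent, so the partitioning is never silently dropped.
  static List<String> toPartitionKeys(List<String> transforms, List<String> columns) {
    List<String> partitionKeys = new ArrayList<>();
    for (int i = 0; i < transforms.size(); i++) {
      if ("identity".equals(transforms.get(i))) {
        partitionKeys.add(columns.get(i));
      } else {
        throw new UnsupportedOperationException(
            "Cannot represent partition transform in Flink DDL: " + transforms.get(i));
      }
    }
    return partitionKeys;
  }
}
```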
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
}
}
- /**
- * TODO Add partitioning to the Flink DDL parser.
- */
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support createTable now.");
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support alterTable now.");
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+ Table icebergTable = getIcebergTable(tablePath);
+ CatalogTable table = toCatalogTable(icebergTable);
+
+ // Currently, Flink SQL only support altering table properties.
+
+ if (!table.getSchema().equals(newTable.getSchema())) {
+ throw new UnsupportedOperationException("Altering schema is not supported yet.");
+ }
+
+ if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+ }
+
+ Map<String, String> oldOptions = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldOptions.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldOptions.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
Review comment:
`CatalogTable` is a subinterface that inherits from `CatalogBaseTable`.
So definitely, yes.
See the java docs on the current `CatalogBaseTable` in Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/catalog/CatalogBaseTable.html
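A tiny standalone illustration of why the check is still meaningful, using local stand-in interfaces rather than the real Flink classes (a `CatalogBaseTable` could also be some other implementation, e.g. a view):

```java
class InstanceofSketch {
  interface CatalogBaseTable {}                       // stand-in for Flink's base interface
  interface CatalogTable extends CatalogBaseTable {}  // stand-in for the subinterface
  static class SomeOtherBaseTable implements CatalogBaseTable {} // e.g. a view-like type

  // Every CatalogTable is a CatalogBaseTable, but not vice versa, so the
  // instanceof precondition rejects base-table implementations that are not
  // plain tables.
  static boolean isCatalogTable(CatalogBaseTable table) {
    return table instanceof CatalogTable;
  }
}
```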
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
}
}
- /**
- * TODO Add partitioning to the Flink DDL parser.
- */
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support createTable now.");
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support alterTable now.");
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+ Table icebergTable = getIcebergTable(tablePath);
+ CatalogTable table = toCatalogTable(icebergTable);
+
+ // Currently, Flink SQL only support altering table properties.
+
+ if (!table.getSchema().equals(newTable.getSchema())) {
+ throw new UnsupportedOperationException("Altering schema is not supported yet.");
+ }
+
+ if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+ }
+
+ Map<String, String> oldOptions = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldOptions.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldOptions.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+ TableSchema schema = table.getSchema();
+ schema.getTableColumns().forEach(column -> {
+ if (column.isGenerated()) {
+ throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+ }
+ });
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+ }
+
+ if (schema.getPrimaryKey().isPresent()) {
+ throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
+ }
+ }
+
+ private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+ partitionKeys.forEach(builder::identity);
+ return builder.build();
+ }
+
+ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+ List<String> partitionKeys = Lists.newArrayList();
+ for (PartitionField field : spec.fields()) {
+ if (field.transform().isIdentity()) {
+ partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+ } else {
+ // Not created by Flink SQL.
+ // For compatibility with iceberg tables, return empty.
+ // TODO modify this after Flink support partition transform.
+ return Collections.emptyList();
+ }
+ }
+ return partitionKeys;
+ }
+
+ private static void commitChanges(Table table, String setLocation, String setSnapshotId,
+ String pickSnapshotId, Map<String, String> setProperties) {
+ // don't allow setting the snapshot and picking a commit at the same time because order is ambiguous and choosing
+ // one order leads to different results
+ Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
+ "Cannot set the current the current snapshot ID and cherry-pick snapshot changes");
Review comment:
Nit: duplication of the words `the current` in the Preconditions string. It currently reads `Cannot set the current the current`; it should read `Cannot set the current snapshot ID and cherry-pick snapshot changes`.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +339,167 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
}
}
- /**
- * TODO Add partitioning to the Flink DDL parser.
- */
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support createTable now.");
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException("Not support alterTable now.");
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+ Table icebergTable = getIcebergTable(tablePath);
+ CatalogTable table = toCatalogTable(icebergTable);
+
+ // Currently, Flink SQL only support altering table properties.
+
+ if (!table.getSchema().equals(newTable.getSchema())) {
+ throw new UnsupportedOperationException("Altering schema is not supported yet.");
+ }
+
+ if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+ }
+
+ Map<String, String> oldOptions = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldOptions.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldOptions.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+ TableSchema schema = table.getSchema();
+ schema.getTableColumns().forEach(column -> {
+ if (column.isGenerated()) {
+ throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+ }
+ });
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+ }
+
+ if (schema.getPrimaryKey().isPresent()) {
Review comment:
I found this which might answer your question:
https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
In particular, here are the _proposed_ changes:
```
Proposed Changes
We suggest to introduce the concept of primary key constraint as a hint for
FLINK to leverage for optimizations.
Primary key constraints tell that a column or a set of columns of a table or
a view are unique and they do not contain null.
Neither of columns in a primary can be nullable.
Primary key therefore uniquely identify a row in a table.
```
So it sounds just like an RDBMS primary key.
Note, however, that even the FLIP (which is just the proposal and not necessarily the finished product) states that there is no planned enforcement of the PK. It's up to the user to ensure that the PK is non-null and unique.
```
Primary key validity checks
SQL standard specifies that a constraint can either be ENFORCED or NOT
ENFORCED.
This controls if the constraint checks are performed on the
incoming/outgoing data.
Flink does not own the data therefore the only mode we want to support is
the NOT ENFORCED mode.
Its up to the user to ensure that the query enforces key integrity.
```
So I agree that throwing is probably the most useful option here, and that there's likely nothing to add on the Iceberg side to enforce this, since Flink doesn't enforce it either. In an entirely streaming setting, ensuring unique keys would be rather difficult, so to me the PK sounds like more metadata that could just as well live in TBLPROPERTIES.
But a more experienced Flink SQL user than myself might have more to say on
the matter. I've never attempted to enforce a PK when using Flink SQL. Sounds
like the work to do so would involve custom operators etc.
TL;DR: the primary key is just a constraint. It is currently part of Flink's Table spec but goes unenforced and is left up to the user. PK info does not appear to be supported in any UpsertSinks etc., though that may be discussed / planned in the future. DDL support for primary key constraints is relatively new (Flink 1.11 / current, with API support arriving in Flink 1.10).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]