openinx commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r479985290
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,20 +287,30 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
   }

   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+    Table table = loadIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }

-      // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
   }

+  private CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
Review comment:
nit: how about making the Iceberg schema -> Flink `TableSchema` conversion a static method inside `FlinkSchemaUtil`? The table sink PR https://github.com/apache/iceberg/pull/1348 will also depend on this static method (https://github.com/apache/iceberg/pull/1348/files#diff-0ad7dfff9cfa32fbb760796d976fd650R50).
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +340,169 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
     }
   }

-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }

   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = loadIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
+    // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
+    }
+  }
+
+
Review comment:
Do we need to add a TODO indicating that Flink only supports `identity` partitioning for now, but will support hidden partitioning in the future?
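A plain-Java analog of the `toPartitionKeys` fallback quoted below may make the identity-only limitation concrete (transforms are modeled here as plain strings; the real code inspects `PartitionField.transform().isIdentity()`, so names and types in this sketch are illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeysSketch {
  // Returns source column names only when every transform is identity;
  // a single non-identity (hidden) transform empties the whole list,
  // mirroring the fallback in the diff.
  static List<String> toPartitionKeys(Map<String, String> fieldToTransform) {
    List<String> keys = new ArrayList<>();
    for (Map.Entry<String, String> field : fieldToTransform.entrySet()) {
      if ("identity".equals(field.getValue())) {
        keys.add(field.getKey());
      } else {
        // Not created by Flink SQL; return empty for compatibility.
        return Collections.emptyList();
      }
    }
    return keys;
  }

  public static void main(String[] args) {
    Map<String, String> identityOnly = new LinkedHashMap<>();
    identityOnly.put("dt", "identity");
    System.out.println(toPartitionKeys(identityOnly)); // [dt]

    Map<String, String> hidden = new LinkedHashMap<>();
    hidden.put("ts", "hour");
    System.out.println(toPartitionKeys(hidden)); // []
  }
}
```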
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +340,169 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
     }
   }

-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }

   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = loadIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
+    // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
+    }
+  }
+
+  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+    partitionKeys.forEach(builder::identity);
+    return builder.build();
+  }
+
+  private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+    List<String> partitionKeys = Lists.newArrayList();
+    for (PartitionField field : spec.fields()) {
+      if (field.transform().isIdentity()) {
+        partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+      } else {
+        // Not created by Flink SQL.
+        // For compatibility with iceberg tables, return empty.
+        // TODO modify this after Flink support partition transform.
+        return Collections.emptyList();
+      }
+    }
+    return partitionKeys;
+  }
+
+  private static void commitChanges(Table table, String setLocation, String setSnapshotId,
+                                    String pickSnapshotId, Map<String, String> setProperties) {
+    // don't allow setting the snapshot and picking a commit at the same time because order is ambiguous and choosing
+    // one order leads to different results
+    Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
+        "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+    if (setSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(setSnapshotId);
+      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+    }
+
+    // if updating the table snapshot, perform that update first in case it fails
+    if (pickSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(pickSnapshotId);
+      table.manageSnapshots().cherrypick(newSnapshotId).commit();
+    }
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation()
+          .setLocation(setLocation)
+          .commit();
+    }
+
+    if (!setProperties.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      setProperties.forEach((k, v) -> {
+        if (v == null) {
Review comment:
Can `v` ever actually be null in the `HashMap` here?
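For reference, `java.util.HashMap` does permit null values, so the null entries added by `setProperties.put(k, null)` in `alterTable` survive and this `v == null` branch is reachable. A minimal standalone check (plain `java.util`, nothing Iceberg- or Flink-specific):

```java
import java.util.HashMap;
import java.util.Map;

public class HashMapNullValues {
  public static void main(String[] args) {
    Map<String, String> setProperties = new HashMap<>();
    setProperties.put("removed-key", null); // HashMap accepts null values

    // The entry exists, and its value really is null:
    System.out.println(setProperties.containsKey("removed-key")); // true
    System.out.println(setProperties.get("removed-key") == null); // true
  }
}
```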
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,20 +286,29 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
   }

   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+    Table table = getIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }

-      // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table getIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
   }

+  private CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+    // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
Review comment:
Got it. Maybe this comment could be written more clearly, though.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -320,19 +338,158 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
     }
   }

-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+    Map<String, String> options = Maps.newHashMap(table.getOptions());
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          options.get("location"),
+          options);
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }

   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = getIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
Review comment:
`setProperties.put(k, null)`? The javadoc for `Map.put` says:
```java
* @throws NullPointerException if the specified key or value is null
* and this map does not permit null keys or values
* @throws IllegalArgumentException if some property of the specified key
* or value prevents it from being stored in this map
*/
V put(K key, V value);
```
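That javadoc clause only applies to `Map` implementations that reject nulls; `Maps.newHashMap()` returns a `java.util.HashMap`, which does permit null values, so the `put(k, null)` above is legal. A standalone sketch of the difference, using `Map.of` (which rejects nulls) to stand in for a null-hostile implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class MapPutNullContract {
  public static void main(String[] args) {
    // HashMap permits null values, so put(k, null) is fine here.
    Map<String, String> hashMap = new HashMap<>();
    hashMap.put("location", null);
    System.out.println(hashMap.containsKey("location")); // true

    // Null-rejecting implementations throw NullPointerException,
    // which is the case the quoted javadoc is warning about.
    try {
      Map.of("location", (String) null);
      System.out.println("no exception");
    } catch (NullPointerException e) {
      System.out.println("NullPointerException");
    }
  }
}
```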
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]