This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9a0fc49481 AWS: Retain Glue Catalog column comment (#10276)
9a0fc49481 is described below
commit 9a0fc49481c232bf0eb149fa189d875cded6a231
Author: Sotaro Hikita <[email protected]>
AuthorDate: Fri Jul 5 15:46:24 2024 +0900
AWS: Retain Glue Catalog column comment (#10276)
* Retain Glue Catalog column comment
* Merge the handling of existing column comments into column-set creation
* apply spotless
* Add newline after else block for improved readability
---
.../org/apache/iceberg/aws/glue/GlueTestBase.java | 36 ++++++
.../iceberg/aws/glue/TestGlueCatalogTable.java | 129 +++++++++++++++++++++
.../iceberg/aws/glue/GlueTableOperations.java | 6 +-
.../iceberg/aws/glue/IcebergToGlueConverter.java | 76 ++++++++++--
.../aws/glue/TestIcebergToGlueConverter.java | 78 +++++++++++++
5 files changed, 310 insertions(+), 15 deletions(-)
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
index ed3a235eb0..aa0c7f1831 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.aws.glue;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
@@ -39,6 +41,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.Table;
@@ -158,4 +161,37 @@ public class GlueTestBase {
.build();
glue.updateTable(request);
}
+ /** Test helper: rewrites every Glue column of the table with columnUpdater via the Glue API. */
+ public static void updateTableColumns(
+ String namespace, String tableName, Function<Column, Column>
columnUpdater) {
+ GetTableResponse response =
+
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
+ Table existingTable = response.table();
+ List<Column> updatedColumns =
+ existingTable.storageDescriptor().columns().stream()
+ .map(columnUpdater)
+ .collect(Collectors.toList());
+ // Rebuild the TableInput from the fetched table, swapping in only the updated columns.
+ UpdateTableRequest request =
+ UpdateTableRequest.builder()
+ .catalogId(existingTable.catalogId())
+ .databaseName(existingTable.databaseName())
+ .tableInput(
+ TableInput.builder()
+ .description(existingTable.description())
+ .name(existingTable.name())
+ .partitionKeys(existingTable.partitionKeys())
+ .tableType(existingTable.tableType())
+ .owner(existingTable.owner())
+ .parameters(existingTable.parameters())
+ .storageDescriptor(
+ existingTable
+ .storageDescriptor()
+ .toBuilder()
+ .columns(updatedColumns)
+ .build())
+ .build())
+ .build();
+ glue.updateTable(request);
+ }
}
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
index 6dffdb5b92..9c4d1839a4 100644
---
a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
+++
b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
@@ -221,6 +221,69 @@ public class TestGlueCatalogTable extends GlueTestBase {
assertThat(response.table().description()).isEqualTo(updatedComment);
}
+ @Test
+ public void testDropColumn() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ Table table = glueCatalog.loadTable(TableIdentifier.of(namespace,
tableName));
+ table
+ .updateSchema()
+ .addColumn("c2", Types.StringType.get(), "updated from Iceberg API")
+ .addColumn("c3", Types.StringType.get())
+ .commit();
+ // Set a comment on c3 directly through the Glue API, outside of Iceberg.
+ updateTableColumns(
+ namespace,
+ tableName,
+ column -> {
+ if (column.name().equals("c3")) {
+ return column.toBuilder().comment("updated from Glue API").build();
+ } else {
+ return column;
+ }
+ });
+ // Drop both columns through Iceberg.
+ table.updateSchema().deleteColumn("c2").deleteColumn("c3").commit();
+
+ GetTableResponse response =
+
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
+ List<Column> actualColumns =
response.table().storageDescriptor().columns();
+ // Dropped columns remain as non-current Glue columns and keep their comments.
+ List<Column> expectedColumns =
+ ImmutableList.of(
+ Column.builder()
+ .name("c1")
+ .type("string")
+ .comment("c1")
+ .parameters(
+ ImmutableMap.of(
+ IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
+ IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
+ IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build(),
+ Column.builder()
+ .name("c2")
+ .type("string")
+ .comment("updated from Iceberg API")
+ .parameters(
+ ImmutableMap.of(
+ IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
+ IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
+ IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
+ .build(),
+ Column.builder()
+ .name("c3")
+ .type("string")
+ .comment("updated from Glue API")
+ .parameters(
+ ImmutableMap.of(
+ IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
+ IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
+ IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
+ .build());
+ assertThat(actualColumns).isEqualTo(expectedColumns);
+ }
+
@Test
public void testRenameTable() {
String namespace = createNamespace();
@@ -514,6 +577,72 @@ public class TestGlueCatalogTable extends GlueTestBase {
assertThat(actualColumns).isEqualTo(expectedColumns);
}
+ @Test
+ public void testGlueTableColumnCommentsPreserved() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ Table table = glueCatalog.loadTable(TableIdentifier.of(namespace,
tableName));
+ table
+ .updateSchema()
+ .addColumn("c2", Types.StringType.get())
+ .addColumn("c3", Types.StringType.get())
+ .commit();
+ // Comment c2 and c3 directly through the Glue API; neither has an Iceberg doc yet.
+ updateTableColumns(
+ namespace,
+ tableName,
+ column -> {
+ if (column.name().equals("c2") || column.name().equals("c3")) {
+ return column.toBuilder().comment("updated from Glue API").build();
+ } else {
+ return column;
+ }
+ });
+ // Give only c2 a doc through Iceberg; c3 still has no Iceberg doc.
+ table
+ .updateSchema()
+ .updateColumn("c2", Types.StringType.get(), "updated from Iceberg API")
+ .commit();
+
+ GetTableResponse response =
+
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
+ List<Column> actualColumns =
response.table().storageDescriptor().columns();
+ // The Iceberg doc wins for c2; the Glue-only comment on c3 is preserved.
+ List<Column> expectedColumns =
+ ImmutableList.of(
+ Column.builder()
+ .name("c1")
+ .type("string")
+ .comment("c1")
+ .parameters(
+ ImmutableMap.of(
+ IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
+ IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
+ IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build(),
+ Column.builder()
+ .name("c2")
+ .type("string")
+ .comment("updated from Iceberg API")
+ .parameters(
+ ImmutableMap.of(
+ IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
+ IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
+ IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build(),
+ Column.builder()
+ .name("c3")
+ .type("string")
+ .comment("updated from Glue API")
+ .parameters(
+ ImmutableMap.of(
+ IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
+ IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
+ IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build());
+ assertThat(actualColumns).isEqualTo(expectedColumns);
+ }
+
@Test
public void testTablePropsDefinedAtCatalogLevel() {
String namespace = createNamespace();
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index aedf785234..4c63dfdb2a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -316,12 +316,10 @@ class GlueTableOperations extends
BaseMetastoreTableOperations {
.skipArchive(awsProperties.glueCatalogSkipArchive())
.tableInput(
TableInput.builder()
- // Call description before applyMutation so that
applyMutation overwrites the
- // description with the comment specified in the query
- .description(glueTable.description())
.applyMutation(
builder ->
-
IcebergToGlueConverter.setTableInputInformation(builder, metadata))
+ IcebergToGlueConverter.setTableInputInformation(
+ builder, metadata, glueTable)) // glueTable's description and column comments serve as fallbacks
.name(tableName)
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
index 2c7ed1fe64..56b38a47e9 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.aws.glue;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
class IcebergToGlueConverter {
@@ -219,6 +221,29 @@ class IcebergToGlueConverter {
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata) {
+ setTableInputInformation(tableInputBuilder, metadata, null);
+ }
+
+ /**
+ * Set Glue table input information based on Iceberg table metadata,
optionally preserving
+ * the description and column comments of an existing Glue table.
+ *
+ * <p>A best-effort conversion of Iceberg metadata to Glue table is
performed to display Iceberg
+ * information in Glue, but such information is only intended for
informational human read access
+ * through tools like UI or CLI, and should never be used by any query
processing engine to infer
+ * information like schema, partition spec, etc. The source of truth is
stored in the actual
+ * Iceberg metadata file defined by the metadata_location table property.
+ *
+ * <p>If an existing Glue table is provided, the comments from its columns
are reused for columns whose Iceberg doc is null or empty, and its description is used
+ * when the Glue description property is absent from the Iceberg table properties. This is
useful when updating an existing Glue table to retain
+ * any user-defined comments on the columns.
+ *
+ * @param tableInputBuilder Glue TableInput builder
+ * @param metadata Iceberg table metadata
+ * @param existingTable optional existing Glue table, used to preserve
column comments and the table description; may be null
+ */
+ static void setTableInputInformation(
+ TableInput.Builder tableInputBuilder, TableMetadata metadata, Table
existingTable) {
try {
Map<String, String> properties = metadata.properties();
StorageDescriptor.Builder storageDescriptor =
StorageDescriptor.builder();
@@ -231,11 +256,28 @@ class IcebergToGlueConverter {
.collect(Collectors.toSet()));
}
- Optional.ofNullable(properties.get(GLUE_DESCRIPTION_KEY))
- .ifPresent(tableInputBuilder::description);
+ String description = properties.get(GLUE_DESCRIPTION_KEY);
+ if (description != null) {
+ tableInputBuilder.description(description);
+ } else if (existingTable != null) {
+
Optional.ofNullable(existingTable.description()).ifPresent(tableInputBuilder::description);
+ }
+
+ // Reuse existing Glue column comments; tolerate a missing storage descriptor and duplicate names.
+ Map<String, String> existingColumnMap = Collections.emptyMap();
+ StorageDescriptor existingDescriptor =
+ existingTable == null ? null : existingTable.storageDescriptor();
+ if (existingDescriptor != null) {
+ existingColumnMap =
+ existingDescriptor.columns().stream()
+ .filter(column -> column.comment() != null)
+ .collect(Collectors.toMap(Column::name, Column::comment, (first, second) -> first));
+ }
+
+ List<Column> columns = toColumns(metadata, existingColumnMap);
tableInputBuilder.storageDescriptor(
-
storageDescriptor.location(metadata.location()).columns(toColumns(metadata)).build());
+
storageDescriptor.location(metadata.location()).columns(columns).build());
} catch (RuntimeException e) {
LOG.warn(
"Encountered unexpected exception while converting Iceberg metadata
to Glue table information",
@@ -297,18 +339,20 @@ class IcebergToGlueConverter {
}
}
- private static List<Column> toColumns(TableMetadata metadata) {
+ private static List<Column> toColumns( // current-schema fields first, then fields only in older schemas
+ TableMetadata metadata, Map<String, String> existingColumnMap) { // Glue column name -> existing comment
List<Column> columns = Lists.newArrayList();
Set<String> addedNames = Sets.newHashSet();
for (NestedField field : metadata.schema().columns()) {
- addColumnWithDedupe(columns, addedNames, field, true /* is current */);
+ addColumnWithDedupe(columns, addedNames, field, true /* is current */,
existingColumnMap);
}
for (Schema schema : metadata.schemas()) {
if (schema.schemaId() != metadata.currentSchemaId()) {
for (NestedField field : schema.columns()) {
- addColumnWithDedupe(columns, addedNames, field, false /* is not
current */);
+ addColumnWithDedupe(
+ columns, addedNames, field, false /* is not current */,
existingColumnMap);
}
}
}
@@ -317,19 +361,29 @@ class IcebergToGlueConverter {
}
private static void addColumnWithDedupe(
- List<Column> columns, Set<String> dedupe, NestedField field, boolean
isCurrent) {
+ List<Column> columns,
+ Set<String> dedupe,
+ NestedField field,
+ boolean isCurrent,
+ Map<String, String> existingColumnMap) { // Glue column name -> existing comment; may be null
if (!dedupe.contains(field.name())) {
- columns.add(
+ Column.Builder builder =
Column.builder()
.name(field.name())
.type(toTypeString(field.type()))
- .comment(field.doc())
.parameters(
ImmutableMap.of(
ICEBERG_FIELD_ID, Integer.toString(field.fieldId()),
ICEBERG_FIELD_OPTIONAL,
Boolean.toString(field.isOptional()),
- ICEBERG_FIELD_CURRENT, Boolean.toString(isCurrent)))
- .build());
+ ICEBERG_FIELD_CURRENT, Boolean.toString(isCurrent)));
+ // Prefer a non-empty Iceberg doc; otherwise fall back to the existing Glue comment, if any.
+ if (field.doc() != null && !field.doc().isEmpty()) {
+ builder.comment(field.doc());
+ } else if (existingColumnMap != null &&
existingColumnMap.containsKey(field.name())) {
+ builder.comment(existingColumnMap.get(field.name()));
+ }
+
+ columns.add(builder.build());
dedupe.add(field.name());
}
}
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java
b/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java
index 7c646f7bf7..1136ad63b4 100644
---
a/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java
+++
b/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
public class TestIcebergToGlueConverter {
@@ -306,4 +307,81 @@ public class TestIcebergToGlueConverter {
.as("description should match")
.isEqualTo(tableDescription);
}
+
+ @Test
+ public void testSetTableInputInformationWithExistingTable() {
+ // Actual TableInput
+ TableInput.Builder actualTableInputBuilder = TableInput.builder();
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "x", Types.StringType.get()),
+ Types.NestedField.required(2, "y", Types.StringType.get(), "new
comment"),
+ Types.NestedField.required(3, "z", Types.StringType.get(), "new
comment"));
+ PartitionSpec partitionSpec =
+
PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build();
+ TableMetadata tableMetadata =
+ TableMetadata.newTableMetadata(schema, partitionSpec, "s3://test",
tableLocationProperties);
+
+ // Existing Table
+ Table existingGlueTable =
+ Table.builder()
+ .storageDescriptor(
+ StorageDescriptor.builder()
+ .columns(
+ ImmutableList.of(
+ Column.builder().name("x").comment("existing
comment").build(),
+ Column.builder().name("y").comment("existing
comment").build()))
+ .build())
+ .build();
+ // x has no Iceberg doc and keeps its Glue comment; y's Iceberg doc wins; z has no Glue column.
+ IcebergToGlueConverter.setTableInputInformation(
+ actualTableInputBuilder, tableMetadata, existingGlueTable);
+ TableInput actualTableInput = actualTableInputBuilder.build();
+
+ // Expected TableInput
+ TableInput expectedTableInput =
+ TableInput.builder()
+ .storageDescriptor(
+ StorageDescriptor.builder()
+ .location("s3://test")
+
.additionalLocations(Sets.newHashSet(tableLocationProperties.values()))
+ .columns(
+ ImmutableList.of(
+ Column.builder()
+ .name("x")
+ .type("string")
+ .comment("existing comment")
+ .parameters(
+ ImmutableMap.of(
+
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
+
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
+
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build(),
+ Column.builder()
+ .name("y")
+ .type("string")
+ .comment("new comment")
+ .parameters(
+ ImmutableMap.of(
+
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
+
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
+
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build(),
+ Column.builder()
+ .name("z")
+ .type("string")
+ .comment("new comment")
+ .parameters(
+ ImmutableMap.of(
+
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
+
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
+
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
+ .build()))
+ .build())
+ .build();
+ // Only the columns are compared; the expected location fields are not asserted.
+ assertThat(actualTableInput.storageDescriptor().columns())
+ .as("Columns should match")
+ .isEqualTo(expectedTableInput.storageDescriptor().columns());
+ }
}