This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new ced92230 Iceberg and Delta target: sync schema field comments
ced92230 is described below

commit ced9223033d3efe64cf475556f3def4b1b1b85d3
Author: Hanzhi Wang <[email protected]>
AuthorDate: Fri Nov 1 22:03:00 2024 -0700

    Iceberg and Delta target: sync schema field comments
    
    Fixes an issue where schema field comments were not synced correctly when 
the target table is Iceberg or Delta.
    For the Delta target, extract the comment from the internal schema and add 
it to the Spark schema representation.
    For the Iceberg target, make sure any difference in comments is detected 
during syncSchema() and the schema is updated with the new comments.
---
 .../apache/xtable/delta/DeltaSchemaExtractor.java  |  15 ++-
 .../apache/xtable/iceberg/IcebergSchemaSync.java   |   7 ++
 .../xtable/delta/TestDeltaSchemaExtractor.java     |  18 ++-
 .../xtable/iceberg/TestIcebergSchemaSync.java      | 126 ++++++++++++++++++++-
 4 files changed, 154 insertions(+), 12 deletions(-)

diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index e312761f..a10ee120 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -59,6 +59,7 @@ import org.apache.xtable.schema.SchemaUtils;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaSchemaExtractor {
   private static final String DELTA_COLUMN_MAPPING_ID = 
"delta.columnMapping.id";
+  private static final String COMMENT = "comment";
   private static final DeltaSchemaExtractor INSTANCE = new 
DeltaSchemaExtractor();
 
   public static DeltaSchemaExtractor getInstance() {
@@ -74,7 +75,7 @@ public class DeltaSchemaExtractor {
                         field.getName(),
                         convertFieldType(field),
                         field.getSchema().isNullable(),
-                        getMetaData(field.getSchema().getDataType())))
+                        getMetaData(field.getSchema())))
             .toArray(StructField[]::new);
     return new StructType(fields);
   }
@@ -144,12 +145,16 @@ public class DeltaSchemaExtractor {
     }
   }
 
-  private Metadata getMetaData(InternalType type) {
+  private Metadata getMetaData(InternalSchema schema) {
+    InternalType type = schema.getDataType();
+    MetadataBuilder metadataBuilder = new MetadataBuilder();
     if (type == InternalType.UUID) {
-      return new 
MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
-    } else {
-      return Metadata.empty();
+      metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
     }
+    if (schema.getComment() != null) {
+      metadataBuilder.putString(COMMENT, schema.getComment());
+    }
+    return metadataBuilder.build();
   }
 
   public InternalSchema toInternalSchema(StructType structType) {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
index 05e89469..800938cb 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
@@ -21,6 +21,7 @@ package org.apache.xtable.iceberg;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -132,6 +133,12 @@ public class IcebergSchemaSync {
               latestColumn.fieldId(), () -> 
updateSchema.requireColumn(latestColumn.name()));
         }
       }
+      // update the comment of the column
+      if (!Objects.equals(currentColumn.doc(), latestColumn.doc())) {
+        updates.put(
+            latestColumn.fieldId(),
+            () -> updateSchema.updateColumnDoc(latestColumn.name(), 
latestColumn.doc()));
+      }
       if (latestColumn.type().isStructType()) {
         updates.putAll(
             addUpdates(
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 4b0eacd0..361245fe 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -56,6 +56,7 @@ public class TestDeltaSchemaExtractor {
                                 .name("boolean")
                                 .dataType(InternalType.BOOLEAN)
                                 .isNullable(false)
+                                .comment("requiredBooleanComment")
                                 .build())
                         .build(),
                     InternalField.builder()
@@ -226,7 +227,7 @@ public class TestDeltaSchemaExtractor {
 
     StructType structRepresentation =
         new StructType()
-            .add("requiredBoolean", DataTypes.BooleanType, false)
+            .add("requiredBoolean", DataTypes.BooleanType, false, 
"requiredBooleanComment")
             .add("optionalBoolean", DataTypes.BooleanType, true)
             .add("requiredInt", DataTypes.IntegerType, false)
             .add("optionalInt", DataTypes.IntegerType, true)
@@ -268,6 +269,7 @@ public class TestDeltaSchemaExtractor {
                                 .name("fixed")
                                 .dataType(InternalType.FIXED)
                                 .isNullable(false)
+                                .comment("comment")
                                 .build())
                         .build(),
                     InternalField.builder()
@@ -296,6 +298,7 @@ public class TestDeltaSchemaExtractor {
                                 .name("binary")
                                 .dataType(InternalType.BYTES)
                                 .isNullable(false)
+                                .comment("comment")
                                 .build())
                         .build(),
                     InternalField.builder()
@@ -311,7 +314,7 @@ public class TestDeltaSchemaExtractor {
             .build();
     StructType structRepresentation =
         new StructType()
-            .add("requiredFixed", DataTypes.BinaryType, false)
+            .add("requiredFixed", DataTypes.BinaryType, false, "comment")
             .add("optionalFixed", DataTypes.BinaryType, true);
 
     Assertions.assertEquals(
@@ -681,6 +684,7 @@ public class TestDeltaSchemaExtractor {
                                 .name("struct")
                                 .dataType(InternalType.RECORD)
                                 .isNullable(true)
+                                .comment("comment")
                                 .fields(
                                     Arrays.asList(
                                         InternalField.builder()
@@ -691,6 +695,7 @@ public class TestDeltaSchemaExtractor {
                                                     .name("integer")
                                                     .dataType(InternalType.INT)
                                                     .isNullable(true)
+                                                    
.comment("nestedOptionalIntComment")
                                                     .build())
                                             .defaultValue(
                                                 
InternalField.Constants.NULL_DEFAULT_VALUE)
@@ -740,13 +745,18 @@ public class TestDeltaSchemaExtractor {
             .add(
                 "nestedOne",
                 new StructType()
-                    .add("nestedOptionalInt", DataTypes.IntegerType, true)
+                    .add(
+                        "nestedOptionalInt",
+                        DataTypes.IntegerType,
+                        true,
+                        "nestedOptionalIntComment")
                     .add("nestedRequiredDouble", DataTypes.DoubleType, false)
                     .add(
                         "nestedTwo",
                         new StructType().add("doublyNestedString", 
DataTypes.StringType, true),
                         false),
-                true);
+                true,
+                "comment");
     Assertions.assertEquals(
         structRepresentation,
         DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
index 347eb309..98254591 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
@@ -43,14 +43,15 @@ public class TestIcebergSchemaSync {
 
   private static final Schema SCHEMA =
       new Schema(
-          Types.NestedField.required(1, "timestamp_field", 
Types.TimestampType.withoutZone()),
+          Types.NestedField.required(
+              1, "timestamp_field", Types.TimestampType.withoutZone(), "doc"),
           Types.NestedField.optional(2, "date_field", Types.DateType.get()),
           Types.NestedField.required(3, "group_id", Types.IntegerType.get()),
           Types.NestedField.required(
               4,
               "record",
               Types.StructType.of(
-                  Types.NestedField.required(5, "string_field", 
Types.StringType.get()),
+                  Types.NestedField.required(5, "string_field", 
Types.StringType.get(), "doc"),
                   Types.NestedField.required(6, "int_field", 
Types.IntegerType.get()))),
           Types.NestedField.required(
               7,
@@ -228,6 +229,112 @@ public class TestIcebergSchemaSync {
     verify(mockUpdateSchema).commit();
   }
 
+  @Test
+  void testAddFieldComment() {
+    UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
+    when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
+    Types.NestedField updated =
+        Types.NestedField.optional(2, "date_field", Types.DateType.get(), 
"doc");
+    Schema latest = addCommentToDefault(updated, 2);
+
+    schemaSync.sync(SCHEMA, latest, mockTransaction);
+
+    verify(mockUpdateSchema).updateColumnDoc("date_field", "doc");
+    verify(mockUpdateSchema).commit();
+  }
+
+  @Test
+  void testDropFieldComment() {
+    UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
+    when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
+    Types.NestedField updated =
+        Types.NestedField.optional(1, "timestamp_field", Types.DateType.get());
+    Schema latest = addCommentToDefault(updated, 1);
+
+    schemaSync.sync(SCHEMA, latest, mockTransaction);
+
+    verify(mockUpdateSchema).updateColumnDoc("timestamp_field", null);
+    verify(mockUpdateSchema).commit();
+  }
+
+  @Test
+  void tesUpdatedFieldComment() {
+    UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
+    when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
+    Types.NestedField updated =
+        Types.NestedField.optional(1, "timestamp_field", Types.DateType.get(), 
"new comment");
+    Schema latest = addCommentToDefault(updated, 1);
+
+    schemaSync.sync(SCHEMA, latest, mockTransaction);
+
+    verify(mockUpdateSchema).updateColumnDoc("timestamp_field", "new comment");
+    verify(mockUpdateSchema).commit();
+  }
+
+  @Test
+  void testAddNestedFieldComment() {
+    UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
+    when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
+    Types.NestedField updated =
+        Types.NestedField.required(
+            4,
+            "record",
+            Types.StructType.of(
+                Types.NestedField.required(5, "string_field", 
Types.StringType.get(), "doc"),
+                Types.NestedField.required(6, "int_field", 
Types.IntegerType.get(), "doc")));
+    Schema latest = addCommentToDefault(updated, 4);
+
+    schemaSync.sync(SCHEMA, latest, mockTransaction);
+
+    verify(mockUpdateSchema).updateColumnDoc("int_field", "doc");
+    verify(mockUpdateSchema).commit();
+  }
+
+  @Test
+  void testAddListFieldComment() {
+    UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
+    when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
+    Types.NestedField updated =
+        Types.NestedField.required(
+            10,
+            "array_field",
+            Types.ListType.ofRequired(
+                11,
+                Types.StructType.of(
+                    Types.NestedField.required(15, "element_string", 
Types.StringType.get(), "doc"),
+                    Types.NestedField.optional(16, "element_int", 
Types.IntegerType.get()))));
+    Schema latest = addCommentToDefault(updated, 10);
+
+    schemaSync.sync(SCHEMA, latest, mockTransaction);
+
+    verify(mockUpdateSchema).updateColumnDoc("element_string", "doc");
+    verify(mockUpdateSchema).commit();
+  }
+
+  @Test
+  void testAddMapFieldComment() {
+    UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
+    when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
+    Types.NestedField updated =
+        Types.NestedField.required(
+            7,
+            "map_field",
+            Types.MapType.ofRequired(
+                8,
+                9,
+                Types.StructType.of(
+                    Types.NestedField.required(12, "key_string", 
Types.StringType.get())),
+                Types.StructType.of(
+                    Types.NestedField.required(13, "value_string", 
Types.StringType.get(), "doc"),
+                    Types.NestedField.optional(14, "value_int", 
Types.IntegerType.get()))));
+    Schema latest = addCommentToDefault(updated, 7);
+
+    schemaSync.sync(SCHEMA, latest, mockTransaction);
+
+    verify(mockUpdateSchema).updateColumnDoc("value_string", "doc");
+    verify(mockUpdateSchema).commit();
+  }
+
   private Schema addColumnToDefault(Schema schema, Types.NestedField field, 
Integer parentId) {
     List<Types.NestedField> fields = new ArrayList<>();
     for (Types.NestedField existingField : schema.columns()) {
@@ -251,6 +358,18 @@ public class TestIcebergSchemaSync {
     return new Schema(fields);
   }
 
+  private Schema addCommentToDefault(Types.NestedField updated, int fieldId) {
+    List<Types.NestedField> fields = new ArrayList<>();
+    for (Types.NestedField existingField : SCHEMA.columns()) {
+      if (existingField.fieldId() == fieldId) {
+        fields.add(updated);
+      } else {
+        fields.add(existingField);
+      }
+    }
+    return new Schema(fields);
+  }
+
   private Schema updateFieldRequired(int fieldId) {
     List<Types.NestedField> fields = new ArrayList<>();
     for (Types.NestedField existingField : SCHEMA.columns()) {
@@ -260,7 +379,8 @@ public class TestIcebergSchemaSync {
                 existingField.fieldId(),
                 !existingField.isOptional(),
                 existingField.name(),
-                existingField.type()));
+                existingField.type(),
+                existingField.doc()));
       } else {
         fields.add(existingField);
       }

Reply via email to