This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit a679273d187f633e944c9e1074b5e9bc02e6ef30
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 2 09:23:56 2024 +0800

    [core][spark] Fix comment consistency in the schema (#2600)
---
 .../main/java/org/apache/paimon/schema/Schema.java |  2 +-
 .../org/apache/paimon/schema/SchemaChange.java     | 38 ++++++++++
 .../org/apache/paimon/schema/SchemaManager.java    |  7 +-
 .../org/apache/paimon/schema/SchemaSerializer.java |  5 +-
 .../java/org/apache/paimon/schema/TableSchema.java | 10 +--
 .../org/apache/paimon/catalog/CatalogTestBase.java | 31 ++++++++
 .../flink/SchemaChangeSerializationTest.java       |  1 +
 .../java/org/apache/paimon/spark/SparkCatalog.java | 19 +++--
 .../java/org/apache/paimon/spark/SparkTable.java   |  3 +
 .../paimon/spark/sql/DescribeTableTest.scala       | 83 ++++++++++++++++++++++
 10 files changed, 185 insertions(+), 14 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 15a361aec..824dff5c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -319,7 +319,7 @@ public class Schema {
         }
 
         /** Declares table comment. */
-        public Builder comment(String comment) {
+        public Builder comment(@Nullable String comment) {
             this.comment = comment;
             return this;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 28c49af7c..711b09bbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -43,6 +43,10 @@ public interface SchemaChange extends Serializable {
         return new RemoveOption(key);
     }
 
+    static SchemaChange updateComment(@Nullable String comment) {
+        return new UpdateComment(comment);
+    }
+
     static SchemaChange addColumn(String fieldName, DataType dataType) {
         return addColumn(fieldName, dataType, null, null);
     }
@@ -159,6 +163,40 @@ public interface SchemaChange extends Serializable {
         }
     }
 
+    /** A SchemaChange to Update table comment. */
+    final class UpdateComment implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
+        // If comment is null, means to remove comment
+        private final @Nullable String comment;
+
+        private UpdateComment(@Nullable String comment) {
+            this.comment = comment;
+        }
+
+        public @Nullable String comment() {
+            return comment;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            UpdateComment that = (UpdateComment) object;
+            return Objects.equals(comment, that.comment);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(comment);
+        }
+    }
+
     /** A SchemaChange to add a field. */
     final class AddColumn implements SchemaChange {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 01b395a84..dba7f8e14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -35,6 +35,7 @@ import org.apache.paimon.schema.SchemaChange.UpdateColumnComment;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
+import org.apache.paimon.schema.SchemaChange.UpdateComment;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
@@ -165,6 +166,7 @@ public class SchemaManager implements Serializable {
             Map<String, String> newOptions = new HashMap<>(schema.options());
             List<DataField> newFields = new ArrayList<>(schema.fields());
             AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
+            String newComment = schema.comment();
             for (SchemaChange change : changes) {
                 if (change instanceof SetOption) {
                     SetOption setOption = (SetOption) change;
@@ -174,6 +176,9 @@ public class SchemaManager implements Serializable {
                     RemoveOption removeOption = (RemoveOption) change;
                     checkAlterTableOption(removeOption.key());
                     newOptions.remove(removeOption.key());
+                } else if (change instanceof UpdateComment) {
+                    UpdateComment updateComment = (UpdateComment) change;
+                    newComment = updateComment.comment();
                 } else if (change instanceof AddColumn) {
                     AddColumn addColumn = (AddColumn) change;
                     SchemaChange.Move move = addColumn.move();
@@ -341,7 +346,7 @@ public class SchemaManager implements Serializable {
                             schema.partitionKeys(),
                             schema.primaryKeys(),
                             newOptions,
-                            schema.comment());
+                            newComment);
 
             try {
                 boolean success = commit(newSchema);
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
index af78e3b97..ca7fc540b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
@@ -22,7 +22,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypeJsonParser;
 import org.apache.paimon.utils.JsonDeserializer;
 import org.apache.paimon.utils.JsonSerializer;
-import org.apache.paimon.utils.StringUtils;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -72,7 +71,7 @@ public class SchemaSerializer
         }
         generator.writeEndObject();
 
-        if (!StringUtils.isNullOrWhitespaceOnly(tableSchema.comment())) {
+        if (tableSchema.comment() != null) {
             generator.writeStringField("comment", tableSchema.comment());
         }
 
@@ -114,7 +113,7 @@ public class SchemaSerializer
         }
 
         JsonNode commentNode = node.get("comment");
-        String comment = "";
+        String comment = null;
         if (commentNode != null) {
             comment = commentNode.asText();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index b8caa841a..cd8326d54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -24,6 +24,8 @@ import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -56,7 +58,7 @@ public class TableSchema implements Serializable {
 
     private final Map<String, String> options;
 
-    private final String comment;
+    private final @Nullable String comment;
 
     private final long timeMillis;
 
@@ -67,7 +69,7 @@ public class TableSchema implements Serializable {
             List<String> partitionKeys,
             List<String> primaryKeys,
             Map<String, String> options,
-            String comment) {
+            @Nullable String comment) {
         this(
                 id,
                 fields,
@@ -86,7 +88,7 @@ public class TableSchema implements Serializable {
             List<String> partitionKeys,
             List<String> primaryKeys,
             Map<String, String> options,
-            String comment,
+            @Nullable String comment,
             long timeMillis) {
         this.id = id;
         this.fields = fields;
@@ -205,7 +207,7 @@ public class TableSchema implements Serializable {
         return new HashSet<>(all).containsAll(new HashSet<>(contains));
     }
 
-    public String comment() {
+    public @Nullable String comment() {
         return comment;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index b8395358b..868ce4ecc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -792,4 +792,35 @@ public abstract class CatalogTestBase {
                                 UnsupportedOperationException.class,
                                 "Cannot change nullability of primary key"));
     }
+
+    @Test
+    public void testAlterTableUpdateComment() throws Exception {
+        catalog.createDatabase("test_db", false);
+
+        Identifier identifier = Identifier.create("test_db", "test_table");
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "col1", DataTypes.STRING(), "field1"),
+                                new DataField(1, "col2", DataTypes.STRING(), "field2")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Maps.newHashMap(),
+                        "comment"),
+                false);
+
+        catalog.alterTable(
+                identifier, Lists.newArrayList(SchemaChange.updateComment("new comment")), false);
+
+        Table table = catalog.getTable(identifier);
+        assertThat(table.comment().isPresent() && table.comment().get().equals("new comment"))
+                .isTrue();
+
+        // drop comment
+        catalog.alterTable(identifier, Lists.newArrayList(SchemaChange.updateComment(null)), false);
+
+        table = catalog.getTable(identifier);
+        assertThat(table.comment().isPresent()).isFalse();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
index 4606bd0d0..cb9dc5084 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
@@ -46,6 +46,7 @@ public class SchemaChangeSerializationTest {
         runTest(SchemaChange.updateColumnNullability(new String[] {"col1", "col2"}, true));
         runTest(SchemaChange.updateColumnComment(new String[] {"col1", "col2"}, "comment"));
         runTest(SchemaChange.updateColumnPosition(SchemaChange.Move.after("col", "ref")));
+        runTest(SchemaChange.updateComment("comment"));
     }
 
     private void runTest(SchemaChange schemaChange) throws Exception {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 4436b7c8d..f85d8065e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -291,7 +291,7 @@ public class SparkCatalog extends SparkBaseCatalog {
             throws TableAlreadyExistsException, NoSuchNamespaceException {
         try {
             catalog.createTable(
-                    toIdentifier(ident), toUpdateSchema(schema, partitions, properties), false);
+                    toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
             return loadTable(ident);
         } catch (Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistsException(ident);
@@ -316,11 +316,19 @@ public class SparkCatalog extends SparkBaseCatalog {
         if (change instanceof TableChange.SetProperty) {
             TableChange.SetProperty set = (TableChange.SetProperty) change;
             validateAlterProperty(set.property());
-            return SchemaChange.setOption(set.property(), set.value());
+            if (set.property().equals(TableCatalog.PROP_COMMENT)) {
+                return SchemaChange.updateComment(set.value());
+            } else {
+                return SchemaChange.setOption(set.property(), set.value());
+            }
         } else if (change instanceof TableChange.RemoveProperty) {
             TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change;
             validateAlterProperty(remove.property());
-            return SchemaChange.removeOption(remove.property());
+            if (remove.property().equals(TableCatalog.PROP_COMMENT)) {
+                return SchemaChange.updateComment(null);
+            } else {
+                return SchemaChange.removeOption(remove.property());
+            }
         } else if (change instanceof TableChange.AddColumn) {
             TableChange.AddColumn add = (TableChange.AddColumn) change;
             validateAlterNestedField(add.fieldNames());
@@ -373,7 +381,7 @@ public class SparkCatalog extends SparkBaseCatalog {
         return move;
     }
 
-    private Schema toUpdateSchema(
+    private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Preconditions.checkArgument(
                 Arrays.stream(partitions)
@@ -385,6 +393,7 @@ public class SparkCatalog extends SparkBaseCatalog {
                                 }));
         Map<String, String> normalizedProperties = new HashMap<>(properties);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+        normalizedProperties.remove(TableCatalog.PROP_COMMENT);
         String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
         List<String> primaryKeys =
                 pkAsString == null
@@ -400,7 +409,7 @@ public class SparkCatalog extends SparkBaseCatalog {
                                 Arrays.stream(partitions)
                                         .map(partition -> partition.references()[0].describe())
                                         .collect(Collectors.toList()))
-                        .comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, ""));
+                        .comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null));
 
         for (StructField field : schema.fields()) {
             schemaBuilder.column(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index dd0d5bce1..26d61e824 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -114,6 +114,9 @@ public class SparkTable
                         CoreOptions.PRIMARY_KEY.key(), String.join(",", table.primaryKeys()));
             }
             properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
+            if (table.comment().isPresent()) {
+                properties.put(TableCatalog.PROP_COMMENT, table.comment().get());
+            }
             return properties;
         } else {
             return Collections.emptyMap();
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
new file mode 100644
index 000000000..4a9a7f109
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+import java.util.Objects
+
+class DescribeTableTest extends PaimonSparkTestBase {
+
+  test(s"Paimon describe: describe table comment") {
+    var comment = "test comment"
+    spark.sql(s"""
+                 |CREATE TABLE T (
+                 |  id INT COMMENT 'id comment',
+                 |  name STRING,
+                 |  dt STRING)
+                 |COMMENT '$comment'
+                 |""".stripMargin)
+    checkTableCommentEqual("T", comment)
+
+    comment = "new comment"
+    spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')")
+    checkTableCommentEqual("T", comment)
+
+    comment = "  "
+    spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')")
+    checkTableCommentEqual("T", comment)
+
+    comment = ""
+    spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')")
+    checkTableCommentEqual("T", comment)
+
+    spark.sql(s"ALTER TABLE T UNSET TBLPROPERTIES ('comment')")
+    checkTableCommentEqual("T", null)
+
+    comment = "new comment"
+    spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')")
+    checkTableCommentEqual("T", comment)
+  }
+
+  test(s"Paimon describe: describe table with no comment") {
+    spark.sql(s"""
+                 |CREATE TABLE T (
+                 |  id INT COMMENT 'id comment',
+                 |  name STRING,
+                 |  dt STRING)
+                 |""".stripMargin)
+    checkTableCommentEqual("T", null)
+  }
+
+  def checkTableCommentEqual(tableName: String, comment: String): Unit = {
+    // check describe table
+    checkAnswer(
+      spark
+        .sql(s"DESCRIBE TABLE EXTENDED $tableName")
+        .filter("col_name = 'Comment'")
+        .select("col_name", "data_type"),
+      if (comment == null) Nil else Row("Comment", comment) :: Nil
+    )
+
+    // check comment in schema
+    Assertions.assertTrue(Objects.equals(comment, loadTable(tableName).schema().comment()))
+  }
+}

Reply via email to