This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit a679273d187f633e944c9e1074b5e9bc02e6ef30 Author: Zouxxyy <[email protected]> AuthorDate: Tue Jan 2 09:23:56 2024 +0800 [core][spark] Fix comment consistency in the schema (#2600) --- .../main/java/org/apache/paimon/schema/Schema.java | 2 +- .../org/apache/paimon/schema/SchemaChange.java | 38 ++++++++++ .../org/apache/paimon/schema/SchemaManager.java | 7 +- .../org/apache/paimon/schema/SchemaSerializer.java | 5 +- .../java/org/apache/paimon/schema/TableSchema.java | 10 +-- .../org/apache/paimon/catalog/CatalogTestBase.java | 31 ++++++++ .../flink/SchemaChangeSerializationTest.java | 1 + .../java/org/apache/paimon/spark/SparkCatalog.java | 19 +++-- .../java/org/apache/paimon/spark/SparkTable.java | 3 + .../paimon/spark/sql/DescribeTableTest.scala | 83 ++++++++++++++++++++++ 10 files changed, 185 insertions(+), 14 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java index 15a361aec..824dff5c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java @@ -319,7 +319,7 @@ public class Schema { } /** Declares table comment. */ - public Builder comment(String comment) { + public Builder comment(@Nullable String comment) { this.comment = comment; return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 28c49af7c..711b09bbe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -43,6 +43,10 @@ public interface SchemaChange extends Serializable { return new RemoveOption(key); } + static SchemaChange updateComment(@Nullable String comment) { + return new UpdateComment(comment); + } + static SchemaChange addColumn(String fieldName, DataType dataType) { return addColumn(fieldName, dataType, null, null); } @@ -159,6 +163,40 @@ public interface SchemaChange extends Serializable { } } + /** A SchemaChange to Update table comment. */ + final class UpdateComment implements SchemaChange { + + private static final long serialVersionUID = 1L; + + // If comment is null, means to remove comment + private final @Nullable String comment; + + private UpdateComment(@Nullable String comment) { + this.comment = comment; + } + + public @Nullable String comment() { + return comment; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + UpdateComment that = (UpdateComment) object; + return Objects.equals(comment, that.comment); + } + + @Override + public int hashCode() { + return Objects.hash(comment); + } + } + /** A SchemaChange to add a field. */ final class AddColumn implements SchemaChange { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 01b395a84..dba7f8e14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -35,6 +35,7 @@ import org.apache.paimon.schema.SchemaChange.UpdateColumnComment; import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability; import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition; import org.apache.paimon.schema.SchemaChange.UpdateColumnType; +import org.apache.paimon.schema.SchemaChange.UpdateComment; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; @@ -165,6 +166,7 @@ public class SchemaManager implements Serializable { Map<String, String> newOptions = new HashMap<>(schema.options()); List<DataField> newFields = new ArrayList<>(schema.fields()); AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId()); + String newComment = schema.comment(); for (SchemaChange change : changes) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; @@ -174,6 +176,9 @@ public class SchemaManager implements Serializable { RemoveOption removeOption = (RemoveOption) change; checkAlterTableOption(removeOption.key()); newOptions.remove(removeOption.key()); + } else if (change instanceof UpdateComment) { + UpdateComment updateComment = (UpdateComment) change; + newComment = updateComment.comment(); } else if (change instanceof AddColumn) { AddColumn addColumn = (AddColumn) change; SchemaChange.Move move = addColumn.move(); @@ -341,7 +346,7 @@ public class SchemaManager implements Serializable { schema.partitionKeys(), schema.primaryKeys(), newOptions, - schema.comment()); + newComment); try { boolean success = commit(newSchema); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java index af78e3b97..ca7fc540b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java @@ -22,7 +22,6 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypeJsonParser; import org.apache.paimon.utils.JsonDeserializer; import org.apache.paimon.utils.JsonSerializer; -import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -72,7 +71,7 @@ public class SchemaSerializer } generator.writeEndObject(); - if (!StringUtils.isNullOrWhitespaceOnly(tableSchema.comment())) { + if (tableSchema.comment() != null) { generator.writeStringField("comment", tableSchema.comment()); } @@ -114,7 +113,7 @@ public class SchemaSerializer } JsonNode commentNode = node.get("comment"); - String comment = ""; + String comment = null; if (commentNode != null) { comment = commentNode.asText(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index b8caa841a..cd8326d54 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -24,6 +24,8 @@ import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Arrays; import java.util.Collections; @@ -56,7 +58,7 @@ public class TableSchema implements Serializable { private final Map<String, String> options; - private final String comment; + private final @Nullable String comment; private final long timeMillis; @@ -67,7 +69,7 @@ public class TableSchema implements Serializable { List<String> partitionKeys, List<String> primaryKeys, Map<String, String> options, - String comment) { + @Nullable String comment) { this( id, fields, @@ -86,7 +88,7 @@ public class TableSchema implements Serializable { List<String> partitionKeys, List<String> primaryKeys, Map<String, String> options, - String comment, + @Nullable String comment, long timeMillis) { this.id = id; this.fields = fields; @@ -205,7 +207,7 @@ public class TableSchema implements Serializable { return new HashSet<>(all).containsAll(new HashSet<>(contains)); } - public String comment() { + public @Nullable String comment() { return comment; } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index b8395358b..868ce4ecc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -792,4 +792,35 @@ public abstract class CatalogTestBase { UnsupportedOperationException.class, "Cannot change nullability of primary key")); } + + @Test + public void testAlterTableUpdateComment() throws Exception { + catalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "test_table"); + catalog.createTable( + identifier, + new Schema( + Lists.newArrayList( + new DataField(0, "col1", DataTypes.STRING(), "field1"), + new DataField(1, "col2", DataTypes.STRING(), "field2")), + Collections.emptyList(), + Collections.emptyList(), + Maps.newHashMap(), + "comment"), + false); + + catalog.alterTable( + identifier, Lists.newArrayList(SchemaChange.updateComment("new comment")), false); + + Table table = catalog.getTable(identifier); + assertThat(table.comment().isPresent() && table.comment().get().equals("new comment")) + .isTrue(); + + // drop comment + catalog.alterTable(identifier, Lists.newArrayList(SchemaChange.updateComment(null)), false); + + table = catalog.getTable(identifier); + assertThat(table.comment().isPresent()).isFalse(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java index 4606bd0d0..cb9dc5084 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java @@ -46,6 +46,7 @@ public class SchemaChangeSerializationTest { runTest(SchemaChange.updateColumnNullability(new String[] {"col1", "col2"}, true)); runTest(SchemaChange.updateColumnComment(new String[] {"col1", "col2"}, "comment")); runTest(SchemaChange.updateColumnPosition(SchemaChange.Move.after("col", "ref"))); + runTest(SchemaChange.updateComment("comment")); } private void runTest(SchemaChange schemaChange) throws Exception { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 4436b7c8d..f85d8065e 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -291,7 +291,7 @@ public class SparkCatalog extends SparkBaseCatalog { throws TableAlreadyExistsException, NoSuchNamespaceException { try { catalog.createTable( - toIdentifier(ident), toUpdateSchema(schema, partitions, properties), false); + toIdentifier(ident), toInitialSchema(schema, partitions, properties), false); return loadTable(ident); } catch (Catalog.TableAlreadyExistException e) { throw new TableAlreadyExistsException(ident); @@ -316,11 +316,19 @@ public class SparkCatalog extends SparkBaseCatalog { if (change instanceof TableChange.SetProperty) { TableChange.SetProperty set = (TableChange.SetProperty) change; validateAlterProperty(set.property()); - return SchemaChange.setOption(set.property(), set.value()); + if (set.property().equals(TableCatalog.PROP_COMMENT)) { + return SchemaChange.updateComment(set.value()); + } else { + return SchemaChange.setOption(set.property(), set.value()); + } } else if (change instanceof TableChange.RemoveProperty) { TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change; validateAlterProperty(remove.property()); - return SchemaChange.removeOption(remove.property()); + if (remove.property().equals(TableCatalog.PROP_COMMENT)) { + return SchemaChange.updateComment(null); + } else { + return SchemaChange.removeOption(remove.property()); + } } else if (change instanceof TableChange.AddColumn) { TableChange.AddColumn add = (TableChange.AddColumn) change; validateAlterNestedField(add.fieldNames()); @@ -373,7 +381,7 @@ public class SparkCatalog extends SparkBaseCatalog { return move; } - private Schema toUpdateSchema( + private Schema toInitialSchema( StructType schema, Transform[] partitions, Map<String, String> properties) { Preconditions.checkArgument( Arrays.stream(partitions) @@ -385,6 +393,7 @@ public class SparkCatalog extends SparkBaseCatalog { })); Map<String, String> normalizedProperties = new HashMap<>(properties); normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER); + normalizedProperties.remove(TableCatalog.PROP_COMMENT); String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER); List<String> primaryKeys = pkAsString == null @@ -400,7 +409,7 @@ public class SparkCatalog extends SparkBaseCatalog { Arrays.stream(partitions) .map(partition -> partition.references()[0].describe()) .collect(Collectors.toList())) - .comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, "")); + .comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null)); for (StructField field : schema.fields()) { schemaBuilder.column( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java index dd0d5bce1..26d61e824 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java @@ -114,6 +114,9 @@ public class SparkTable CoreOptions.PRIMARY_KEY.key(), String.join(",", table.primaryKeys())); } properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME()); + if (table.comment().isPresent()) { + properties.put(TableCatalog.PROP_COMMENT, table.comment().get()); + } return properties; } else { return Collections.emptyMap(); diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala new file mode 100644 index 000000000..4a9a7f109 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row +import org.junit.jupiter.api.Assertions + +import java.util.Objects + +class DescribeTableTest extends PaimonSparkTestBase { + + test(s"Paimon describe: describe table comment") { + var comment = "test comment" + spark.sql(s""" + |CREATE TABLE T ( + | id INT COMMENT 'id comment', + | name STRING, + | dt STRING) + |COMMENT '$comment' + |""".stripMargin) + checkTableCommentEqual("T", comment) + + comment = "new comment" + spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')") + checkTableCommentEqual("T", comment) + + comment = " " + spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')") + checkTableCommentEqual("T", comment) + + comment = "" + spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')") + checkTableCommentEqual("T", comment) + + spark.sql(s"ALTER TABLE T UNSET TBLPROPERTIES ('comment')") + checkTableCommentEqual("T", null) + + comment = "new comment" + spark.sql(s"ALTER TABLE T SET TBLPROPERTIES ('comment' = '$comment')") + checkTableCommentEqual("T", comment) + } + + test(s"Paimon describe: describe table with no comment") { + spark.sql(s""" + |CREATE TABLE T ( + | id INT COMMENT 'id comment', + | name STRING, + | dt STRING) + |""".stripMargin) + checkTableCommentEqual("T", null) + } + + def checkTableCommentEqual(tableName: String, comment: String): Unit = { + // check describe table + checkAnswer( + spark + .sql(s"DESCRIBE TABLE EXTENDED $tableName") + .filter("col_name = 'Comment'") + .select("col_name", "data_type"), + if (comment == null) Nil else Row("Comment", comment) :: Nil + ) + + // check comment in schema + Assertions.assertTrue(Objects.equals(comment, loadTable(tableName).schema().comment())) + } +}
