This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 36823449f [core] Immutable table options can now be changed on an empty table (#3845)
36823449f is described below

commit 36823449f28f3053ed5412760637aed0e17a2076
Author: tsreaper <[email protected]>
AuthorDate: Tue Jul 30 17:46:00 2024 +0800

    [core] Immutable table options can now be changed on an empty table (#3845)
---
 .../main/java/org/apache/paimon/schema/Schema.java | 12 ++-
 .../org/apache/paimon/schema/SchemaManager.java    | 59 +++++++++------
 .../apache/paimon/schema/SchemaManagerTest.java    | 86 ++++++++++++++++++++++
 .../apache/paimon/flink/SchemaChangeITCase.java    | 26 ++++++-
 .../apache/paimon/flink/SchemaChangeITCase.java    | 25 ++++++-
 5 files changed, 183 insertions(+), 25 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index b75758374..c6c79f4d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -161,7 +161,11 @@ public class Schema {
                         "Cannot define primary key on DDL and table options at 
the same time.");
             }
             String pk = options.get(CoreOptions.PRIMARY_KEY.key());
-            primaryKeys = Arrays.asList(pk.split(","));
+            primaryKeys =
+                    Arrays.stream(pk.split(","))
+                            .map(String::trim)
+                            .filter(s -> !s.isEmpty())
+                            .collect(Collectors.toList());
             options.remove(CoreOptions.PRIMARY_KEY.key());
         }
         return primaryKeys;
@@ -174,7 +178,11 @@ public class Schema {
                         "Cannot define partition on DDL and table options at 
the same time.");
             }
             String partitions = options.get(CoreOptions.PARTITION.key());
-            partitionKeys = Arrays.asList(partitions.split(","));
+            partitionKeys =
+                    Arrays.stream(partitions.split(","))
+                            .map(String::trim)
+                            .filter(s -> !s.isEmpty())
+                            .collect(Collectors.toList());
             options.remove(CoreOptions.PARTITION.key());
         }
         return partitionKeys;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 4f70ac725..684adfe9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
@@ -108,14 +109,14 @@ public class SchemaManager implements Serializable {
         try {
             return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
                     .reduce(Math::max)
-                    .map(id -> schema(id));
+                    .map(this::schema);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
     }
 
     public List<TableSchema> listAll() {
-        return listAllIds().stream().map(id -> 
schema(id)).collect(Collectors.toList());
+        return 
listAllIds().stream().map(this::schema).collect(Collectors.toList());
     }
 
     /** List all schema IDs. */
@@ -184,24 +185,31 @@ public class SchemaManager implements Serializable {
     public TableSchema commitChanges(List<SchemaChange> changes)
             throws Catalog.TableNotExistException, 
Catalog.ColumnAlreadyExistException,
                     Catalog.ColumnNotExistException {
+        SnapshotManager snapshotManager = new SnapshotManager(fileIO, 
tableRoot, branch);
+        boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);
+
         while (true) {
-            TableSchema schema =
+            TableSchema oldTableSchema =
                     latest().orElseThrow(
                                     () ->
                                             new Catalog.TableNotExistException(
                                                     fromPath(branchPath(), 
true)));
-            Map<String, String> newOptions = new HashMap<>(schema.options());
-            List<DataField> newFields = new ArrayList<>(schema.fields());
-            AtomicInteger highestFieldId = new 
AtomicInteger(schema.highestFieldId());
-            String newComment = schema.comment();
+            Map<String, String> newOptions = new 
HashMap<>(oldTableSchema.options());
+            List<DataField> newFields = new 
ArrayList<>(oldTableSchema.fields());
+            AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
+            String newComment = oldTableSchema.comment();
             for (SchemaChange change : changes) {
                 if (change instanceof SetOption) {
                     SetOption setOption = (SetOption) change;
-                    checkAlterTableOption(setOption.key());
+                    if (hasSnapshots) {
+                        checkAlterTableOption(setOption.key());
+                    }
                     newOptions.put(setOption.key(), setOption.value());
                 } else if (change instanceof RemoveOption) {
                     RemoveOption removeOption = (RemoveOption) change;
-                    checkAlterTableOption(removeOption.key());
+                    if (hasSnapshots) {
+                        checkAlterTableOption(removeOption.key());
+                    }
                     newOptions.remove(removeOption.key());
                 } else if (change instanceof UpdateComment) {
                     UpdateComment updateComment = (UpdateComment) change;
@@ -245,7 +253,7 @@ public class SchemaManager implements Serializable {
 
                 } else if (change instanceof RenameColumn) {
                     RenameColumn rename = (RenameColumn) change;
-                    validateNotPrimaryAndPartitionKey(schema, 
rename.fieldName());
+                    validateNotPrimaryAndPartitionKey(oldTableSchema, 
rename.fieldName());
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
                                 fromPath(branchPath(), true), 
rename.fieldName());
@@ -263,7 +271,7 @@ public class SchemaManager implements Serializable {
                                             field.description()));
                 } else if (change instanceof DropColumn) {
                     DropColumn drop = (DropColumn) change;
-                    validateNotPrimaryAndPartitionKey(schema, 
drop.fieldName());
+                    validateNotPrimaryAndPartitionKey(oldTableSchema, 
drop.fieldName());
                     if (!newFields.removeIf(
                             f -> f.name().equals(((DropColumn) 
change).fieldName()))) {
                         throw new Catalog.ColumnNotExistException(
@@ -274,7 +282,7 @@ public class SchemaManager implements Serializable {
                     }
                 } else if (change instanceof UpdateColumnType) {
                     UpdateColumnType update = (UpdateColumnType) change;
-                    if (schema.partitionKeys().contains(update.fieldName())) {
+                    if 
(oldTableSchema.partitionKeys().contains(update.fieldName())) {
                         throw new IllegalArgumentException(
                                 String.format(
                                         "Cannot update partition column [%s] 
type in the table[%s].",
@@ -310,7 +318,7 @@ public class SchemaManager implements Serializable {
                     UpdateColumnNullability update = (UpdateColumnNullability) 
change;
                     if (update.fieldNames().length == 1
                             && update.newNullability()
-                            && 
schema.primaryKeys().contains(update.fieldNames()[0])) {
+                            && 
oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
                         throw new UnsupportedOperationException(
                                 "Cannot change nullability of primary key");
                     }
@@ -346,20 +354,29 @@ public class SchemaManager implements Serializable {
                 }
             }
 
-            TableSchema newSchema =
-                    new TableSchema(
-                            schema.id() + 1,
+            // We change TableSchema to Schema, because we want to deal with 
primary-key and
+            // partition in options.
+            Schema newSchema =
+                    new Schema(
                             newFields,
-                            highestFieldId.get(),
-                            schema.partitionKeys(),
-                            schema.primaryKeys(),
+                            oldTableSchema.partitionKeys(),
+                            oldTableSchema.primaryKeys(),
                             newOptions,
                             newComment);
+            TableSchema newTableSchema =
+                    new TableSchema(
+                            oldTableSchema.id() + 1,
+                            newSchema.fields(),
+                            highestFieldId.get(),
+                            newSchema.partitionKeys(),
+                            newSchema.primaryKeys(),
+                            newSchema.options(),
+                            newSchema.comment());
 
             try {
-                boolean success = commit(newSchema);
+                boolean success = commit(newTableSchema);
                 if (success) {
-                    return newSchema;
+                    return newTableSchema;
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 742e2188f..4bd965268 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -19,9 +19,18 @@
 package org.apache.paimon.schema;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -441,4 +450,81 @@ public class SchemaManagerTest {
         Assertions.assertEquals(
                 2, fields.get(0).id(), "The field id should remain as 2 after 
moving f2 before f0");
     }
+
+    @Test
+    public void testAlterImmutableOptionsOnEmptyTable() throws Exception {
+        // create table without primary keys
+        Schema schema =
+                new Schema(
+                        rowType.getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        "");
+        Path tableRoot = new Path(tempDir.toString(), "table");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), 
tableRoot);
+        manager.createTable(schema);
+
+        // set immutable options and set primary keys
+        manager.commitChanges(
+                SchemaChange.setOption("primary-key", "f0, f1"),
+                SchemaChange.setOption("partition", "f0"),
+                SchemaChange.setOption("bucket", "2"),
+                SchemaChange.setOption("merge-engine", "first-row"));
+
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+        assertThat(table.schema().partitionKeys()).containsExactly("f0");
+        assertThat(table.schema().primaryKeys()).containsExactly("f0", "f1");
+
+        // read and write data to check that table is really a primary key 
table with first-row
+        // merge engine
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+        write.write(GenericRow.of(1, 20L, BinaryString.fromString("banana")));
+        write.write(GenericRow.of(2, 10L, BinaryString.fromString("cat")));
+        write.write(GenericRow.of(2, 20L, BinaryString.fromString("dog")));
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.write(GenericRow.of(1, 20L, BinaryString.fromString("peach")));
+        write.write(GenericRow.of(1, 30L, BinaryString.fromString("mango")));
+        write.write(GenericRow.of(2, 20L, BinaryString.fromString("tiger")));
+        write.write(GenericRow.of(2, 30L, BinaryString.fromString("wolf")));
+        commit.commit(2, write.prepareCommit(false, 2));
+        write.close();
+        commit.close();
+
+        List<String> actual = new ArrayList<>();
+        try (RecordReaderIterator<InternalRow> it =
+                new RecordReaderIterator<>(
+                        
table.newRead().createReader(table.newSnapshotReader().read()))) {
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                actual.add(
+                        String.format(
+                                "%s %d %d %s",
+                                row.getRowKind().shortString(),
+                                row.getInt(0),
+                                row.getLong(1),
+                                row.getString(2)));
+            }
+        }
+        assertThat(actual)
+                .containsExactlyInAnyOrder(
+                        "+I 1 10 apple",
+                        "+I 1 20 banana",
+                        "+I 1 30 mango",
+                        "+I 2 10 cat",
+                        "+I 2 20 dog",
+                        "+I 2 30 wolf");
+
+        // now that table is not empty, we cannot change immutable options
+        assertThatThrownBy(
+                        () ->
+                                manager.commitChanges(
+                                        SchemaChange.setOption("merge-engine", 
"deduplicate")))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Change 'merge-engine' is not supported yet.");
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index adaa5b28c..19ad41cae 100644
--- 
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
 import java.util.Map;
@@ -44,9 +45,29 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testSetAndResetImmutableOptions() {
+    public void testSetAndResetImmutableOptionsOnEmptyTables() {
+        sql("CREATE TABLE T1 (a INT, b INT)");
+        sql(
+                "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 
'merge-engine' = 'first-row')");
+        sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+        assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 
10), Row.of(2, 20));
+        assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 
'deduplicate')"))
+                .rootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Change 'merge-engine' is not supported yet.");
+
+        sql(
+                "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) 
WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
+        sql("ALTER TABLE T2 RESET ('merge-engine')");
+        sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+        assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 
11), Row.of(2, 21));
+    }
+
+    @Test
+    public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
         // bucket-key is immutable
         sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");
+        sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");
 
         assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 
'c')"))
                 .rootCause()
@@ -55,6 +76,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
 
         sql(
                 "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' 
= '1', 'bucket-key' = 'c')");
+        sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
                 .rootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -63,6 +85,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
         // merge-engine is immutable
         sql(
                 "CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH 
('merge-engine' = 'partial-update')");
+        sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
                 .rootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -70,6 +93,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
 
         // sequence.field is immutable
         sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH 
('sequence.field' = 'b')");
+        sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 
'c')"))
                 .rootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index fc5a3dbe0..81f07b224 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -846,10 +846,30 @@ public class SchemaChangeITCase extends CatalogITCaseBase 
{
     }
 
     @Test
-    public void testSetAndResetImmutableOptions() throws Exception {
+    public void testSetAndResetImmutableOptionsOnEmptyTables() {
+        sql("CREATE TABLE T1 (a INT, b INT)");
+        sql(
+                "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 
'merge-engine' = 'first-row')");
+        sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+        assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 
10), Row.of(2, 20));
+        assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 
'deduplicate')"))
+                .rootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Change 'merge-engine' is not supported yet.");
+
+        sql(
+                "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) 
WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
+        sql("ALTER TABLE T2 RESET ('merge-engine')");
+        sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+        assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 
11), Row.of(2, 21));
+    }
+
+    @Test
+    public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
         // bucket-key is immutable
         sql(
                 "CREATE TABLE T1 (a STRING, b STRING, c STRING) WITH ('bucket' 
= '1', 'bucket-key' = 'a')");
+        sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");
 
         assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 
'c')"))
                 .rootCause()
@@ -858,6 +878,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
 
         sql(
                 "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' 
= '1', 'bucket-key' = 'c')");
+        sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
                 .rootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -866,6 +887,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
         // merge-engine is immutable
         sql(
                 "CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH 
('merge-engine' = 'partial-update')");
+        sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
                 .rootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -873,6 +895,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
 
         // sequence.field is immutable
         sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH 
('sequence.field' = 'b')");
+        sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 
'c')"))
                 .rootCause()
                 .isInstanceOf(UnsupportedOperationException.class)

Reply via email to