This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 36823449f [core] Immutable table options can now be changed on an empty table (#3845)
36823449f is described below
commit 36823449f28f3053ed5412760637aed0e17a2076
Author: tsreaper <[email protected]>
AuthorDate: Tue Jul 30 17:46:00 2024 +0800
[core] Immutable table options can now be changed on an empty table (#3845)
---
.../main/java/org/apache/paimon/schema/Schema.java | 12 ++-
.../org/apache/paimon/schema/SchemaManager.java | 59 +++++++++------
.../apache/paimon/schema/SchemaManagerTest.java | 86 ++++++++++++++++++++++
.../apache/paimon/flink/SchemaChangeITCase.java | 26 ++++++-
.../apache/paimon/flink/SchemaChangeITCase.java | 25 ++++++-
5 files changed, 183 insertions(+), 25 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index b75758374..c6c79f4d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -161,7 +161,11 @@ public class Schema {
"Cannot define primary key on DDL and table options at
the same time.");
}
String pk = options.get(CoreOptions.PRIMARY_KEY.key());
- primaryKeys = Arrays.asList(pk.split(","));
+ primaryKeys =
+ Arrays.stream(pk.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
options.remove(CoreOptions.PRIMARY_KEY.key());
}
return primaryKeys;
@@ -174,7 +178,11 @@ public class Schema {
"Cannot define partition on DDL and table options at
the same time.");
}
String partitions = options.get(CoreOptions.PARTITION.key());
- partitionKeys = Arrays.asList(partitions.split(","));
+ partitionKeys =
+ Arrays.stream(partitions.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
options.remove(CoreOptions.PARTITION.key());
}
return partitionKeys;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 4f70ac725..684adfe9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
@@ -108,14 +109,14 @@ public class SchemaManager implements Serializable {
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
.reduce(Math::max)
- .map(id -> schema(id));
+ .map(this::schema);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public List<TableSchema> listAll() {
- return listAllIds().stream().map(id ->
schema(id)).collect(Collectors.toList());
+ return
listAllIds().stream().map(this::schema).collect(Collectors.toList());
}
/** List all schema IDs. */
@@ -184,24 +185,31 @@ public class SchemaManager implements Serializable {
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.TableNotExistException,
Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
+ SnapshotManager snapshotManager = new SnapshotManager(fileIO,
tableRoot, branch);
+ boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);
+
while (true) {
- TableSchema schema =
+ TableSchema oldTableSchema =
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(branchPath(),
true)));
- Map<String, String> newOptions = new HashMap<>(schema.options());
- List<DataField> newFields = new ArrayList<>(schema.fields());
- AtomicInteger highestFieldId = new
AtomicInteger(schema.highestFieldId());
- String newComment = schema.comment();
+ Map<String, String> newOptions = new
HashMap<>(oldTableSchema.options());
+ List<DataField> newFields = new
ArrayList<>(oldTableSchema.fields());
+ AtomicInteger highestFieldId = new
AtomicInteger(oldTableSchema.highestFieldId());
+ String newComment = oldTableSchema.comment();
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
- checkAlterTableOption(setOption.key());
+ if (hasSnapshots) {
+ checkAlterTableOption(setOption.key());
+ }
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
- checkAlterTableOption(removeOption.key());
+ if (hasSnapshots) {
+ checkAlterTableOption(removeOption.key());
+ }
newOptions.remove(removeOption.key());
} else if (change instanceof UpdateComment) {
UpdateComment updateComment = (UpdateComment) change;
@@ -245,7 +253,7 @@ public class SchemaManager implements Serializable {
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
- validateNotPrimaryAndPartitionKey(schema,
rename.fieldName());
+ validateNotPrimaryAndPartitionKey(oldTableSchema,
rename.fieldName());
if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(branchPath(), true),
rename.fieldName());
@@ -263,7 +271,7 @@ public class SchemaManager implements Serializable {
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
- validateNotPrimaryAndPartitionKey(schema,
drop.fieldName());
+ validateNotPrimaryAndPartitionKey(oldTableSchema,
drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn)
change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
@@ -274,7 +282,7 @@ public class SchemaManager implements Serializable {
}
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
- if (schema.partitionKeys().contains(update.fieldName())) {
+ if
(oldTableSchema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s]
type in the table[%s].",
@@ -310,7 +318,7 @@ public class SchemaManager implements Serializable {
UpdateColumnNullability update = (UpdateColumnNullability)
change;
if (update.fieldNames().length == 1
&& update.newNullability()
- &&
schema.primaryKeys().contains(update.fieldNames()[0])) {
+ &&
oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
"Cannot change nullability of primary key");
}
@@ -346,20 +354,29 @@ public class SchemaManager implements Serializable {
}
}
- TableSchema newSchema =
- new TableSchema(
- schema.id() + 1,
+ // We change TableSchema to Schema, because we want to deal with
primary-key and
+ // partition in options.
+ Schema newSchema =
+ new Schema(
newFields,
- highestFieldId.get(),
- schema.partitionKeys(),
- schema.primaryKeys(),
+ oldTableSchema.partitionKeys(),
+ oldTableSchema.primaryKeys(),
newOptions,
newComment);
+ TableSchema newTableSchema =
+ new TableSchema(
+ oldTableSchema.id() + 1,
+ newSchema.fields(),
+ highestFieldId.get(),
+ newSchema.partitionKeys(),
+ newSchema.primaryKeys(),
+ newSchema.options(),
+ newSchema.comment());
try {
- boolean success = commit(newSchema);
+ boolean success = commit(newTableSchema);
if (success) {
- return newSchema;
+ return newTableSchema;
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 742e2188f..4bd965268 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -19,9 +19,18 @@
package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -441,4 +450,81 @@ public class SchemaManagerTest {
Assertions.assertEquals(
2, fields.get(0).id(), "The field id should remain as 2 after
moving f2 before f0");
}
+
+ @Test
+ public void testAlterImmutableOptionsOnEmptyTable() throws Exception {
+ // create a table with neither primary keys nor partition keys
+ Schema schema =
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+ Path tableRoot = new Path(tempDir.toString(), "table");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(),
tableRoot);
+ manager.createTable(schema);
+
+ // no snapshot exists yet, so immutable options, primary keys and partition keys may change
+ manager.commitChanges(
+ SchemaChange.setOption("primary-key", "f0, f1"),
+ SchemaChange.setOption("partition", "f0"),
+ SchemaChange.setOption("bucket", "2"),
+ SchemaChange.setOption("merge-engine", "first-row"));
+
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ assertThat(table.schema().partitionKeys()).containsExactly("f0");
+ assertThat(table.schema().primaryKeys()).containsExactly("f0", "f1");
+
+ // write two commits and read back to check that the table is really a primary key
table with first-row
+ // merge engine: for duplicate (f0, f1) keys, the rows from the first commit are kept
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+ write.write(GenericRow.of(1, 20L, BinaryString.fromString("banana")));
+ write.write(GenericRow.of(2, 10L, BinaryString.fromString("cat")));
+ write.write(GenericRow.of(2, 20L, BinaryString.fromString("dog")));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.write(GenericRow.of(1, 20L, BinaryString.fromString("peach")));
+ write.write(GenericRow.of(1, 30L, BinaryString.fromString("mango")));
+ write.write(GenericRow.of(2, 20L, BinaryString.fromString("tiger")));
+ write.write(GenericRow.of(2, 30L, BinaryString.fromString("wolf")));
+ commit.commit(2, write.prepareCommit(false, 2));
+ write.close();
+ commit.close();
+
+ List<String> actual = new ArrayList<>();
+ try (RecordReaderIterator<InternalRow> it =
+ new RecordReaderIterator<>(
+
table.newRead().createReader(table.newSnapshotReader().read()))) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ actual.add(
+ String.format(
+ "%s %d %d %s",
+ row.getRowKind().shortString(),
+ row.getInt(0),
+ row.getLong(1),
+ row.getString(2)));
+ }
+ }
+ assertThat(actual)
+ .containsExactlyInAnyOrder(
+ "+I 1 10 apple",
+ "+I 1 20 banana",
+ "+I 1 30 mango",
+ "+I 2 10 cat",
+ "+I 2 20 dog",
+ "+I 2 30 wolf");
+
+ // the table now has snapshots, so changing an immutable option must be rejected
+ assertThatThrownBy(
+ () ->
+ manager.commitChanges(
+ SchemaChange.setOption("merge-engine",
"deduplicate")))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Change 'merge-engine' is not supported yet.");
+ }
}
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index adaa5b28c..19ad41cae 100644
--- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import java.util.Map;
@@ -44,9 +45,29 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
}
@Test
- public void testSetAndResetImmutableOptions() {
+ public void testSetAndResetImmutableOptionsOnEmptyTables() {
+ sql("CREATE TABLE T1 (a INT, b INT)"); // no snapshots yet, so the SET below is allowed
+ sql(
+ "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1',
'merge-engine' = 'first-row')");
+ sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); // T1 is no longer empty
+ assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1,
10), Row.of(2, 20));
+ assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' =
'deduplicate')"))
+ .rootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Change 'merge-engine' is not supported yet.");
+ // T2 has a primary key but no snapshots yet, so resetting merge-engine is allowed
+ sql(
+ "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED)
WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
+ sql("ALTER TABLE T2 RESET ('merge-engine')");
+ sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+ assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1,
11), Row.of(2, 21));
+ }
+
+ @Test
+ public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
// bucket-key is immutable
sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");
+ sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' =
'c')"))
.rootCause()
@@ -55,6 +76,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket'
= '1', 'bucket-key' = 'c')");
+ sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -63,6 +85,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
// merge-engine is immutable
sql(
"CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH
('merge-engine' = 'partial-update')");
+ sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -70,6 +93,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
// sequence.field is immutable
sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH
('sequence.field' = 'b')");
+ sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' =
'c')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index fc5a3dbe0..81f07b224 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -846,10 +846,30 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
}
@Test
- public void testSetAndResetImmutableOptions() throws Exception {
+ public void testSetAndResetImmutableOptionsOnEmptyTables() {
+ sql("CREATE TABLE T1 (a INT, b INT)"); // no snapshots yet, so the SET below is allowed
+ sql(
+ "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1',
'merge-engine' = 'first-row')");
+ sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); // T1 is no longer empty
+ assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1,
10), Row.of(2, 20));
+ assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' =
'deduplicate')"))
+ .rootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Change 'merge-engine' is not supported yet.");
+ // T2 has a primary key but no snapshots yet, so resetting merge-engine is allowed
+ sql(
+ "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED)
WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
+ sql("ALTER TABLE T2 RESET ('merge-engine')");
+ sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+ assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1,
11), Row.of(2, 21));
+ }
+
+ @Test
+ public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
// bucket-key is immutable
sql(
"CREATE TABLE T1 (a STRING, b STRING, c STRING) WITH ('bucket'
= '1', 'bucket-key' = 'a')");
+ sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' =
'c')"))
.rootCause()
@@ -858,6 +878,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket'
= '1', 'bucket-key' = 'c')");
+ sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -866,6 +887,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
// merge-engine is immutable
sql(
"CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH
('merge-engine' = 'partial-update')");
+ sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -873,6 +895,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
// sequence.field is immutable
sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH
('sequence.field' = 'b')");
+ sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' =
'c')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)