This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0a791e9d [flink] Support dropping primary keys for empty tables in
Flink (#7566)
2c0a791e9d is described below
commit 2c0a791e9d9c1ac64ad426a15c6ddbf4ae31d22f
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 1 10:41:43 2026 +0800
[flink] Support dropping primary keys for empty tables in Flink (#7566)
Paimon has supported adding primary keys for empty tables before. So in
this PR, we support the symmetrical operation: dropping primary keys for
empty tables.
---
.../org/apache/paimon/schema/SchemaChange.java | 27 +++++++++++++
.../org/apache/paimon/schema/SchemaManager.java | 11 +++++-
.../apache/paimon/schema/SchemaManagerTest.java | 45 ++++++++++++++++++++++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 38 ++++++++++++++----
.../apache/paimon/flink/SchemaChangeITCase.java | 24 ++++++++++++
5 files changed, 136 insertions(+), 9 deletions(-)
diff --git
a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 4b68ad105a..9b381a3490 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -78,6 +78,9 @@ import java.util.Objects;
@JsonSubTypes.Type(
value = SchemaChange.UpdateColumnPosition.class,
name = SchemaChange.Actions.UPDATE_COLUMN_POSITION_ACTION),
+ @JsonSubTypes.Type(
+ value = SchemaChange.DropPrimaryKey.class,
+ name = SchemaChange.Actions.DROP_PRIMARY_KEY_ACTION),
})
public interface SchemaChange extends Serializable {
@@ -164,6 +167,10 @@ public interface SchemaChange extends Serializable {
return new UpdateColumnPosition(move);
}
+ static SchemaChange dropPrimaryKey() {
+ return new DropPrimaryKey();
+ }
+
/** A SchemaChange to set a table option. */
final class SetOption implements SchemaChange {
@@ -810,6 +817,25 @@ public interface SchemaChange extends Serializable {
}
}
+ /** A SchemaChange to drop primary key. */
+ final class DropPrimaryKey implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
/** Actions for schema changes: identify for schema change. */
class Actions {
public static final String FIELD_ACTION = "action";
@@ -824,6 +850,7 @@ public interface SchemaChange extends Serializable {
public static final String UPDATE_COLUMN_COMMENT_ACTION =
"updateColumnComment";
public static final String UPDATE_COLUMN_DEFAULT_VALUE_ACTION =
"updateColumnDefaultValue";
public static final String UPDATE_COLUMN_POSITION_ACTION =
"updateColumnPosition";
+ public static final String DROP_PRIMARY_KEY_ACTION = "dropPrimaryKey";
private Actions() {}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 13da6677f7..8667f2271d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -68,6 +68,7 @@ import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -311,6 +312,7 @@ public class SchemaManager implements Serializable {
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
+ List<String> newPrimaryKeys = oldTableSchema.primaryKeys();
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
@@ -549,6 +551,12 @@ public class SchemaManager implements Serializable {
update.newDefaultValue());
},
lazyIdentifier);
+ } else if (change instanceof SchemaChange.DropPrimaryKey) {
+ if (hasSnapshots.get()) {
+ throw new UnsupportedOperationException(
+ "Cannot drop primary keys on a non-empty table.");
+ }
+ newPrimaryKeys = Collections.emptyList();
} else {
throw new UnsupportedOperationException("Unsupported change: "
+ change.getClass());
}
@@ -561,8 +569,7 @@ public class SchemaManager implements Serializable {
newFields,
oldTableSchema.partitionKeys(),
applyNotNestedColumnRename(
- oldTableSchema.primaryKeys(),
- Iterables.filter(changes, RenameColumn.class)),
+ newPrimaryKeys, Iterables.filter(changes,
RenameColumn.class)),
applyRenameColumnsToOptions(newOptions, changes),
newComment);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 31496f8bb8..9a15cab361 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -460,6 +460,51 @@ public class SchemaManagerTest {
.hasMessage("Change 'merge-engine' is not supported yet.");
}
+ @Test
+ public void testDropPrimaryKeyOnEmptyTable() throws Exception {
+ Path tableRoot = new Path(tempDir.toString(), "table");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(),
tableRoot);
+ manager.createTable(schema);
+
+ // drop primary keys on empty table should succeed
+ manager.commitChanges(SchemaChange.dropPrimaryKey());
+
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ assertThat(table.schema().primaryKeys()).isEmpty();
+ }
+
+ @Test
+ public void testDropPrimaryKeyOnNonEmptyTable() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>(options);
+ tableOptions.put("bucket", "1");
+ Schema pkSchema =
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ primaryKeys,
+ tableOptions,
+ "");
+ Path tableRoot = new Path(tempDir.toString(), "table");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(),
tableRoot);
+ manager.createTable(pkSchema);
+
+ // write data to create a snapshot
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // drop primary keys on non-empty table should fail
+ assertThatThrownBy(() ->
manager.commitChanges(SchemaChange.dropPrimaryKey()))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot drop primary keys on a non-empty table.");
+ }
+
@Test
public void testAddAndDropNestedColumns() throws Exception {
RowType innerType =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e05106eef6..5f59063668 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -510,7 +510,9 @@ public class FlinkCatalog extends AbstractCatalog {
}
private List<SchemaChange> toSchemaChange(
- TableChange change, Map<String, Integer>
oldTableNonPhysicalColumnIndex) {
+ TableChange change,
+ Map<String, Integer> oldTableNonPhysicalColumnIndex,
+ @Nullable String primaryKeyConstraintName) {
List<SchemaChange> schemaChanges = new ArrayList<>();
if (change instanceof AddColumn) {
if (((AddColumn) change).getColumn().isPhysical()) {
@@ -627,6 +629,15 @@ public class FlinkCatalog extends AbstractCatalog {
} else if (change instanceof MaterializedTableChange
&& handleMaterializedTableChange(change, schemaChanges)) {
return schemaChanges;
+ } else if (change instanceof TableChange.DropConstraint) {
+ TableChange.DropConstraint dropConstraint =
(TableChange.DropConstraint) change;
+ if (primaryKeyConstraintName == null
+ ||
!primaryKeyConstraintName.equals(dropConstraint.getConstraintName())) {
+ throw new UnsupportedOperationException(
+ "Only dropping primary key constraint is supported.");
+ }
+ schemaChanges.add(SchemaChange.dropPrimaryKey());
+ return schemaChanges;
}
throw new UnsupportedOperationException("Change is not supported: " +
change.getClass());
}
@@ -740,10 +751,17 @@ public class FlinkCatalog extends AbstractCatalog {
checkArgument(
table instanceof FileStoreTable,
"Only support alter data table, but is: " + table.getClass());
- validateAlterTable(toCatalogTable(table), newTable);
+ CatalogBaseTable oldCatalogTable = toCatalogTable(table);
+ validateAlterTable(oldCatalogTable, newTable);
Map<String, Integer> oldTableNonPhysicalColumnIndex =
FlinkCatalogPropertiesUtil.nonPhysicalColumns(
table.options(), table.rowType().getFieldNames());
+ String primaryKeyConstraintName =
+ oldCatalogTable
+ .getUnresolvedSchema()
+ .getPrimaryKey()
+ .map(pk -> pk.getConstraintName())
+ .orElse(null);
List<SchemaChange> changes = new ArrayList<>();
@@ -773,7 +791,9 @@ public class FlinkCatalog extends AbstractCatalog {
.flatMap(
tableChange ->
toSchemaChange(
- tableChange,
oldTableNonPhysicalColumnIndex)
+ tableChange,
+
oldTableNonPhysicalColumnIndex,
+ primaryKeyConstraintName)
.stream())
.collect(Collectors.toList());
changes.addAll(schemaChanges);
@@ -848,10 +868,10 @@ public class FlinkCatalog extends AbstractCatalog {
if (!table1IsMaterialized) {
org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema();
org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema();
- boolean pkEquality = false;
+ boolean allowAlterPk = false;
if (ts1.getPrimaryKey().isPresent() &&
ts2.getPrimaryKey().isPresent()) {
- pkEquality =
+ allowAlterPk =
Objects.equals(
ts1.getPrimaryKey().get().getConstraintName(),
ts2.getPrimaryKey().get().getConstraintName())
@@ -859,10 +879,14 @@ public class FlinkCatalog extends AbstractCatalog {
ts1.getPrimaryKey().get().getColumnNames(),
ts2.getPrimaryKey().get().getColumnNames());
} else if (!ts1.getPrimaryKey().isPresent() &&
!ts2.getPrimaryKey().isPresent()) {
- pkEquality = true;
+ allowAlterPk = true;
+ } else if (ts1.getPrimaryKey().isPresent() &&
!ts2.getPrimaryKey().isPresent()) {
+ // dropping primary key is allowed on empty tables,
+ // SchemaManager will validate this
+ allowAlterPk = true;
}
- if (!pkEquality) {
+ if (!allowAlterPk) {
throw new UnsupportedOperationException(
"Altering primary key is not supported yet.");
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 840dce7942..cd7805b9b3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1776,4 +1776,28 @@ public class SchemaChangeITCase extends
CatalogITCaseBase {
Row.of(1L, 100L, "buy", null, null, null,
"2024-01-01", "10"),
Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02",
"11"));
}
+
+ @Test
+ public void testDropPrimaryKeyOnEmptyTable() {
+ sql("CREATE TABLE T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)");
+
+ // drop primary key on empty table should succeed
+ sql("ALTER TABLE T DROP PRIMARY KEY");
+
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.get(0).toString()).doesNotContain("PRIMARY KEY");
+ }
+
+ @Test
+ public void testDropPrimaryKeyOnNonEmptyTable() {
+ sql("CREATE TABLE T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)");
+ sql("INSERT INTO T VALUES (1, 2, 'hello')");
+
+ // drop primary key on non-empty table should fail
+ assertThatThrownBy(() -> sql("ALTER TABLE T DROP PRIMARY KEY"))
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot drop primary keys on a non-empty
table."));
+ }
}