tsreaper commented on code in PR #347:
URL: https://github.com/apache/flink-table-store/pull/347#discussion_r1013790492


##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -281,6 +281,176 @@ public void testAddColumn() throws Exception {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testRenameColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, 
"default.db/testRenameColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenameColumn").collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        spark.sql("ALTER TABLE tablestore.default.testRenameColumn RENAME 
COLUMN a to aa");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenameColumn").collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `aa` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+    }
+
+    @Test
+    public void testRenamePartitionKey() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testRenamePartitionKey (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar')");
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenamePartitionKey")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenamePartitionKey (\n"
+                                + "  `a` BIGINT,\n"
+                                + "  `b` STRING)\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "]]");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
tablestore.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("java.lang.UnsupportedOperationException: Cannot 
rename partition key");
+    }
+
+    @Test
+    public void testDropSingleColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, 
"default.db/testDropSingleColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropSingleColumn")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropSingleColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP 
COLUMN a");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropSingleColumn")
+                        .collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropSingleColumn (\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+    }
+
+    @Test
+    public void testDropColumns() throws Exception {

Review Comment:
   Test that table can still be read.



##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -281,6 +281,176 @@ public void testAddColumn() throws Exception {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testRenameColumn() throws Exception {

Review Comment:
   Add these tests:
   1. Table can still be read after column names are changed. See 
`testAddColumn` for how to create such test.
   2. Swap two column names
       ```sql
       ALTER TABLE T RENAME COLUMN a to tmp;
       ALTER TABLE T RENAME COLUMN b to a;
       ALTER TABLE T RENAME COLUMN tmp to b;
       ```
       And check that table can still be read and order of columns is not 
disturbed.



##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -281,6 +281,176 @@ public void testAddColumn() throws Exception {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testRenameColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, 
"default.db/testRenameColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenameColumn").collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        spark.sql("ALTER TABLE tablestore.default.testRenameColumn RENAME 
COLUMN a to aa");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenameColumn").collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `aa` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+    }
+
+    @Test
+    public void testRenamePartitionKey() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testRenamePartitionKey (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar')");
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenamePartitionKey")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenamePartitionKey (\n"
+                                + "  `a` BIGINT,\n"
+                                + "  `b` STRING)\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "]]");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
tablestore.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("java.lang.UnsupportedOperationException: Cannot 
rename partition key");
+    }
+
+    @Test
+    public void testDropSingleColumn() throws Exception {

Review Comment:
   Test that table can still be read.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -191,6 +193,49 @@ public TableSchema commitChanges(List<SchemaChange> 
changes) throws Exception {
                     newFields.add(
                             new DataField(
                                     id, addColumn.fieldName(), dataType, 
addColumn.description()));
+                } else if (change instanceof RenameColumn) {
+                    RenameColumn rename = (RenameColumn) change;
+                    if (schema.partitionKeys().contains(rename.fieldName())) {
+                        throw new UnsupportedOperationException("Cannot rename 
partition key");
+                    }
+                    if (newFields.stream()
+                            .anyMatch(
+                                    f ->
+                                            StringUtils.equalsIgnoreCase(
+                                                    f.name(), 
rename.newName()))) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] exists in the 
table[%s].",
+                                        rename.newName(), tableRoot));
+                    }
+
+                    updateNestedColumn(
+                            newFields,
+                            new String[] {rename.fieldName()},
+                            0,
+                            (field) ->
+                                    new DataField(
+                                            field.id(),
+                                            rename.newName(),
+                                            field.type(),
+                                            field.description()));
+                } else if (change instanceof DropColumn) {
+                    DropColumn drop = (DropColumn) change;
+                    if (schema.partitionKeys().contains(drop.fieldName())) {
+                        throw new UnsupportedOperationException("Cannot drop 
partition key");
+                    }
+                    if (schema.primaryKeys().contains(drop.fieldName())) {
+                        throw new UnsupportedOperationException("Cannot drop 
primary key");
+                    }
+                    if (!newFields.removeIf(
+                            f ->
+                                    StringUtils.equalsIgnoreCase(
+                                            f.name(), ((DropColumn) 
change).fieldName()))) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] doesn't exist in the 
table[%s].",
+                                        drop.fieldName(), tableRoot));
+                    }

Review Comment:
   Consider a table without partition keys and primary keys. It seems that we 
can drop all columns of this table, which is obviously undesired. Fix this case 
and add a test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to