tsreaper commented on code in PR #347:
URL: https://github.com/apache/flink-table-store/pull/347#discussion_r1013890357
##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -281,6 +281,176 @@ public void testAddColumn() throws Exception {
assertThat(results.toString()).isEqualTo("[[8]]");
}
+ @Test
+ public void testRenameColumn() throws Exception {
+ Path tablePath = new Path(warehousePath,
"default.db/testRenameColumn");
+ createTestHelper(tablePath);
+
+ List<Row> beforeRename =
+ spark.sql("SHOW CREATE TABLE
tablestore.default.testRenameColumn").collectAsList();
+ assertThat(beforeRename.toString())
+ .isEqualTo(
+ "[[CREATE TABLE testRenameColumn (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)\n"
+ + "]]");
+
+ spark.sql("ALTER TABLE tablestore.default.testRenameColumn RENAME
COLUMN a to aa");
+
+ List<Row> afterRename =
+ spark.sql("SHOW CREATE TABLE
tablestore.default.testRenameColumn").collectAsList();
+ assertThat(afterRename.toString())
+ .isEqualTo(
+ "[[CREATE TABLE testRenameColumn (\n"
+ + " `aa` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)\n"
+ + "]]");
+ }
+
+ @Test
+ public void testRenamePartitionKey() {
+ spark.sql("USE tablestore");
+ spark.sql(
+ "CREATE TABLE default.testRenamePartitionKey (\n"
+ + "a BIGINT,\n"
+ + "b STRING) USING tablestore\n"
+ + "COMMENT 'table comment'\n"
+ + "PARTITIONED BY (a)\n"
+ + "TBLPROPERTIES ('foo' = 'bar')");
+
+ List<Row> beforeRename =
+ spark.sql("SHOW CREATE TABLE
tablestore.default.testRenamePartitionKey")
+ .collectAsList();
+ assertThat(beforeRename.toString())
+ .isEqualTo(
+ "[[CREATE TABLE testRenamePartitionKey (\n"
+ + " `a` BIGINT,\n"
+ + " `b` STRING)\n"
+ + "PARTITIONED BY (a)\n"
+ + "]]");
+
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "ALTER TABLE
tablestore.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("java.lang.UnsupportedOperationException: Cannot
rename partition key");
+ }
+
+ @Test
+ public void testDropSingleColumn() throws Exception {
+ Path tablePath = new Path(warehousePath,
"default.db/testDropSingleColumn");
+ createTestHelper(tablePath);
+
+ List<Row> beforeRename =
+ spark.sql("SHOW CREATE TABLE
tablestore.default.testDropSingleColumn")
+ .collectAsList();
+ assertThat(beforeRename.toString())
+ .isEqualTo(
+ "[[CREATE TABLE testDropSingleColumn (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)\n"
+ + "]]");
+
+ spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP
COLUMN a");
+
+ List<Row> afterRename =
+ spark.sql("SHOW CREATE TABLE
tablestore.default.testDropSingleColumn")
+ .collectAsList();
+ assertThat(afterRename.toString())
+ .isEqualTo(
+ "[[CREATE TABLE testDropSingleColumn (\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)\n"
+ + "]]");
+ }
+
+ @Test
+ public void testDropColumns() throws Exception {
Review Comment:
"Read data will fail in this PR." What causes this issue? Are there any
differences between adding a column and renaming or dropping a column? If there
are they should be fixed before merging this PR. That is, let's first implement
FLINK-27846 and then come back to this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]