This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.3 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit eeb09ca128553d77e9f8886b335e6048cd498179 Author: yuzelin <[email protected]> AuthorDate: Wed Jan 4 09:42:59 2023 +0800 [FLINK-30522] Fix 'SHOW TBLPROPERTIES' can't read properties of table in Spark3 This closes #451 --- .../org/apache/flink/table/store/CoreOptions.java | 4 +++ .../table/store/file/schema/SchemaManagerTest.java | 20 +++++++----- .../apache/flink/table/store/spark/SparkTable.java | 12 +++++++ .../flink/table/store/spark/SparkReadITCase.java | 18 ++++++++++ .../store/spark/SparkSchemaEvolutionITCase.java | 38 +++++++++++++++++++++- 5 files changed, 83 insertions(+), 9 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java index 22974af1..ba61e105 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java @@ -405,6 +405,10 @@ public class CoreOptions implements Serializable { this.options = options; } + public Map<String, String> toMap() { + return options.toMap(); + } + public int bucket() { return options.get(BUCKET); } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java index 5d6a4f94..fb4086a1 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java @@ -110,14 +110,18 @@ public class SchemaManagerTest { assertThatThrownBy( () -> - manager.commitNewVersion( - new UpdateSchema( - rowType, - partitionKeys, - primaryKeys, - Collections.singletonMap( - CoreOptions.SEQUENCE_FIELD.key(), "f4"), - ""))) + retryArtificialException( + () -> + manager.commitNewVersion( + new UpdateSchema( + rowType, + partitionKeys, + primaryKeys, + Collections.singletonMap( + CoreOptions.SEQUENCE_FIELD + .key(), + "f4"), + "")))) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Nonexistent sequence field: 'f4'"); } diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java index 0984dba1..5e2aeb44 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java @@ -21,6 +21,7 @@ package org.apache.flink.table.store.spark; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.store.file.operation.Lock; import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.table.DataTable; import org.apache.flink.table.store.table.SupportsPartition; import org.apache.flink.table.store.table.Table; @@ -39,8 +40,10 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; @@ -118,6 +121,15 @@ public class SparkTable castToWritable(table).deleteWhere(commitUser, predicates, lockFactory); } + @Override + public Map<String, String> properties() { + if (table instanceof DataTable) { + return ((DataTable) table).options().toMap(); + } else { + return Collections.emptyMap(); + } + } + private static org.apache.flink.table.store.table.SupportsWrite castToWritable(Table table) { if (!(table instanceof org.apache.flink.table.store.table.SupportsWrite)) { throw new UnsupportedOperationException( diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java index 1c6f8caf..2d2cba1f 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java @@ -147,6 +147,24 @@ public class SparkReadITCase extends SparkReadTestBase { .isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]"); } + @Test + public void testShowTableProperties() { + spark.sql("USE tablestore"); + spark.sql( + "CREATE TABLE default.tbl (\n" + + "a INT\n" + + ") TBLPROPERTIES (\n" + + "'k1' = 'v1',\n" + + "'k2' = 'v2'" + + ")"); + + assertThat( + spark.sql("SHOW TBLPROPERTIES default.tbl").collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .contains("[k1,v1]", "[k2,v2]"); + } + @Test public void testCreateTableWithInvalidPk() { spark.sql("USE tablestore"); diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java index fb88cc88..22a096dd 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java @@ -93,6 +93,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { + " `a` INT NOT NULL,\n" + " `b` BIGINT,\n" + " `c` STRING)\n" + + buildTableProperties("default.db/testAddNotNullColumn") + "]]"); assertThatThrownBy( @@ -120,6 +121,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { + " `a` INT NOT NULL,\n" + " `b` BIGINT,\n" + " `c` STRING)\n" + + buildTableProperties("default.db/testRenameColumn") + "]]"); Dataset<Row> table1 = spark.table("tablestore.default.testRenameColumn"); List<Row> results = table1.select("a", "c").collectAsList(); @@ -135,6 +137,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { + " `aa` INT NOT NULL,\n" + " `b` BIGINT,\n" + " `c` STRING)\n" + + buildTableProperties("default.db/testRenameColumn") + "]]"); Dataset<Row> table2 = spark.table("tablestore.default.testRenameColumn"); results = table2.select("aa", "c").collectAsList(); @@ -163,7 +166,14 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { "[[CREATE TABLE testRenamePartitionKey (\n" + " `a` BIGINT,\n" + " `b` STRING)\n" + + "USING tablestore\n" + "PARTITIONED BY (a)\n" + + "COMMENT 'table comment'\n" + + "TBLPROPERTIES(\n" + + " 'foo' = 'bar',\n" + + String.format( + " 'path' = '%s/%s')\n", + warehousePath, "default.db/testRenamePartitionKey") + "]]"); assertThatThrownBy( @@ -194,6 +204,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { + " `a` INT NOT NULL,\n" + " `b` BIGINT,\n" + " `c` STRING)\n" + + buildTableProperties("default.db/testDropSingleColumn") + "]]"); spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP COLUMN a"); @@ -206,6 +217,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { "[[CREATE TABLE testDropSingleColumn (\n" + " `b` BIGINT,\n" + " `c` STRING)\n" + + buildTableProperties("default.db/testDropSingleColumn") + "]]"); Dataset<Row> table = spark.table("tablestore.default.testDropSingleColumn"); @@ -226,6 +238,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { + " `a` INT NOT NULL,\n" + " `b` BIGINT,\n" + " `c` STRING)\n" + + buildTableProperties("default.db/testDropColumns") + "]]"); spark.sql("ALTER TABLE tablestore.default.testDropColumns DROP COLUMNS a, b"); @@ -233,7 +246,11 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { List<Row> afterRename = spark.sql("SHOW CREATE TABLE tablestore.default.testDropColumns").collectAsList(); assertThat(afterRename.toString()) - .isEqualTo("[[CREATE TABLE testDropColumns (\n" + " `c` STRING)\n" + "]]"); + .isEqualTo( + "[[CREATE TABLE testDropColumns (\n" + + " `c` STRING)\n" + + buildTableProperties("default.db/testDropColumns") + + "]]"); } @Test @@ -255,7 +272,14 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { "[[CREATE TABLE testDropPartitionKey (\n" + " `a` BIGINT,\n" + " `b` STRING)\n" + + "USING tablestore\n" + "PARTITIONED BY (a)\n" + + "COMMENT 'table comment'\n" + + "TBLPROPERTIES(\n" + + " 'foo' = 'bar',\n" + + String.format( + " 'path' = '%s/%s')\n", + warehousePath, "default.db/testDropPartitionKey") + "]]"); assertThatThrownBy( @@ -288,7 +312,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { "[[CREATE TABLE testDropPrimaryKey (\n" + " `a` BIGINT NOT NULL,\n" + " `b` STRING NOT NULL)\n" + + "USING tablestore\n" + "PARTITIONED BY (a)\n" + + "COMMENT 'table comment'\n" + + "TBLPROPERTIES(\n" + + String.format( + " 'path' = '%s/%s')\n", + warehousePath, "default.db/testDropPrimaryKey") + "]]"); assertThatThrownBy( @@ -533,4 +563,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { .toString()) .isEqualTo("[[15,18,17,16], [19,22,21,20]]"); } + + private String buildTableProperties(String tablePath) { + return String.format( + "TBLPROPERTIES(\n" + " 'file.format' = 'avro',\n" + " 'path' = '%s/%s')\n", + warehousePath, tablePath); + } }
