LadyForest commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r611443796
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1095,6 +1109,75 @@ private TableResult buildShowResult(String columnName,
String[] objects) {
Arrays.stream(objects).map((c) -> new
String[]{c}).toArray(String[][]::new));
}
+ private TableResult buildShowCreateTableResult(CatalogBaseTable table,
ObjectIdentifier sqlIdentifier) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE ");
+ TableSchema schema = table.getSchema();
+ String comment = table.getComment();
+ Map<String, String> options = table.getOptions();
+
+ sb.append(String.format("`%s` (\n",
sqlIdentifier.getObjectName()));
+ // append columns
+ sb.append(String.join(",\n",
+ schema
+ .getTableColumns()
+ .stream()
+ .map(col -> {
+ if (col.getExpr().isPresent()) {
+ return String.format("%s`%s` AS
%s", printIndent, col.getName(), col.getExpr().get());
+ } else {
+ return String.format("%s`%s`
%s", printIndent, col.getName(), col.getType());
+ }
+ }).collect(Collectors.toList())));
+
+ // append watermark spec
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ sb.append(",\n") // add delimiter for last line
+ .append(String.join(",\n",
schema.getWatermarkSpecs().stream().map(
+ sepc -> String.format("%sWATERMARK FOR
`%s` AS %s", printIndent, sepc.getRowtimeAttribute(), sepc.getWatermarkExpr())
+ ).collect(Collectors.toList())));
+ }
+ // append constraint
+ if (schema.getPrimaryKey().isPresent()) {
+ UniqueConstraint constraint =
schema.getPrimaryKey().get();
+ sb.append(",\n") // add delimiter for last line
+ .append(String.format("%s%s", printIndent,
constraint.asCanonicalString()));
+ }
+ sb.append("\n) ");
+ // append comment
+ if (comment != null) {
Review comment:
`comment` won't be null for `DefaultCatalogTable#getComment` (see its
implementation), so I think it's better to use `StringUtils.isNotEmpty(comment)`
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -989,6 +989,30 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends
AbstractTestBase {
assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
}
+ @Test
+ def testCreateTableAndShowCreateTable(): Unit = {
+ val createDDL =
+ """ |CREATE TABLE `TBL1` (
+ | `A` BIGINT NOT NULL,
+ | `H` STRING,
+ | `G` AS 2 * (`A` + 1),
+ | `B` STRING NOT NULL,
+ | `TS` TIMESTAMP(3),
+ | `PROC` AS PROCTIME(),
+ | WATERMARK FOR `TS` AS `TS` - INTERVAL '5' SECOND,
+ | CONSTRAINT test_constraint PRIMARY KEY (`A`, `B`) NOT ENFORCED
+ |) COMMENT 'test show create table statement'
+ |PARTITIONED BY (`B`, `H`)
+ |WITH (
+ | 'connector' = 'kafka',
+ | 'kafka.topic' = 'log.test'
+ |)
+ |""".stripMargin
+ tableEnv.executeSql(createDDL)
+ val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next();
Review comment:
`;` can be removed
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1095,6 +1109,75 @@ private TableResult buildShowResult(String columnName,
String[] objects) {
Arrays.stream(objects).map((c) -> new
String[]{c}).toArray(String[][]::new));
}
+ private TableResult buildShowCreateTableResult(CatalogBaseTable table,
ObjectIdentifier sqlIdentifier) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE ");
Review comment:
I think we need to add a temporary table check here.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -989,6 +989,30 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends
AbstractTestBase {
assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
}
+ @Test
+ def testCreateTableAndShowCreateTable(): Unit = {
+ val createDDL =
+ """ |CREATE TABLE `TBL1` (
+ | `A` BIGINT NOT NULL,
+ | `H` STRING,
+ | `G` AS 2 * (`A` + 1),
+ | `B` STRING NOT NULL,
+ | `TS` TIMESTAMP(3),
+ | `PROC` AS PROCTIME(),
+ | WATERMARK FOR `TS` AS `TS` - INTERVAL '5' SECOND,
+ | CONSTRAINT test_constraint PRIMARY KEY (`A`, `B`) NOT ENFORCED
+ |) COMMENT 'test show create table statement'
+ |PARTITIONED BY (`B`, `H`)
+ |WITH (
+ | 'connector' = 'kafka',
+ | 'kafka.topic' = 'log.test'
+ |)
+ |""".stripMargin
+ tableEnv.executeSql(createDDL)
+ val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next();
Review comment:
I think we'd better add a case for `create view`
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1095,6 +1109,75 @@ private TableResult buildShowResult(String columnName,
String[] objects) {
Arrays.stream(objects).map((c) -> new
String[]{c}).toArray(String[][]::new));
}
+ private TableResult buildShowCreateTableResult(CatalogBaseTable table,
ObjectIdentifier sqlIdentifier) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE ");
+ TableSchema schema = table.getSchema();
+ String comment = table.getComment();
+ Map<String, String> options = table.getOptions();
+
+ sb.append(String.format("`%s` (\n",
sqlIdentifier.getObjectName()));
+ // append columns
+ sb.append(String.join(",\n",
+ schema
+ .getTableColumns()
+ .stream()
+ .map(col -> {
+ if (col.getExpr().isPresent()) {
+ return String.format("%s`%s` AS
%s", printIndent, col.getName(), col.getExpr().get());
+ } else {
+ return String.format("%s`%s`
%s", printIndent, col.getName(), col.getType());
+ }
+ }).collect(Collectors.toList())));
+
+ // append watermark spec
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ sb.append(",\n") // add delimiter for last line
+ .append(String.join(",\n",
schema.getWatermarkSpecs().stream().map(
+ sepc -> String.format("%sWATERMARK FOR
`%s` AS %s", printIndent, sepc.getRowtimeAttribute(), sepc.getWatermarkExpr())
+ ).collect(Collectors.toList())));
+ }
+ // append constraint
+ if (schema.getPrimaryKey().isPresent()) {
+ UniqueConstraint constraint =
schema.getPrimaryKey().get();
+ sb.append(",\n") // add delimiter for last line
+ .append(String.format("%s%s", printIndent,
constraint.asCanonicalString()));
+ }
+ sb.append("\n) ");
+ // append comment
+ if (comment != null) {
+ sb.append(String.format("COMMENT '%s'\n", comment));
+ }
+ // append partitions
+ if (table instanceof CatalogTable) {
+ CatalogTable catalogTable = (CatalogTable) table;
+ if (catalogTable.isPartitioned()) {
+ sb.append("PARTITIONED BY (")
+ .append(String.join(", ",
+ catalogTable
+ .getPartitionKeys()
+ .stream()
+ .map(key ->
String.format("`%s`", key))
+
.collect(Collectors.toList())))
+ .append(")\n");
+ }
+ }
+ // append `with` properties
+ sb.append("WITH (\n")
+ .append(String.join(",\n",
+ options
+ .entrySet()
+ .stream()
+ .map(entry -> String.format("%s'%s' =
'%s'", printIndent, entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList())))
+ .append("\n)\n");
+
+ Object[][] rows = new Object[][]{new Object[]{sb.toString()}};
Review comment:
I'm afraid that the row might be extremely large if the DDL string
resides in one row, since all show statements are now printed in tableau form.
See the implementation of `PrintUtils#columnWidthsByContent`: it calculates the
column width from the content size.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1095,6 +1109,75 @@ private TableResult buildShowResult(String columnName,
String[] objects) {
Arrays.stream(objects).map((c) -> new
String[]{c}).toArray(String[][]::new));
}
+ private TableResult buildShowCreateTableResult(CatalogBaseTable table,
ObjectIdentifier sqlIdentifier) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE ");
+ TableSchema schema = table.getSchema();
+ String comment = table.getComment();
+ Map<String, String> options = table.getOptions();
+
+ sb.append(String.format("`%s` (\n",
sqlIdentifier.getObjectName()));
Review comment:
```java
String.format("`%s` (\n", sqlIdentifier.getObjectName())
```
This will only display the table name, without the catalog and database info. Is
that by design?
If not, I guess it should be `sqlIdentifier.asSerializableString()` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]