Airblader commented on a change in pull request #17155:
URL: https://github.com/apache/flink/pull/17155#discussion_r702632614
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
##########
@@ -24,12 +24,22 @@
* Abstract Operation to describe all ALTER TABLE statements such as rename
table /set properties.
*/
public abstract class AlterTableOperation implements AlterOperation {
+ protected final boolean ifExists;
Review comment:
I think we should move this below `tableIdentifier` and also change the
order in the constructor to `AlterTableOperation(ObjectIdentifier, boolean)`;
the table identifier is the "primary" information here.
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
##########
@@ -36,40 +36,61 @@
import static java.util.Objects.requireNonNull;
/**
- * Abstract class to describe statements like ALTER TABLE [[catalogName.]
dataBasesName].tableName
- * ...
+ * Abstract class to describe statements like ALTER TABLE IF EXISTS
[[catalogName.]
+ * dataBasesName].tableName ...
*/
public abstract class SqlAlterTable extends SqlCall {
public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
+ protected final boolean ifExists;
protected final SqlIdentifier tableIdentifier;
protected final SqlNodeList partitionSpec;
+ public SqlAlterTable(SqlParserPos pos, SqlIdentifier tableName) {
+ this(pos, false, tableName);
+ }
+
+ public SqlAlterTable(SqlParserPos pos, boolean ifExists, SqlIdentifier
tableName) {
Review comment:
The order (SqlParserPos, SqlIdentifier, boolean) would feel more natural
here, I think, since the table identifier is more important than the flag. The
same also for the other constructor which has the boolean.
In the DDL the boolean comes first, but in the Java code I think it is more
logical to have optional flags in the end.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
##########
@@ -40,13 +47,18 @@ public CatalogTable getCatalogTable() {
@Override
public String asSummaryString() {
String description =
- catalogTable.getOptions().entrySet().stream()
- .map(
- entry ->
- OperationUtils.formatParameter(
- entry.getKey(),
entry.getValue()))
- .collect(Collectors.joining(", "));
+ catalogTable == null
Review comment:
For `ALTER TABLE does_not_exist SET ('a' = 'b')` this will result in an
incorrect summary string `ALTER TABLE does_not_exist SET ()`.
This is a bit of a design problem; the operation probably shouldn't contain
a `CatalogTable`?
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
##########
@@ -43,8 +43,11 @@
* @param pos Parser position
*/
public SqlAlterTableAddConstraint(
- SqlIdentifier tableID, SqlTableConstraint constraint, SqlParserPos
pos) {
- super(pos, tableID);
+ boolean ifExists,
Review comment:
Maybe change to `SqlAlterTableAddConstraint(SqlParserPos, SqlIdentifier,
SqlTableConstraint, boolean)`?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -759,14 +759,43 @@ private void dropTemporaryTableInternal(
*/
public void alterTable(
CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
- execute(
- (catalog, path) -> {
- final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
- catalog.alterTable(path, resolvedTable, ignoreIfNotExists);
- },
- objectIdentifier,
- ignoreIfNotExists,
- "AlterTable");
+ alterTableInternal(table, objectIdentifier, ignoreIfNotExists, true);
+ }
+
+ /**
+ * Alters a view in a given fully qualified path.
+ *
+ * @param table The view to put in the given path
+ * @param objectIdentifier The fully qualified path where to alter the
view.
+ * @param ignoreIfNotExists If false exception will be thrown if the view
or database or catalog
+ * to be altered does not exist.
+ */
+ public void alterView(
+ CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
+ alterTableInternal(table, objectIdentifier, ignoreIfNotExists, false);
+ }
+
+ public void alterTableInternal(
+ CatalogBaseTable table,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfNotExists,
+ boolean isDropTable) {
Review comment:
`isDropTable` doesn't fit a generic `alterTableInternal` method, this
should be something like `isTable`?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableAddConstraintOperation.java
##########
@@ -65,7 +68,9 @@ public String asSummaryString() {
params.put("columns", this.columnNames);
return OperationUtils.formatWithChildren(
- "ALTER TABLE ADD CONSTRAINT",
+ String.format(
+ "ALTER TABLE %sADD CONSTRAINT",
+ ifExists ? "IF EXISTS" + StringUtils.SPACE :
StringUtils.EMPTY),
Review comment:
nit
```suggestion
ifExists ? "IF EXISTS " : StringUtils.EMPTY),
```
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
##########
@@ -37,7 +46,9 @@ public ObjectIdentifier getNewTableIdentifier() {
@Override
public String asSummaryString() {
return String.format(
- "ALTER TABLE %s RENAME TO %s",
- tableIdentifier.asSummaryString(),
newTableIdentifier.asSummaryString());
+ "ALTER TABLE %s%s RENAME TO %s",
+ ifExists ? "IF EXISTS" + StringUtils.SPACE : StringUtils.EMPTY,
Review comment:
nit
```suggestion
ifExists ? "IF EXISTS " : StringUtils.EMPTY,
```
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -759,14 +759,43 @@ private void dropTemporaryTableInternal(
*/
public void alterTable(
CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
- execute(
- (catalog, path) -> {
- final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
- catalog.alterTable(path, resolvedTable, ignoreIfNotExists);
- },
- objectIdentifier,
- ignoreIfNotExists,
- "AlterTable");
+ alterTableInternal(table, objectIdentifier, ignoreIfNotExists, true);
+ }
+
+ /**
+ * Alters a view in a given fully qualified path.
+ *
+ * @param table The view to put in the given path
+ * @param objectIdentifier The fully qualified path where to alter the
view.
+ * @param ignoreIfNotExists If false exception will be thrown if the view
or database or catalog
+ * to be altered does not exist.
+ */
+ public void alterView(
+ CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
+ alterTableInternal(table, objectIdentifier, ignoreIfNotExists, false);
+ }
+
+ public void alterTableInternal(
+ CatalogBaseTable table,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfNotExists,
+ boolean isDropTable) {
+ final Optional<CatalogBaseTable> resultOpt =
getUnresolvedTable(objectIdentifier);
+ if (resultOpt.isPresent()) {
+ execute(
+ (catalog, path) -> {
+ final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
+ catalog.alterTable(path, resolvedTable,
ignoreIfNotExists);
+ },
+ objectIdentifier,
+ ignoreIfNotExists,
+ "AlterTable");
Review comment:
Should this be `"AlterView"` in the view case?
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
##########
@@ -38,7 +38,15 @@
public SqlAlterTableRename(
SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier
newTableName) {
- super(pos, tableName);
+ this(pos, false, tableName, newTableName);
+ }
+
+ public SqlAlterTableRename(
+ SqlParserPos pos,
+ boolean ifExists,
Review comment:
Maybe change to `SqlAlterTableRename(SqlParserPos, SqlIdentifier,
SqlIdentifier, boolean)`?
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
##########
@@ -29,22 +29,33 @@
import static java.util.Objects.requireNonNull;
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [,
name=value]*). */
+/**
+ * ALTER TABLE IF EXISTS [[catalogName.] dataBasesName].tableName SET (
name=value [, name=value]*).
+ */
public class SqlAlterTableOptions extends SqlAlterTable {
private final SqlNodeList propertyList;
public SqlAlterTableOptions(
- SqlParserPos pos, SqlIdentifier tableName, SqlNodeList
propertyList) {
- this(pos, tableName, null, propertyList);
+ SqlParserPos pos, boolean ifExists, SqlIdentifier tableName,
SqlNodeList propertyList) {
Review comment:
Maybe change to `SqlAlterTableOptions(SqlParserPos, SqlIdentifier,
SqlNodeList, boolean)`?
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
##########
@@ -33,13 +33,16 @@
import static java.util.Objects.requireNonNull;
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName RESET ( 'key1' [,
'key2']*). */
+/** ALTER TABLE IF EXISTS [[catalogName.] dataBasesName].tableName RESET (
'key1' [, 'key2']*). */
public class SqlAlterTableReset extends SqlAlterTable {
private final SqlNodeList propertyKeyList;
public SqlAlterTableReset(
- SqlParserPos pos, SqlIdentifier tableName, SqlNodeList
propertyKeyList) {
- super(pos, tableName, null);
+ SqlParserPos pos,
Review comment:
Maybe change to `SqlAlterTableReset(SqlParserPos, SqlIdentifier,
SqlNodeList, boolean)`?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
##########
@@ -36,6 +38,10 @@ public String getConstraintName() {
@Override
public String asSummaryString() {
- return String.format("ALTER TABLE %s DROP CONSTRAINT %s",
tableIdentifier, constraintName);
+ return String.format(
+ "ALTER TABLE %s%s DROP CONSTRAINT %s",
+ ifExists ? "IF EXISTS" + StringUtils.SPACE : StringUtils.EMPTY,
Review comment:
nit
```suggestion
ifExists ? "IF EXISTS " : StringUtils.EMPTY,
```
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -385,13 +385,15 @@ private Operation convertAlterTable(SqlAlterTable
sqlAlterTable) {
ObjectIdentifier tableIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
catalogManager.getTable(tableIdentifier);
- if (!optionalCatalogTable.isPresent() ||
optionalCatalogTable.get().isTemporary()) {
+ if ((!sqlAlterTable.isIfExists() && !optionalCatalogTable.isPresent())
Review comment:
I think since `baseTable` is now nullable anyway, we can make this a bit
simpler. What do you think?
```
final CatalogBaseTable baseTable =
optionalCatalogTable.map(CatalogManager.TableLookupResult::getTable).orElse(null);
if ((baseTable == null && !sqlAlterTable.isIfExists())
|| (baseTable != null && baseTable.isTemporary()) {
throw new ValidationException(
String.format(
"Table %s doesn't exist or is a temporary
table.",
tableIdentifier.toString()));
}
```
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -759,14 +759,43 @@ private void dropTemporaryTableInternal(
*/
public void alterTable(
CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
- execute(
- (catalog, path) -> {
- final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
- catalog.alterTable(path, resolvedTable, ignoreIfNotExists);
- },
- objectIdentifier,
- ignoreIfNotExists,
- "AlterTable");
+ alterTableInternal(table, objectIdentifier, ignoreIfNotExists, true);
+ }
+
+ /**
+ * Alters a view in a given fully qualified path.
+ *
+ * @param table The view to put in the given path
Review comment:
The argument should probably be called `view` rather than `table` now?
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
##########
@@ -38,8 +38,11 @@
* @param pos Parser position
*/
public SqlAlterTableDropConstraint(
- SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos
pos) {
- super(pos, tableID);
+ boolean ifExists,
Review comment:
Maybe change to `SqlAlterTableDropConstraint(SqlParserPos,
SqlIdentifier, SqlIdentifier, boolean)`?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
##########
@@ -22,14 +22,21 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.OperationUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.stream.Collectors;
-/** Operation to describe a ALTER TABLE .. SET .. statement. */
+/** Operation to describe a ALTER TABLE IF EXISTS .. SET .. statement. */
public class AlterTableOptionsOperation extends AlterTableOperation {
private final CatalogTable catalogTable;
public AlterTableOptionsOperation(ObjectIdentifier tableIdentifier,
CatalogTable catalogTable) {
- super(tableIdentifier);
+ this(false, tableIdentifier, catalogTable);
+ }
+
+ public AlterTableOptionsOperation(
Review comment:
`AlterTableOptionsOperation(ObjectIdentifier, CatalogTable, boolean)`?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
##########
@@ -20,13 +20,15 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
-/** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */
+import org.apache.commons.lang3.StringUtils;
+
+/** Operation of "ALTER TABLE IF EXISTS ADD [CONSTRAINT constraintName] ..."
clause. * */
public class AlterTableDropConstraintOperation extends AlterTableOperation {
private final String constraintName;
public AlterTableDropConstraintOperation(
- ObjectIdentifier tableIdentifier, String constraintName) {
- super(tableIdentifier);
+ boolean ifExists, ObjectIdentifier tableIdentifier, String
constraintName) {
Review comment:
Maybe change this to
`AlterTableDropConstraintOperation(ObjectIdentifier, String, boolean)`?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
##########
@@ -20,13 +20,22 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
-/** Operation to describe a ALTER TABLE .. RENAME to .. statement. */
+import org.apache.commons.lang3.StringUtils;
+
+/** Operation to describe a ALTER TABLE IF EXISTS .. RENAME to .. statement. */
public class AlterTableRenameOperation extends AlterTableOperation {
private final ObjectIdentifier newTableIdentifier;
public AlterTableRenameOperation(
ObjectIdentifier tableIdentifier, ObjectIdentifier
newTableIdentifier) {
- super(tableIdentifier);
+ this(false, tableIdentifier, newTableIdentifier);
+ }
+
+ public AlterTableRenameOperation(
Review comment:
`AlterTableRenameOperation(ObjectIdentifier, ObjectIdentifier, boolean)`?
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -995,6 +995,56 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends
AbstractTestBase {
assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
}
+ @Test
+ def testAlterTableWithIfExists(): Unit = {
Review comment:
I think a whole separate IT case for this isn't necessary, this is
really quite heavy on the test performance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]