This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3849f1194904374f1b692688d1d3f711b3be9a27 Author: Jane Chan <[email protected]> AuthorDate: Thu Dec 30 17:13:41 2021 +0800 [FLINK-25176][table] Introduce "ALTER TABLE ... COMPACT" syntax This closes #18236 --- .../src/main/codegen/data/Parser.tdd | 3 + .../src/main/codegen/includes/parserImpls.ftl | 12 ++ .../flink/sql/parser/ddl/SqlAlterTableCompact.java | 54 +++++++++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 15 +++ .../table/api/internal/TableEnvironmentImpl.java | 3 + .../flink/table/catalog/ManagedTableListener.java | 4 +- .../operations/ddl/AlterTableCompactOperation.java | 58 ++++++++++ .../operations/SqlToOperationConverter.java | 58 +++++++++- .../operations/SqlToOperationConverterTest.java | 121 +++++++++++++++++++-- .../flink/table/api/TableEnvironmentTest.scala | 37 +++++++ 10 files changed, 351 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 1fb8518..e400d3f 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -31,6 +31,7 @@ "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterTable" "org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint" + "org.apache.flink.sql.parser.ddl.SqlAlterTableCompact" "org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint" "org.apache.flink.sql.parser.ddl.SqlAlterTableOptions" "org.apache.flink.sql.parser.ddl.SqlAlterTableRename" @@ -110,6 +111,7 @@ "CATALOGS" "CHANGELOG_MODE" "COMMENT" + "COMPACT" "COLUMNS" "DATABASES" "ENFORCED" @@ -184,6 +186,7 @@ "COMMAND_FUNCTION" "COMMAND_FUNCTION_CODE" "COMMITTED" + "COMPACT" "CONDITIONAL" "CONDITION_NUMBER" "CONNECTION" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 86537a7..f68c50f 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -508,6 +508,7 @@ SqlAlterTable SqlAlterTable() : SqlIdentifier newTableIdentifier = null; SqlNodeList propertyList = SqlNodeList.EMPTY; SqlNodeList propertyKeyList = SqlNodeList.EMPTY; + SqlNodeList partitionSpec = null; SqlIdentifier constraintName; SqlTableConstraint constraint; } @@ -556,6 +557,17 @@ SqlAlterTable SqlAlterTable() : constraintName, startPos.plus(getPos())); } + | + [ + <PARTITION> + { partitionSpec = new SqlNodeList(getPos()); + PartitionSpecCommaList(partitionSpec); + } + ] + <COMPACT> + { + return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec); + } ) } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java new file mode 100644 index 0000000..247f4f9 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nullable; + +import java.util.List; + +/** ALTER TABLE [[catalogName.] dataBasesName].tableName [PARTITION partition_spec] COMPACT. */ +public class SqlAlterTableCompact extends SqlAlterTable { + + public SqlAlterTableCompact( + SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) { + super(pos, tableName, partitionSpec); + } + + public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName) { + super(pos, tableName); + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(tableIdentifier, partitionSpec); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("COMPACT"); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a6bbbb8..cc624bb 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -288,6 +288,21 @@ public class FlinkSqlParserImplTest extends SqlParserTest { } @Test + public void testAlterTableCompact() { + sql("alter table t1 compact").ok("ALTER TABLE `T1` COMPACT"); + + sql("alter table db1.t1 compact").ok("ALTER TABLE `DB1`.`T1` COMPACT"); + + sql("alter table cat1.db1.t1 compact").ok("ALTER TABLE `CAT1`.`DB1`.`T1` COMPACT"); + + sql("alter table t1 partition(x='y',m='n') compact") + .ok("ALTER TABLE `T1` PARTITION (`X` = 'y', `M` = 'n') COMPACT"); + + sql("alter table t1 partition(^)^ compact") + .fails("(?s).*Encountered \"\\)\" at line 1, column 26.\n.*"); + } + + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index e163cd7..51550d8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -112,6 +112,7 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation; +import org.apache.flink.table.operations.ddl.AlterTableCompactOperation; import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableOperation; import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; @@ -998,6 +999,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) { catalog.dropPartition(tablePath, spec, ifExists); } + } else if (alterTableOperation instanceof AlterTableCompactOperation) { + // TODO: FLINK-25176 work with managed table } return TableResultImpl.TABLE_RESULT_OK; } catch (TableAlreadyExistException | TableNotExistException e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java index 211dce7..49779de 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java @@ -79,7 +79,9 @@ public class ManagedTableListener { } } - private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) { + /** Check a resolved catalog table is Flink's managed table or not. */ + public static boolean isManagedTable( + @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) { if (catalog == null || !catalog.supportsManagedTable()) { // catalog not support managed table return false; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java new file mode 100644 index 0000000..14af950 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.OperationUtils; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** Operation to describe "ALTER TABLE [PARTITION partition_spec] COMPACT" statement. */ +public class AlterTableCompactOperation extends AlterTableOperation { + + private final CatalogPartitionSpec partitionSpec; + + public AlterTableCompactOperation( + ObjectIdentifier tableIdentifier, @Nullable CatalogPartitionSpec partitionSpec) { + super(tableIdentifier); + this.partitionSpec = partitionSpec; + } + + public Map<String, String> getPartitionSpec() { + return partitionSpec == null + ? Collections.emptyMap() + : new LinkedHashMap<>(partitionSpec.getPartitionSpec()); + } + + @Override + public String asSummaryString() { + String spec = + partitionSpec == null + ? "" + : String.format( + "PARTITION (%s) ", + OperationUtils.formatPartitionSpec(partitionSpec)); + return String.format("ALTER TABLE %s %sCOMPACT", tableIdentifier.asSummaryString(), spec); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index a946308..28837d2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -25,6 +25,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; import org.apache.flink.sql.parser.ddl.SqlAlterFunction; import org.apache.flink.sql.parser.ddl.SqlAlterTable; import org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint; +import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact; import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint; import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions; import org.apache.flink.sql.parser.ddl.SqlAlterTableRename; @@ -91,10 +92,13 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.CatalogViewImpl; import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ManagedTableListener; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.operations.BeginStatementSetOperation; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.DescribeTableOperation; @@ -129,6 +133,7 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation; +import org.apache.flink.table.operations.ddl.AlterTableCompactOperation; import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; @@ -169,6 +174,7 @@ import org.apache.calcite.sql.parser.SqlParser; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -484,6 +490,11 @@ public class SqlToOperationConverter { specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i))); } return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs); + } else if (sqlAlterTable instanceof SqlAlterTableCompact) { + ResolvedCatalogTable resolvedCatalogTable = + (ResolvedCatalogTable) optionalCatalogTable.get().getResolvedTable(); + return convertAlterTableCompact( + tableIdentifier, resolvedCatalogTable, (SqlAlterTableCompact) sqlAlterTable); } else { throw new ValidationException( String.format( @@ -531,16 +542,57 @@ public class SqlToOperationConverter { CatalogTable oldTable, SqlAlterTableReset alterTableReset) { Map<String, String> newOptions = new HashMap<>(oldTable.getOptions()); - // reset empty key is not allowed + // reset empty or 'connector' key is not allowed Set<String> resetKeys = alterTableReset.getResetKeys(); - if (resetKeys.isEmpty()) { - throw new ValidationException("ALTER TABLE RESET does not support empty key"); + if (resetKeys.isEmpty() || resetKeys.contains(FactoryUtil.CONNECTOR.key())) { + String exMsg = + resetKeys.isEmpty() + ? "ALTER TABLE RESET does not support empty key" + : "ALTER TABLE RESET does not support changing 'connector'"; + throw new ValidationException(exMsg); } // reset table option keys resetKeys.forEach(newOptions::remove); return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(newOptions)); } + private Operation convertAlterTableCompact( + ObjectIdentifier tableIdentifier, + ResolvedCatalogTable resolvedCatalogTable, + SqlAlterTableCompact alterTableCompact) { + Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null); + if (ManagedTableListener.isManagedTable(catalog, resolvedCatalogTable)) { + LinkedHashMap<String, String> partitionKVs = alterTableCompact.getPartitionKVs(); + CatalogPartitionSpec partitionSpec = null; + if (partitionKVs != null) { + List<String> orderedPartitionKeys = resolvedCatalogTable.getPartitionKeys(); + Set<String> validPartitionKeySet = new HashSet<>(orderedPartitionKeys); + String exMsg = + orderedPartitionKeys.isEmpty() + ? String.format("Table %s is not partitioned.", tableIdentifier) + : String.format( + "Available ordered partition columns: [%s]", + orderedPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'"))); + partitionKVs.forEach( + (partitionKey, partitionValue) -> { + if (!validPartitionKeySet.contains(partitionKey)) { + throw new ValidationException( + String.format( + "Partition column '%s' not defined in the table schema. %s", + partitionKey, exMsg)); + } + }); + partitionSpec = new CatalogPartitionSpec(partitionKVs); + } + return new AlterTableCompactOperation(tableIdentifier, partitionSpec); + } + throw new ValidationException( + String.format( + "ALTER TABLE COMPACT operation is not supported for non-managed table %s", + tableIdentifier)); + } + /** Convert CREATE FUNCTION statement. */ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) { UnresolvedIdentifier unresolvedIdentifier = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 97d4ae3..9651cc6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -66,6 +66,7 @@ import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.operations.command.ShowJarsOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation; +import org.apache.flink.table.operations.ddl.AlterTableCompactOperation; import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; @@ -1167,7 +1168,7 @@ public class SqlToOperationConverterTest { @Test public void testAlterTable() throws Exception { - prepareTable(false); + prepareNonManagedTable(false); final String[] renameTableSqls = new String[] { "alter table cat1.db1.tb1 rename to tb2", @@ -1193,6 +1194,7 @@ public class SqlToOperationConverterTest { "alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')", SqlDialect.DEFAULT); Map<String, String> expectedOptions = new HashMap<>(); + expectedOptions.put("connector", "dummy"); expectedOptions.put("k", "v"); expectedOptions.put("k1", "v1"); expectedOptions.put("K2", "V2"); @@ -1201,7 +1203,15 @@ public class SqlToOperationConverterTest { // test alter table reset operation = parse("alter table cat1.db1.tb1 reset ('k')", SqlDialect.DEFAULT); - assertAlterTableOptions(operation, expectedIdentifier, Collections.emptyMap()); + assertAlterTableOptions( + operation, expectedIdentifier, Collections.singletonMap("connector", "dummy")); + assertThatThrownBy( + () -> + parse( + "alter table cat1.db1.tb1 reset ('connector')", + SqlDialect.DEFAULT)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'"); assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ()", SqlDialect.DEFAULT)) .isInstanceOf(ValidationException.class) @@ -1210,7 +1220,7 @@ public class SqlToOperationConverterTest { @Test public void testAlterTableAddPkConstraint() throws Exception { - prepareTable(false); + prepareNonManagedTable(false); // Test alter add table constraint. Operation operation = parse( @@ -1236,7 +1246,7 @@ public class SqlToOperationConverterTest { @Test public void testAlterTableAddPkConstraintEnforced() throws Exception { - prepareTable(false); + prepareNonManagedTable(false); // Test alter table add enforced assertThatThrownBy( () -> @@ -1253,7 +1263,7 @@ public class SqlToOperationConverterTest { @Test public void testAlterTableAddUniqueConstraint() throws Exception { - prepareTable(false); + prepareNonManagedTable(false); // Test alter add table constraint. assertThatThrownBy( () -> @@ -1266,7 +1276,7 @@ public class SqlToOperationConverterTest { @Test public void testAlterTableAddUniqueConstraintEnforced() throws Exception { - prepareTable(false); + prepareNonManagedTable(false); // Test alter table add enforced assertThatThrownBy( () -> @@ -1279,7 +1289,7 @@ public class SqlToOperationConverterTest { @Test public void testAlterTableDropConstraint() throws Exception { - prepareTable(true); + prepareNonManagedTable(true); // Test alter table add enforced Operation operation = parse("alter table tb1 drop constraint ct1", SqlDialect.DEFAULT); assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class); @@ -1293,6 +1303,83 @@ public class SqlToOperationConverterTest { } @Test + public void testAlterTableCompactOnNonManagedTable() throws Exception { + prepareNonManagedTable(false); + assertThatThrownBy(() -> parse("alter table tb1 compact", SqlDialect.DEFAULT)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "ALTER TABLE COMPACT operation is not supported for non-managed table `cat1`.`db1`.`tb1`"); + } + + @Test + public void testAlterTableCompact() throws Exception { + prepareManagedTable(false); + Operation operation = parse("alter table tb1 compact", SqlDialect.DEFAULT); + assertThat(operation).isInstanceOf(AlterTableCompactOperation.class); + AlterTableCompactOperation compactOperation = (AlterTableCompactOperation) operation; + + assertThat(compactOperation.asSummaryString()) + .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT"); + + // specify partition on a non-partitioned table + assertThatThrownBy( + () -> + parse( + "alter table tb1 partition(dt = 'a') compact", + SqlDialect.DEFAULT)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Partition column 'dt' not defined in the table schema. Table `cat1`.`db1`.`tb1` is not partitioned."); + + // alter a non-existed table + assertThatThrownBy(() -> parse("alter table tb2 compact", SqlDialect.DEFAULT)) + .isInstanceOf(ValidationException.class) + .hasMessage("Table `cat1`.`db1`.`tb2` doesn't exist or is a temporary table."); + } + + @Test + public void testAlterTableCompactPartition() throws Exception { + prepareManagedTable(true); + + // compact partitioned table without partition_spec + assertThat(parse("alter table tb1 compact", SqlDialect.DEFAULT).asSummaryString()) + .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT"); + + // compact partitioned table without full partition_spec + assertThat( + parse("alter table tb1 partition (b=1) compact", SqlDialect.DEFAULT) + .asSummaryString()) + .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1) COMPACT"); + + assertThat( + parse("alter table tb1 partition (c=2) compact", SqlDialect.DEFAULT) + .asSummaryString()) + .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2) COMPACT"); + + // compact partitioned table with full partition_spec + assertThat( + parse("alter table tb1 partition (b=1,c=2) compact", SqlDialect.DEFAULT) + .asSummaryString()) + .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1, c=2) COMPACT"); + + // compact partitioned table with disordered partition_spec + assertThat( + parse("alter table tb1 partition (c=2,b=1) compact", SqlDialect.DEFAULT) + .asSummaryString()) + .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2, b=1) COMPACT"); + + // compact partitioned table with a non-existed partition_spec + assertThatThrownBy( + () -> + parse( + "alter table tb1 partition (dt = 'a') compact", + SqlDialect.DEFAULT)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Partition column 'dt' not defined in the table schema. Available ordered partition columns: ['b', 'c']"); + } + + @Test public void testCreateViewWithMatchRecognize() { Map<String, String> prop = new HashMap<>(); prop.put("connector", "values"); @@ -1534,7 +1621,16 @@ public class SqlToOperationConverterTest { return SqlToOperationConverter.convert(planner, catalogManager, node).get(); } - private void prepareTable(boolean hasConstraint) throws Exception { + private void prepareNonManagedTable(boolean hasConstraint) throws Exception { + prepareTable(false, false, hasConstraint); + } + + private void prepareManagedTable(boolean hasPartition) throws Exception { + prepareTable(true, hasPartition, false); + } + + private void prepareTable(boolean managedTable, boolean hasPartition, boolean hasConstraint) + throws Exception { Catalog catalog = new GenericInMemoryCatalog("default", "default"); catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); @@ -1543,14 +1639,19 @@ public class SqlToOperationConverterTest { .column("a", DataTypes.STRING().notNull()) .column("b", DataTypes.BIGINT().notNull()) .column("c", DataTypes.BIGINT()); + Map<String, String> options = new HashMap<>(); + options.put("k", "v"); + if (!managedTable) { + options.put("connector", "dummy"); + } CatalogTable catalogTable = CatalogTable.of( hasConstraint ? builder.primaryKeyNamed("ct1", "a", "b").build() : builder.build(), "tb1", - Collections.emptyList(), - Collections.singletonMap("k", "v")); + hasPartition ? Arrays.asList("b", "c") : Collections.emptyList(), + Collections.unmodifiableMap(options)); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); ObjectPath tablePath = new ObjectPath("db1", "tb1"); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 0ec319d..8ccac9c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -280,6 +280,43 @@ class TableEnvironmentTest { } @Test + def testAlterTableCompactOnNonManagedTable(): Unit = { + val statement = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) WITH ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + tableEnv.executeSql(statement) + + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage("ALTER TABLE COMPACT operation is not supported for " + + "non-managed table `default_catalog`.`default_database`.`MyTable`") + tableEnv.executeSql("alter table MyTable compact") + } + + @Test + def testAlterTableCompactOnManagedTable(): Unit = { + val statement = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) + """.stripMargin + tableEnv.executeSql(statement) + + assertEquals(ResultKind.SUCCESS, + tableEnv.executeSql("ALTER TABLE MyTable COMPACT").getResultKind) + } + + @Test def testExecuteSqlWithCreateAlterDropTable(): Unit = { val createTableStmt = """
