lsyldliu commented on code in PR #24777: URL: https://github.com/apache/flink/pull/24777#discussion_r1598351419
########## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlDrop; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** DROP MATERIALIZED TABLE DDL sql call. */ +public class SqlDropMaterializedTable extends SqlDrop { + + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("DROP MATERIALIZED TABLE", SqlKind.DROP_TABLE); + + private final SqlIdentifier tableIdentifier; + + public SqlDropMaterializedTable( + SqlParserPos pos, SqlIdentifier tableIdentifier, boolean ifExists) { + super(OPERATOR, pos, ifExists); + this.tableIdentifier = tableIdentifier; + } + + public String[] fullTableName() { + return tableIdentifier.names.toArray(new String[0]); + } + + public boolean getIfExists() { + return this.ifExists; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(tableIdentifier); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DROP"); Review Comment: ```suggestion writer.keyword("DROP MATERIALIZED TABLE"); ``` ########## flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl: ########## @@ -1801,6 +1801,23 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar } } +SqlDropMaterializedTable SqlDropMaterializedTable(Span s, boolean replace) : Review Comment: We can return `SqlDrop` directly. ########## flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl: ########## @@ -2427,6 +2444,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) : ( drop = SqlDropCatalog(s, replace) | + drop = SqlDropMaterializedTable(s, replace) Review Comment: We should throw an exception if the user specifies the TEMPORARY keyword, similar to the `CREATE MATERIALIZED TABLE` process logic. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; + +/** A converter for {@link SqlDropMaterializedTable}. */ +public class SqlDropMaterializedTableConverter + implements SqlNodeConverter<SqlDropMaterializedTable> { + @Override + public Operation convertSqlNode( + SqlDropMaterializedTable sqlDropMaterializedTable, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlDropMaterializedTable.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + // Currently we don't support temporary materialized table, so isTemporary is always false Review Comment: We should remove the parameter `isTemporary` for DropMaterializedTableOperation constructor. This is a result of my negligence.  ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java: ########## @@ -312,4 +313,17 @@ void testAlterMaterializedTableResume() { assertThat(operation2.asSummaryString()) .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + + @Test + void testDropMaterializedTable() { + final String sql = "DROP MATERIALIZED TABLE mtbl1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(DropMaterializedTableOperation.class); + assertThat(((DropMaterializedTableOperation) operation).isIfExists()).isFalse(); + + final String sql2 = "DROP MATERIALIZED TABLE IF EXISTS mtbl1"; + Operation operation2 = parse(sql2); + assertThat(operation2).isInstanceOf(DropMaterializedTableOperation.class); + assertThat(((DropMaterializedTableOperation) operation2).isIfExists()).isTrue(); Review Comment: assert the operation `asSummaryString`? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -414,6 +422,78 @@ protected static String getManuallyRefreshStatement( return insertStatement.toString(); } + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == materializedTable.getRefreshStatus()) { + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler( + materializedTable.getSerializedRefreshHandler(), + operationExecutor.getSessionContext().getUserClassloader()); + // get job running status + JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isTerminalState()) { + try { + cancelJob(operationExecutor, handle, refreshHandler); + } catch (Exception e) { + jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + // throw exception if job is not terminal state + if (!jobStatus.isTerminalState()) { + throw new SqlExecutionException( + String.format( Review Comment: ``` String.format( "Failed to drop the materialized table %s because the continuous refresh job %s could not be canceled." + " The current status of the continuous refresh job is %s.", tableIdentifier, jobStatus) ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -414,6 +422,78 @@ protected static String getManuallyRefreshStatement( return insertStatement.toString(); } + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED Review Comment: We should also consider the `INTIALIZING` status, the refresh may be submitted, we also need to handle it. For the `INTITIALIZING` status, due to we don't known the jobId, so one simple solution is throw exception directly, we should wait the is submitted only then can we drop the materialized table. WDYT? ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -720,6 +768,88 @@ void testAlterMaterializedTableWithoutSavepointDirConfigured( "Savepoint directory is not configured, can't stop job with savepoint."); } + @Test + void testDropMaterializedTable(@InjectClusterClient RestClusterClient<?> restClusterClient) Review Comment: Also test the statement `DROP MATERIALIZED TABLE IF EXISTS xxx` if materialized table actually not exists. If the table not exist, we should do nothing. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -414,6 +422,78 @@ protected static String getManuallyRefreshStatement( return insertStatement.toString(); } + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); Review Comment: We should consider if the table doesn't exist, but the user using `DROP MATERIALIZED TABLE IF EXISTS xxx`, nothing should happen for this case. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -167,28 +176,42 @@ void before() throws Exception { fileSystemCatalogPath = fileCatalogPath.toString(); fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr; + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + sessionHandle = initializeSession(); } @AfterEach - void after(@InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { - // cancel all running jobs for releasing mini cluster resources - for (JobStatusMessage j : restClusterClient.listJobs().get()) { - // cancel all continuous refresh jobs - if (j.getJobName().endsWith("continuous_refresh_job") - && (j.getJobState() == JobStatus.RUNNING - || j.getJobState() == JobStatus.CREATED)) { - restClusterClient.cancel(j.getJobId()).get(); + void after() throws Exception { + Set<TableInfo> tableInfos = + service.listTables( + sessionHandle, + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + Collections.singleton(CatalogBaseTable.TableKind.TABLE)); + + // drop all materialized tables + for (TableInfo tableInfo : tableInfos) { + // get materialized table + ResolvedCatalogBaseTable<?> resolvedTable = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + if (resolvedTable instanceof ResolvedCatalogMaterializedTable) { + // drop materialized table + String dropTableDDL = + String.format( + "DROP MATERIALIZED TABLE %s", Review Comment: It would be better add `IF EXISTS` keyword. ########## flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java: ########## @@ -384,6 +384,25 @@ void testAlterMaterializedTableReset() { + " "); } + @Test + void testDropMaterializedTable() { + final String sql = "DROP MATERIALIZED TABLE tbl1"; + final String expected = "DROP MATERIALIZED TABLE `TBL1`"; + sql(sql).ok(expected); + + final String sql2 = "DROP MATERIALIZED TABLE IF EXISTS tbl1"; + sql(sql2).ok("DROP MATERIALIZED TABLE IF EXISTS `TBL1`"); + + final String sql3 = "DROP MATERIALIZED TABLE tb1 ^IF^ EXISTS"; Review Comment: Please also add a test case for drop temporary materialized table ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -414,6 +422,78 @@ protected static String getManuallyRefreshStatement( return insertStatement.toString(); } + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == materializedTable.getRefreshStatus()) { + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler( + materializedTable.getSerializedRefreshHandler(), + operationExecutor.getSessionContext().getUserClassloader()); + // get job running status + JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isTerminalState()) { + try { + cancelJob(operationExecutor, handle, refreshHandler); + } catch (Exception e) { + jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + // throw exception if job is not terminal state + if (!jobStatus.isTerminalState()) { + throw new SqlExecutionException( + String.format( + "Drop materialized table failed, current job status is %s.", + jobStatus), + e); + } else { + LOG.warn( + "Cancel continuous refresh job {} of materialized table {} occur exception, but job is terminal state, skip the cancel operation.", Review Comment: ``` "An exception occurred while canceling the continuous refresh job {} for materialized table {}," + " but since the job is in a terminal state, skip the cancel operation." ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -414,6 +422,78 @@ protected static String getManuallyRefreshStatement( return insertStatement.toString(); } + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == materializedTable.getRefreshStatus()) { + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler( + materializedTable.getSerializedRefreshHandler(), + operationExecutor.getSessionContext().getUserClassloader()); + // get job running status + JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isTerminalState()) { + try { + cancelJob(operationExecutor, handle, refreshHandler); + } catch (Exception e) { + jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + // throw exception if job is not terminal state + if (!jobStatus.isTerminalState()) { + throw new SqlExecutionException( + String.format( + "Drop materialized table failed, current job status is %s.", + jobStatus), + e); + } else { + LOG.warn( + "Cancel continuous refresh job {} of materialized table {} occur exception, but job is terminal state, skip the cancel operation.", + refreshHandler.getJobId(), + tableIdentifier); + } + } + } else { + LOG.info( + "The continuous refresh job {} of materialized table {} is not running, no need to cancel it.", Review Comment: ``` "No need to cancel the continuous refresh job {} for materialized table {} as it is not currently running." ``` ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -167,28 +169,42 @@ void before() throws Exception { fileSystemCatalogPath = fileCatalogPath.toString(); fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr; + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + sessionHandle = initializeSession(); } @AfterEach - void after(@InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { - // cancel all running jobs for releasing mini cluster resources - for (JobStatusMessage j : restClusterClient.listJobs().get()) { - // cancel all continuous refresh jobs - if (j.getJobName().endsWith("continuous_refresh_job") - && (j.getJobState() == JobStatus.RUNNING - || j.getJobState() == JobStatus.CREATED)) { - restClusterClient.cancel(j.getJobId()).get(); + void after() throws Exception { + Set<TableInfo> tableInfos = + service.listTables( + sessionHandle, + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + Collections.singleton(CatalogBaseTable.TableKind.TABLE)); Review Comment: I think the correct table kind should be `MATERIALIZED TABLE`, we should extend the function of `listTables` method, it should support return materialized table. and the `listTables` method should return the correct table kind, table is table, materialized table is materialized table, these are two things. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -414,6 +422,78 @@ protected static String getManuallyRefreshStatement( return insertStatement.toString(); } + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == materializedTable.getRefreshStatus()) { + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler( + materializedTable.getSerializedRefreshHandler(), + operationExecutor.getSessionContext().getUserClassloader()); + // get job running status + JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isTerminalState()) { + try { + cancelJob(operationExecutor, handle, refreshHandler); + } catch (Exception e) { + jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + // throw exception if job is not terminal state + if (!jobStatus.isTerminalState()) { + throw new SqlExecutionException( + String.format( + "Drop materialized table failed, current job status is %s.", + jobStatus), + e); + } else { + LOG.warn( + "Cancel continuous refresh job {} of materialized table {} occur exception, but job is terminal state, skip the cancel operation.", + refreshHandler.getJobId(), + tableIdentifier); + } + } + } else { + LOG.info( + "The continuous refresh job {} of materialized table {} is not running, no need to cancel it.", + refreshHandler.getJobId(), + tableIdentifier); + } + } + + operationExecutor.callExecutableOperation(handle, dropMaterializedTableOperation); + + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private static JobStatus getJobStatus( + OperationExecutor operationExecutor, + OperationHandle handle, + ContinuousRefreshHandler refreshHandler) { + ResultFetcher resultFetcher = + operationExecutor.callDescribeJobOperation( + operationExecutor.getTableEnvironment(), + handle, + new DescribeJobOperation(refreshHandler.getJobId())); + List<RowData> result = fetchAllResults(resultFetcher); + String jobStatus = result.get(0).getString(2).toString(); + return JobStatus.valueOf(jobStatus); + } + + private static void cancelJob( + OperationExecutor operationExecutor, + OperationHandle handle, + ContinuousRefreshHandler refreshHandler) { Review Comment: Replace this with `String jobId` would be better? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
