lsyldliu commented on code in PR #24777:
URL: https://github.com/apache/flink/pull/24777#discussion_r1599523701


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -270,14 +270,21 @@ public Set<String> listDatabases(String catalogName) {
     public Set<TableInfo> listTables(
             String catalogName, String databaseName, Set<TableKind> tableKinds) {
         checkArgument(
-                Arrays.asList(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds),
+                Arrays.asList(TableKind.TABLE, TableKind.VIEW, TableKind.MATERIALIZED_TABLE)

Review Comment:
   On reflection, I think we should revert this change: the implementation
   here is problematic. Since we haven't yet worked out the relationship
   between tables and materialized tables, it's better not to support this
   feature for now. We need a separate task for it.
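
   For reference, a minimal stand-alone sketch of how the reverted guard
   behaves. The `Kind` enum and `isSupported` helper are hypothetical stand-ins
   for `TableKind` and the `checkArgument` condition, not Flink code:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.EnumSet;
   import java.util.Set;

   final class TableKindGuardSketch {

       /** Local stand-in for Flink's TableKind; not the real enum. */
       enum Kind { TABLE, VIEW, MATERIALIZED_TABLE }

       /** Mirrors the pre-PR condition: only TABLE and VIEW may be requested. */
       static boolean isSupported(Set<Kind> requested) {
           return Arrays.asList(Kind.TABLE, Kind.VIEW).containsAll(requested);
       }

       public static void main(String[] args) {
           // Both requested kinds are in the allowed list.
           System.out.println(isSupported(EnumSet.of(Kind.TABLE, Kind.VIEW))); // true
           // MATERIALIZED_TABLE is rejected once the change is reverted.
           System.out.println(isSupported(Collections.singleton(Kind.MATERIALIZED_TABLE))); // false
       }
   }
   ```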



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -414,10 +422,105 @@ protected static String getManuallyRefreshStatement(
         return insertStatement.toString();
     }
 
-    private static String stopJobWithSavepoint(
-            OperationExecutor executor,
+    private static ResultFetcher callDropMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            DropMaterializedTableOperation dropMaterializedTableOperation) {
+        ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier();
+        if (operationExecutor

Review Comment:
   I think a better implementation would be the following:
   ```
           boolean tableExists = operationExecutor.tableExists(tableIdentifier);
           if (!tableExists) {
               if (dropMaterializedTableOperation.isIfExists()) {
                   LOG.info(
                           "Materialized table {} does not exist, skip the drop operation.",
                           tableIdentifier);
                   return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
               } else {
                   throw new ValidationException(
                           String.format(
                                   "Materialized table %s does not exist.",
                                   tableIdentifier));
               }
           }
   ```
   
   We can add a `tableExists` method to `OperationExecutor`:
   ```
       public boolean tableExists(ObjectIdentifier tableIdentifier) {
           Optional<Catalog> currentCatalog =
                   sessionContext
                           .getSessionState()
                           .catalogManager
                           .getCatalog(tableIdentifier.getCatalogName());
           if (currentCatalog.isPresent()) {
               return currentCatalog.get().tableExists(tableIdentifier.toObjectPath());
           }
           return false;
       }
   ```
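
   To make the intended `IF EXISTS` semantics concrete, here is a small
   stand-alone sketch. It uses no Flink classes: the `Set<String>` catalog,
   `createTable`, `dropTable`, and the `IllegalStateException` are all
   hypothetical stand-ins (the real code would throw `ValidationException`
   and return a `ResultFetcher`):

   ```java
   import java.util.HashSet;
   import java.util.Set;

   final class DropIfExistsSketch {

       /** Hypothetical stand-in for a catalog: a set of fully qualified names. */
       private final Set<String> tables = new HashSet<>();

       void createTable(String identifier) {
           tables.add(identifier);
       }

       /** Plays the role of the proposed tableExists check. */
       boolean tableExists(String identifier) {
           return tables.contains(identifier);
       }

       /**
        * Returns true if the table was dropped, false if it was missing and
        * ifExists was set. Throws if it was missing and ifExists was not set.
        */
       boolean dropTable(String identifier, boolean ifExists) {
           if (!tableExists(identifier)) {
               if (ifExists) {
                   // Missing table plus IF EXISTS: skip quietly.
                   return false;
               }
               throw new IllegalStateException(
                       String.format("Materialized table %s does not exist.", identifier));
           }
           tables.remove(identifier);
           return true;
       }

       public static void main(String[] args) {
           DropIfExistsSketch catalog = new DropIfExistsSketch();
           catalog.createTable("cat.db.mt");
           System.out.println(catalog.dropTable("cat.db.mt", false)); // true
           System.out.println(catalog.dropTable("cat.db.mt", true)); // false
       }
   }
   ```

   The point of the single existence check is that the drop path no longer
   depends on `listTables` or on which table kinds it accepts.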



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -414,10 +422,105 @@ protected static String getManuallyRefreshStatement(
         return insertStatement.toString();
     }
 
-    private static String stopJobWithSavepoint(
-            OperationExecutor executor,
+    private static ResultFetcher callDropMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            DropMaterializedTableOperation dropMaterializedTableOperation) {
+        ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier();
+        if (operationExecutor
+                .listTables(
+                        tableIdentifier.getCatalogName(),
+                        tableIdentifier.getDatabaseName(),
+                        Collections.singleton(CatalogBaseTable.TableKind.TABLE))
+                .stream()
+                .noneMatch(t -> t.getIdentifier().equals(tableIdentifier))) {
+            if (dropMaterializedTableOperation.isIfExists()) {
+                LOG.info(
+                        "Materialized table {} is not exists, skip the drop operation.",
+                        tableIdentifier);
+                return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "Materialized table with identifier %s does not exist.",
+                                tableIdentifier));
+            }
+        }
+
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == materializedTable.getRefreshStatus()) {
+            ContinuousRefreshHandler refreshHandler =
+                    deserializeContinuousHandler(
+                            materializedTable.getSerializedRefreshHandler(),
+                            operationExecutor.getSessionContext().getUserClassloader());
+            // get job running status
+            JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
+            if (!jobStatus.isTerminalState()) {
+                try {
+                    cancelJob(operationExecutor, handle, refreshHandler.getJobId());
+                } catch (Exception e) {
+                    jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
+                    if (!jobStatus.isTerminalState()) {
+                        throw new SqlExecutionException(
+                                String.format(
+                                        "Failed to drop the materialized table %s, because the continuous refresh job %s could not be canceled."

Review Comment:
   Why is there a `,` before "because"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.