This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ExecutableOperation
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 768c09c5183c47692bd4af6968fd0381b7a0a006
Author: Jark Wu <[email protected]>
AuthorDate: Tue Mar 14 17:55:53 2023 +0800

    [FLINK-31450][table] Move execution logic of UseOperations out from TableEnvironmentImpl
    
    This closes #22175
---
 .../table/api/internal/TableEnvironmentImpl.java     | 13 -------------
 .../flink/table/operations/UseCatalogOperation.java  | 11 ++++++++++-
 .../flink/table/operations/UseDatabaseOperation.java | 10 ++++++++++
 .../flink/table/operations/UseModulesOperation.java  | 20 ++++++++++++++++++--
 .../apache/flink/table/operations/UseOperation.java  |  2 +-
 5 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 90c1cd24701..f5871f08dc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -115,8 +115,6 @@ import org.apache.flink.table.operations.SourceQueryOperation;
 import org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.UseModulesOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ExecutePlanOperation;
@@ -1233,17 +1231,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             return loadModule((LoadModuleOperation) operation);
         } else if (operation instanceof UnloadModuleOperation) {
             return unloadModule((UnloadModuleOperation) operation);
-        } else if (operation instanceof UseModulesOperation) {
-            return useModules((UseModulesOperation) operation);
-        } else if (operation instanceof UseCatalogOperation) {
-            UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
-            catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
-            return TableResultImpl.TABLE_RESULT_OK;
-        } else if (operation instanceof UseDatabaseOperation) {
-            UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation;
-            catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName());
-            catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName());
-            return TableResultImpl.TABLE_RESULT_OK;
         } else if (operation instanceof ShowCatalogsOperation) {
             return buildShowResult("catalog name", listCatalogs());
         } else if (operation instanceof ShowCreateTableOperation) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
index c442fc5daba..c77e589ce65 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+
 /** Operation to describe a USE CATALOG statement. */
 public class UseCatalogOperation implements UseOperation {
 
-    private String catalogName;
+    private final String catalogName;
 
     public UseCatalogOperation(String catalogName) {
         this.catalogName = catalogName;
@@ -35,4 +38,10 @@ public class UseCatalogOperation implements UseOperation {
     public String asSummaryString() {
         return String.format("USE CATALOG %s", catalogName);
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ctx.getCatalogManager().setCurrentCatalog(catalogName);
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java
index f0b96424062..190e32d8160 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+
 /** Operation to describe a USE [catalogName.]dataBaseName statement. */
 public class UseDatabaseOperation implements UseOperation {
 
@@ -41,4 +44,11 @@ public class UseDatabaseOperation implements UseOperation {
     public String asSummaryString() {
         return String.format("USE %s.%s", catalogName, databaseName);
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ctx.getCatalogManager().setCurrentCatalog(catalogName);
+        ctx.getCatalogManager().setCurrentDatabase(databaseName);
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java
index decdd770d66..6f540584430 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+
 import java.util.Collections;
 import java.util.List;
 
@@ -26,15 +30,27 @@ public class UseModulesOperation implements UseOperation {
     private final List<String> moduleNames;
 
     public UseModulesOperation(List<String> moduleNames) {
-        this.moduleNames = moduleNames;
+        this.moduleNames = Collections.unmodifiableList(moduleNames);
     }
 
     public List<String> getModuleNames() {
-        return Collections.unmodifiableList(moduleNames);
+        return moduleNames;
     }
 
     @Override
     public String asSummaryString() {
         return String.format("USE MODULES: %s", moduleNames);
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        try {
+            ctx.getModuleManager().useModules(moduleNames.toArray(new String[0]));
+            return TableResultImpl.TABLE_RESULT_OK;
+        } catch (ValidationException e) {
+            throw new ValidationException(
+                    String.format("Could not execute %s. %s", asSummaryString(), e.getMessage()),
+                    e);
+        }
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java
index efc6a899fdd..e99b2a51ed9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java
@@ -29,4 +29,4 @@ import org.apache.flink.annotation.Internal;
  * switching current database.
  */
 @Internal
-public interface UseOperation extends Operation {}
+public interface UseOperation extends Operation, ExecutableOperation {}

Reply via email to