This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ExecutableOperation in repository https://gitbox.apache.org/repos/asf/flink.git
commit 768c09c5183c47692bd4af6968fd0381b7a0a006 Author: Jark Wu <[email protected]> AuthorDate: Tue Mar 14 17:55:53 2023 +0800 [FLINK-31450][table] Move execution logic of UseOperations out from TableEnvironmentImpl This closes #22175 --- .../table/api/internal/TableEnvironmentImpl.java | 13 ------------- .../flink/table/operations/UseCatalogOperation.java | 11 ++++++++++- .../flink/table/operations/UseDatabaseOperation.java | 10 ++++++++++ .../flink/table/operations/UseModulesOperation.java | 20 ++++++++++++++++++-- .../apache/flink/table/operations/UseOperation.java | 2 +- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 90c1cd24701..f5871f08dc3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -115,8 +115,6 @@ import org.apache.flink.table.operations.SourceQueryOperation; import org.apache.flink.table.operations.StatementSetOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; import org.apache.flink.table.operations.UnloadModuleOperation; -import org.apache.flink.table.operations.UseCatalogOperation; -import org.apache.flink.table.operations.UseDatabaseOperation; import org.apache.flink.table.operations.UseModulesOperation; import org.apache.flink.table.operations.command.AddJarOperation; import org.apache.flink.table.operations.command.ExecutePlanOperation; @@ -1233,17 +1231,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { return loadModule((LoadModuleOperation) operation); } else if (operation instanceof UnloadModuleOperation) { return unloadModule((UnloadModuleOperation) operation); - } else if (operation instanceof UseModulesOperation) { - return useModules((UseModulesOperation) operation); - } else if (operation instanceof UseCatalogOperation) { - UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation; - catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName()); - return TableResultImpl.TABLE_RESULT_OK; - } else if (operation instanceof UseDatabaseOperation) { - UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation; - catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName()); - catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName()); - return TableResultImpl.TABLE_RESULT_OK; } else if (operation instanceof ShowCatalogsOperation) { return buildShowResult("catalog name", listCatalogs()); } else if (operation instanceof ShowCreateTableOperation) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java index c442fc5daba..c77e589ce65 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java @@ -18,10 +18,13 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; + /** Operation to describe a USE CATALOG statement. */ public class UseCatalogOperation implements UseOperation { - private String catalogName; + private final String catalogName; public UseCatalogOperation(String catalogName) { this.catalogName = catalogName; @@ -35,4 +38,10 @@ public class UseCatalogOperation implements UseOperation { public String asSummaryString() { return String.format("USE CATALOG %s", catalogName); } + + @Override + public TableResultInternal execute(Context ctx) { + ctx.getCatalogManager().setCurrentCatalog(catalogName); + return TableResultImpl.TABLE_RESULT_OK; + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java index f0b96424062..190e32d8160 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java @@ -18,6 +18,9 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; + /** Operation to describe a USE [catalogName.]dataBaseName statement. */ public class UseDatabaseOperation implements UseOperation { @@ -41,4 +44,11 @@ public class UseDatabaseOperation implements UseOperation { public String asSummaryString() { return String.format("USE %s.%s", catalogName, databaseName); } + + @Override + public TableResultInternal execute(Context ctx) { + ctx.getCatalogManager().setCurrentCatalog(catalogName); + ctx.getCatalogManager().setCurrentDatabase(databaseName); + return TableResultImpl.TABLE_RESULT_OK; + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java index decdd770d66..6f540584430 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseModulesOperation.java @@ -18,6 +18,10 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; + import java.util.Collections; import java.util.List; @@ -26,15 +30,27 @@ public class UseModulesOperation implements UseOperation { private final List<String> moduleNames; public UseModulesOperation(List<String> moduleNames) { - this.moduleNames = moduleNames; + this.moduleNames = Collections.unmodifiableList(moduleNames); } public List<String> getModuleNames() { - return Collections.unmodifiableList(moduleNames); + return moduleNames; } @Override public String asSummaryString() { return String.format("USE MODULES: %s", moduleNames); } + + @Override + public TableResultInternal execute(Context ctx) { + try { + ctx.getModuleManager().useModules(moduleNames.toArray(new String[0])); + return TableResultImpl.TABLE_RESULT_OK; + } catch (ValidationException e) { + throw new ValidationException( + String.format("Could not execute %s. %s", asSummaryString(), e.getMessage()), + e); + } + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java index efc6a899fdd..e99b2a51ed9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java @@ -29,4 +29,4 @@ import org.apache.flink.annotation.Internal; * switching current database. */ @Internal -public interface UseOperation extends Operation {} +public interface UseOperation extends Operation, ExecutableOperation {}
