This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0303806fc18 [FLINK-31517][table] Move execution logic of AddJarOperation out from TableEnvironmentImpl
0303806fc18 is described below

commit 0303806fc18deb3189c264f3f8eeade0e5a286c7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 21 23:03:18 2023 +0100

    [FLINK-31517][table] Move execution logic of AddJarOperation out from TableEnvironmentImpl
---
 .../table/api/internal/TableEnvironmentImpl.java   | 48 ----------------------
 .../table/operations/command/AddJarOperation.java  | 24 ++++++++++-
 2 files changed, 23 insertions(+), 49 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 644cf578776..3f5b2c5f61f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -57,7 +57,6 @@ import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
@@ -90,14 +89,11 @@ import org.apache.flink.table.operations.SourceQueryOperation;
 import org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ExecutePlanOperation;
 import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
 import org.apache.flink.table.operations.ddl.CompilePlanOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.resource.ResourceManager;
-import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
@@ -446,19 +442,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
-    // TODO: Maybe we should expose addJar as tEnv's API later.
-    private TableResultInternal addJar(AddJarOperation addJarOperation) {
-        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, addJarOperation.getPath());
-        try {
-            resourceManager.registerJarResources(Collections.singletonList(resourceUri));
-            return TableResultImpl.TABLE_RESULT_OK;
-        } catch (IOException e) {
-            throw new TableException(
-                    String.format("Could not register the specified resource [%s].", resourceUri),
-                    e);
-        }
-    }
-
     @Override
     public void createTemporaryTable(String path, TableDescriptor descriptor) {
         Preconditions.checkNotNull(path, "Path must not be null.");
@@ -942,8 +925,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             return executeInternal(Collections.singletonList((ModifyOperation) operation));
         } else if (operation instanceof StatementSetOperation) {
             return executeInternal(((StatementSetOperation) operation).getOperations());
-        } else if (operation instanceof AddJarOperation) {
-            return addJar((AddJarOperation) operation);
         } else if (operation instanceof LoadModuleOperation) {
             return loadModule((LoadModuleOperation) operation);
         } else if (operation instanceof UnloadModuleOperation) {
@@ -1005,26 +986,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
     }
 
-    private TableResultInternal createCatalog(CreateCatalogOperation operation) {
-        String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
-        try {
-            String catalogName = operation.getCatalogName();
-            Map<String, String> properties = operation.getProperties();
-
-            Catalog catalog =
-                    FactoryUtil.createCatalog(
-                            catalogName,
-                            properties,
-                            tableConfig,
-                            resourceManager.getUserClassLoader());
-            catalogManager.registerCatalog(catalogName, catalog);
-
-            return TableResultImpl.TABLE_RESULT_OK;
-        } catch (CatalogException e) {
-            throw new ValidationException(exMsg, e);
-        }
-    }
-
     private TableResultInternal loadModule(LoadModuleOperation operation) {
         final String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
         try {
@@ -1099,15 +1060,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                 .collect(Collectors.toList());
     }
 
-    /** Get catalog from catalogName or throw a ValidationException if the catalog not exists. */
-    private Catalog getCatalogOrThrowException(String catalogName) {
-        return getCatalog(catalogName)
-                .orElseThrow(
-                        () ->
-                                new ValidationException(
-                                        String.format("Catalog %s does not exist", catalogName)));
-    }
-
     private String getDDLOpExecuteErrorMsg(String action) {
         return String.format("Could not execute %s", action);
     }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
index 00465279cb1..6bfb98719d9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
@@ -18,10 +18,19 @@
 
 package org.apache.flink.table.operations.command;
 
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+
+import java.io.IOException;
+import java.util.Collections;
 
 /** Operation to describe an ADD JAR statement. */
-public class AddJarOperation implements Operation {
+public class AddJarOperation implements Operation, ExecutableOperation {
 
     private final String path;
 
@@ -37,4 +46,17 @@ public class AddJarOperation implements Operation {
     public String asSummaryString() {
         return String.format("ADD JAR '%s'", path);
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, getPath());
+        try {
+            ctx.getResourceManager().registerJarResources(Collections.singletonList(resourceUri));
+            return TableResultImpl.TABLE_RESULT_OK;
+        } catch (IOException e) {
+            throw new TableException(
+                    String.format("Could not register the specified resource [%s].", resourceUri),
+                    e);
+        }
+    }
 }

Reply via email to