This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0303806fc18 [FLINK-31517][table] Move execution logic of
AddJarOperation out from TableEnvironmentImpl
0303806fc18 is described below
commit 0303806fc18deb3189c264f3f8eeade0e5a286c7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 21 23:03:18 2023 +0100
[FLINK-31517][table] Move execution logic of AddJarOperation out from
TableEnvironmentImpl
---
.../table/api/internal/TableEnvironmentImpl.java | 48 ----------------------
.../table/operations/command/AddJarOperation.java | 24 ++++++++++-
2 files changed, 23 insertions(+), 49 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 644cf578776..3f5b2c5f61f 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -57,7 +57,6 @@ import
org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
@@ -90,14 +89,11 @@ import
org.apache.flink.table.operations.SourceQueryOperation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
import org.apache.flink.table.operations.ddl.CompilePlanOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.resource.ResourceManager;
-import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
@@ -446,19 +442,6 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
return
functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
}
- // TODO: Maybe we should expose addJar as tEnv's API later.
- private TableResultInternal addJar(AddJarOperation addJarOperation) {
- ResourceUri resourceUri = new ResourceUri(ResourceType.JAR,
addJarOperation.getPath());
- try {
-
resourceManager.registerJarResources(Collections.singletonList(resourceUri));
- return TableResultImpl.TABLE_RESULT_OK;
- } catch (IOException e) {
- throw new TableException(
- String.format("Could not register the specified resource
[%s].", resourceUri),
- e);
- }
- }
-
@Override
public void createTemporaryTable(String path, TableDescriptor descriptor) {
Preconditions.checkNotNull(path, "Path must not be null.");
@@ -942,8 +925,6 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
return executeInternal(Collections.singletonList((ModifyOperation)
operation));
} else if (operation instanceof StatementSetOperation) {
return executeInternal(((StatementSetOperation)
operation).getOperations());
- } else if (operation instanceof AddJarOperation) {
- return addJar((AddJarOperation) operation);
} else if (operation instanceof LoadModuleOperation) {
return loadModule((LoadModuleOperation) operation);
} else if (operation instanceof UnloadModuleOperation) {
@@ -1005,26 +986,6 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
}
}
- private TableResultInternal createCatalog(CreateCatalogOperation
operation) {
- String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
- try {
- String catalogName = operation.getCatalogName();
- Map<String, String> properties = operation.getProperties();
-
- Catalog catalog =
- FactoryUtil.createCatalog(
- catalogName,
- properties,
- tableConfig,
- resourceManager.getUserClassLoader());
- catalogManager.registerCatalog(catalogName, catalog);
-
- return TableResultImpl.TABLE_RESULT_OK;
- } catch (CatalogException e) {
- throw new ValidationException(exMsg, e);
- }
- }
-
private TableResultInternal loadModule(LoadModuleOperation operation) {
final String exMsg =
getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
@@ -1099,15 +1060,6 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
.collect(Collectors.toList());
}
- /** Get catalog from catalogName or throw a ValidationException if the
catalog not exists. */
- private Catalog getCatalogOrThrowException(String catalogName) {
- return getCatalog(catalogName)
- .orElseThrow(
- () ->
- new ValidationException(
- String.format("Catalog %s does not
exist", catalogName)));
- }
-
private String getDDLOpExecuteErrorMsg(String action) {
return String.format("Could not execute %s", action);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
index 00465279cb1..6bfb98719d9 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
@@ -18,10 +18,19 @@
package org.apache.flink.table.operations.command;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+
+import java.io.IOException;
+import java.util.Collections;
/** Operation to describe an ADD JAR statement. */
-public class AddJarOperation implements Operation {
+public class AddJarOperation implements Operation, ExecutableOperation {
private final String path;
@@ -37,4 +46,17 @@ public class AddJarOperation implements Operation {
public String asSummaryString() {
return String.format("ADD JAR '%s'", path);
}
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+ ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, getPath());
+ try {
+
ctx.getResourceManager().registerJarResources(Collections.singletonList(resourceUri));
+ return TableResultImpl.TABLE_RESULT_OK;
+ } catch (IOException e) {
+ throw new TableException(
+ String.format("Could not register the specified resource
[%s].", resourceUri),
+ e);
+ }
+ }
}