This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c2e14ff411e [FLINK-32512][table] Don't register resource to user resource manager when creating temporary function
c2e14ff411e is described below
commit c2e14ff411e806f9ccf176c85eb8249b8ff12e56
Author: Shammon FY <[email protected]>
AuthorDate: Tue Jul 4 19:00:06 2023 +0800
[FLINK-32512][table] Don't register resource to user resource manager when creating temporary function
---
.../apache/flink/util/ChildFirstClassLoader.java | 6 +
.../flink/util/FlinkUserCodeClassLoader.java | 2 +-
.../flink/util/FlinkUserCodeClassLoaders.java | 11 ++
.../apache/flink/util/MutableURLClassLoader.java | 8 ++
.../flink/util/FlinkUserCodeClassLoaderTest.java | 5 +
.../HadoopPathBasedBulkFormatBuilderTest.java | 7 +
.../src/test/resources/sql/function.q | 8 ++
.../flink/table/catalog/FunctionCatalog.java | 51 +++++--
.../flink/table/resource/ResourceManager.java | 149 ++++++++++++++++++---
.../flink/table/resource/ResourceManagerTest.java | 116 ++++++++++++++++
.../planner/runtime/stream/sql/FunctionITCase.java | 7 +-
11 files changed, 341 insertions(+), 29 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
index 727e7731815..8f10330ee6e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
@@ -127,4 +127,10 @@ public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader {
static {
ClassLoader.registerAsParallelCapable();
}
+
+ @Override
+ public MutableURLClassLoader copy() {
+ return new ChildFirstClassLoader(
+ getURLs(), getParent(), alwaysParentFirstPatterns,
classLoadingExceptionHandler);
+ }
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
index 66c826cea6e..cd8eb01016d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
@@ -32,7 +32,7 @@ public abstract class FlinkUserCodeClassLoader extends MutableURLClassLoader {
ClassLoader.registerAsParallelCapable();
}
- private final Consumer<Throwable> classLoadingExceptionHandler;
+ protected final Consumer<Throwable> classLoadingExceptionHandler;
protected FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
this(urls, parent, NOOP_EXCEPTION_HANDLER);
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
index 791b707a7ad..08f31518efa 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -139,6 +139,11 @@ public class FlinkUserCodeClassLoaders {
static {
ClassLoader.registerAsParallelCapable();
}
+
+ @Override
+ public MutableURLClassLoader copy() {
+ return new ParentFirstClassLoader(getURLs(), getParent(),
classLoadingExceptionHandler);
+ }
}
/**
@@ -203,6 +208,12 @@ public class FlinkUserCodeClassLoaders {
ensureInner().addURL(url);
}
+ @Override
+ public MutableURLClassLoader copy() {
+ return new SafetyNetWrapperClassLoader(
+ (FlinkUserCodeClassLoader) inner.copy(), getParent());
+ }
+
@Override
public URL getResource(String name) {
return ensureInner().getResource(name);
diff --git a/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java
index 42bf55f131f..62ad58f7315 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java
@@ -39,4 +39,12 @@ public abstract class MutableURLClassLoader extends URLClassLoader {
public void addURL(URL url) {
super.addURL(url);
}
+
+ /**
+ * Copy the classloader for each job and these jobs can add their jar
files to the classloader
+ * independently.
+ *
+ * @return the copied classloader
+ */
+ public abstract MutableURLClassLoader copy();
}
diff --git a/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
index 401945c110a..6c89f6d9703 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
@@ -58,5 +58,10 @@ public class FlinkUserCodeClassLoaderTest extends TestLogger {
protected Class<?> loadClassWithoutExceptionHandling(String name,
boolean resolve) {
throw expectedException;
}
+
+ @Override
+ public MutableURLClassLoader copy() {
+ return new ThrowingURLClassLoader(classLoadingExceptionHandler,
expectedException);
+ }
}
}
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
index 161a37280e5..3dffbbe9f9c 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
import
org.apache.flink.formats.hadoop.bulk.TestHadoopPathBasedBulkWriterFactory;
import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.MutableURLClassLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -97,5 +98,11 @@ public class HadoopPathBasedBulkFormatBuilderTest {
return super.loadClassWithoutExceptionHandling(name, resolve);
}
}
+
+ @Override
+ public MutableURLClassLoader copy() {
+ return new SpecifiedChildFirstUserClassLoader(
+ specifiedClassName, getParent(), getURLs());
+ }
}
}
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index c554f6e6352..320fbefc898 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -294,6 +294,14 @@ SHOW JARS;
Empty set
!ok
+create temporary function temp_upperudf AS 'UpperUDF' using jar
'$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeed.
+!info
+
+SHOW JARS;
+Empty set
+!ok
+
create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH';
[INFO] Execute statement succeed.
!info
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 5ba7f831102..8eb8f82351f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -47,8 +47,13 @@ import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URLClassLoader;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -135,10 +140,17 @@ public final class FunctionCatalog {
"Could not drop temporary system function. A
function named '%s' doesn't exist.",
name));
}
+ unregisterFunctionJarResources(function);
return function != null;
}
+ private void unregisterFunctionJarResources(@Nullable CatalogFunction
function) {
+ if (function != null && function.getFunctionLanguage() ==
FunctionLanguage.JAVA) {
+
resourceManager.unregisterFunctionResources(function.getFunctionResources());
+ }
+ }
+
/** Registers a temporary catalog function. */
public void registerTemporaryCatalogFunction(
UnresolvedIdentifier unresolvedIdentifier,
@@ -523,6 +535,7 @@ public final class FunctionCatalog {
.getTemporaryOperationListener(normalizedName)
.ifPresent(l ->
l.onDropTemporaryFunction(normalizedName.toObjectPath()));
tempCatalogFunctions.remove(normalizedName);
+ unregisterFunctionJarResources(fd);
} else if (!ignoreIfNotExist) {
throw new ValidationException(
String.format("Temporary catalog function %s doesn't
exist", identifier));
@@ -614,6 +627,8 @@ public final class FunctionCatalog {
CatalogFunction potentialResult =
tempCatalogFunctions.get(normalizedIdentifier);
if (potentialResult != null) {
+ registerFunctionJarResources(
+ oi.asSummaryString(),
potentialResult.getFunctionResources());
return Optional.of(
ContextResolvedFunction.temporary(
FunctionIdentifier.of(oi),
@@ -664,11 +679,12 @@ public final class FunctionCatalog {
String normalizedName = FunctionIdentifier.normalizeName(funcName);
if (tempSystemFunctions.containsKey(normalizedName)) {
+ CatalogFunction function = tempSystemFunctions.get(normalizedName);
+ registerFunctionJarResources(funcName,
function.getFunctionResources());
return Optional.of(
ContextResolvedFunction.temporary(
FunctionIdentifier.of(funcName),
- getFunctionDefinition(
- normalizedName,
tempSystemFunctions.get(normalizedName))));
+ getFunctionDefinition(normalizedName, function)));
}
Optional<FunctionDefinition> candidate =
@@ -685,7 +701,7 @@ public final class FunctionCatalog {
@SuppressWarnings("unchecked")
private void validateAndPrepareFunction(String name, CatalogFunction
function)
- throws ClassNotFoundException {
+ throws ClassNotFoundException, IOException {
// If the input is instance of UserDefinedFunction, it means it uses
the new type inference.
// In this situation the UDF have not been validated and cleaned, so
we need to validate it
// and clean its closure here.
@@ -701,13 +717,30 @@ public final class FunctionCatalog {
} else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
// If the jar resource of UDF used is not empty, register it to
classloader before
// validate.
- registerFunctionJarResources(name,
function.getFunctionResources());
+ List<ResourceUri> resourceUris = function.getFunctionResources();
+ try {
+ if (!resourceUris.isEmpty()) {
+ resourceManager.declareFunctionResources(
+ new HashSet<>(function.getFunctionResources()));
+ }
+ } catch (Exception e) {
+ throw new TableException(
+ String.format(
+ "Failed to register function jar resource '%s'
of function '%s'.",
+ resourceUris, name),
+ e);
+ }
- UserDefinedFunctionHelper.validateClass(
- (Class<? extends UserDefinedFunction>)
- resourceManager
- .getUserClassLoader()
- .loadClass(function.getClassName()));
+ URLClassLoader classLoader =
resourceManager.createUserClassLoader(resourceUris);
+ try {
+ UserDefinedFunctionHelper.validateClass(
+ (Class<? extends UserDefinedFunction>)
+
classLoader.loadClass(function.getClassName()));
+ } finally {
+ if (!resourceUris.isEmpty()) {
+ classLoader.close();
+ }
+ }
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index 2cde34e11f6..5de2b5d6ef4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -45,8 +45,10 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -55,6 +57,9 @@ import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** A manager for dealing with all user defined resource. */
@Internal
public class ResourceManager implements Closeable {
@@ -65,6 +70,9 @@ public class ResourceManager implements Closeable {
private static final String FILE_SCHEME = "file";
private final Path localResourceDir;
+ /** Resource infos for functions. */
+ private final Map<ResourceUri, ResourceCounter> functionResourceInfos;
+
protected final Map<ResourceUri, URL> resourceInfos;
protected final MutableURLClassLoader userClassLoader;
@@ -80,6 +88,7 @@ public class ResourceManager implements Closeable {
new Path(
config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR),
String.format("flink-table-%s", UUID.randomUUID()));
+ this.functionResourceInfos = new HashMap<>();
this.resourceInfos = new HashMap<>();
this.userClassLoader = userClassLoader;
}
@@ -103,7 +112,8 @@ public class ResourceManager implements Closeable {
String.format("Failed to register jar
resource [%s]", url),
e);
}
- }),
+ },
+ false),
true);
}
@@ -124,15 +134,69 @@ public class ResourceManager implements Closeable {
Collections.singletonList(resourceUri),
ResourceType.FILE,
false,
- url -> {});
+ url -> {},
+ false);
registerResources(stagingResources, false);
return resourceInfos.get(new
ArrayList<>(stagingResources.keySet()).get(0)).getPath();
}
+ /**
+ * Declare a resource for function and add it to the function resource
infos. If the file is
+ * remote, it will be copied to a local file. The declared resource will
not be added to
+ * resources and classloader if it is not used in the job.
+ *
+ * @param resourceUris the resource uri for function.
+ */
+ public void declareFunctionResources(Set<ResourceUri> resourceUris) throws
IOException {
+ prepareStagingResources(
+ resourceUris,
+ ResourceType.JAR,
+ true,
+ url -> {
+ try {
+ JarUtils.checkJarFile(url);
+ } catch (IOException e) {
+ throw new ValidationException(
+ String.format("Failed to register jar resource
[%s]", url), e);
+ }
+ },
+ true);
+ }
+
+ /**
+ * Unregister the resource uri in function resources, when the reference
count of the resource
+ * is 0, the resource will be removed from the function resources.
+ *
+ * @param resourceUris the uris to unregister in function resources.
+ */
+ public void unregisterFunctionResources(List<ResourceUri> resourceUris) {
+ if (!resourceUris.isEmpty()) {
+ resourceUris.forEach(
+ uri -> {
+ ResourceCounter counter =
functionResourceInfos.get(uri);
+ if (counter != null && counter.decreaseCounter()) {
+ functionResourceInfos.remove(uri);
+ }
+ });
+ }
+ }
+
public URLClassLoader getUserClassLoader() {
return userClassLoader;
}
+ public URLClassLoader createUserClassLoader(List<ResourceUri>
resourceUris) {
+ if (resourceUris.isEmpty()) {
+ return userClassLoader;
+ }
+ MutableURLClassLoader classLoader = userClassLoader.copy();
+ for (ResourceUri resourceUri : new HashSet<>(resourceUris)) {
+
classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url);
+ }
+
+ return classLoader;
+ }
+
public Map<ResourceUri, URL> getResources() {
return Collections.unmodifiableMap(resourceInfos);
}
@@ -171,6 +235,7 @@ public class ResourceManager implements Closeable {
@Override
public void close() throws IOException {
resourceInfos.clear();
+ functionResourceInfos.clear();
IOException exception = null;
try {
@@ -309,6 +374,11 @@ public class ResourceManager implements Closeable {
}
}
+ @VisibleForTesting
+ Map<ResourceUri, ResourceCounter> functionResourceInfos() {
+ return functionResourceInfos;
+ }
+
private Path getResourceLocalPath(Path remotePath) {
String fileName = remotePath.getName();
String fileExtension = Files.getFileExtension(fileName);
@@ -327,7 +397,7 @@ public class ResourceManager implements Closeable {
return new Path(localResourceDir, fileNameWithUUID);
}
- private void checkResources(List<ResourceUri> resourceUris, ResourceType
expectedType)
+ private void checkResources(Collection<ResourceUri> resourceUris,
ResourceType expectedType)
throws IOException {
// check the resource type
if (resourceUris.stream()
@@ -360,10 +430,11 @@ public class ResourceManager implements Closeable {
}
private Map<ResourceUri, URL> prepareStagingResources(
- List<ResourceUri> resourceUris,
+ Collection<ResourceUri> resourceUris,
ResourceType expectedType,
boolean executable,
- Consumer<URL> resourceChecker)
+ Consumer<URL> resourceChecker,
+ boolean declareFunctionResource)
throws IOException {
checkResources(resourceUris, expectedType);
@@ -381,24 +452,41 @@ public class ResourceManager implements Closeable {
}
}
- // here can check whether the resource path is valid
- Path path = new Path(resourceUri.getUri());
URL localUrl;
- // download resource to a local path firstly if in remote
- if (isRemotePath(path)) {
- localUrl = downloadResource(path, executable);
+ ResourceUri localResourceUri = resourceUri;
+ if (expectedType == ResourceType.JAR
+ && functionResourceInfos.containsKey(resourceUri)) {
+ // Get local url from function resource infos.
+ localUrl = functionResourceInfos.get(resourceUri).url;
+ // Register resource uri to increase the reference counter
+ functionResourceInfos
+ .computeIfAbsent(resourceUri, key -> new
ResourceCounter(localUrl))
+ .increaseCounter();
} else {
- localUrl = getURLFromPath(path);
- // if the local resource is a relative path, here convert it
to an absolute path
- // before register
- resourceUri = new ResourceUri(expectedType,
localUrl.getPath());
- }
+ // here can check whether the resource path is valid
+ Path path = new Path(resourceUri.getUri());
+ // download resource to a local path firstly if in remote
+ if (isRemotePath(path)) {
+ localUrl = downloadResource(path, executable);
+ } else {
+ localUrl = getURLFromPath(path);
+ // if the local resource is a relative path, here convert
it to an absolute path
+ // before register
+ localResourceUri = new ResourceUri(expectedType,
localUrl.getPath());
+ }
- // check the local file
- resourceChecker.accept(localUrl);
+ // check the local file
+ resourceChecker.accept(localUrl);
+
+ if (declareFunctionResource) {
+ functionResourceInfos
+ .computeIfAbsent(resourceUri, key -> new
ResourceCounter(localUrl))
+ .increaseCounter();
+ }
+ }
// add it to a staging map
- stagingResourceLocalURLs.put(resourceUri, localUrl);
+ stagingResourceLocalURLs.put(localResourceUri, localUrl);
}
return stagingResourceLocalURLs;
}
@@ -419,4 +507,29 @@ public class ResourceManager implements Closeable {
LOG.info("Register resource [{}] successfully.",
resourceUri.getUri());
});
}
+
+ /**
+ * Resource with reference counter, when the counter is 0, it means the
resource can be removed.
+ */
+ static class ResourceCounter {
+ final URL url;
+ int counter;
+
+ private ResourceCounter(URL url) {
+ this.url = url;
+ this.counter = 0;
+ }
+
+ private void increaseCounter() {
+ this.counter++;
+ }
+
+ private boolean decreaseCounter() {
+ this.counter--;
+ checkState(
+ this.counter >= 0,
+ String.format("Invalid reference count[%d] which must >=
0", this.counter));
+ return this.counter == 0;
+ }
+ }
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index bc18ce9bfd1..42305d806af 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -24,6 +24,13 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
@@ -47,9 +54,13 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
import static
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
import static
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -60,6 +71,14 @@ import static org.junit.jupiter.api.Assertions.fail;
/** Tests for {@link ResourceManager}. */
public class ResourceManagerTest {
+ private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER1 =
+ UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
"test_udf1");
+
+ private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER2 =
+ UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
"test_udf2");
+
+ private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER3 =
+ UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
"test_udf3");
@TempDir private static File tempFolder;
private static File udfJar;
@@ -253,6 +272,38 @@ public class ResourceManagerTest {
});
}
+ @Test
+ public void testRegisterFunctionResource() throws Exception {
+ URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
+
+ // test class loading before register function resource
+ CommonTestUtils.assertThrows(
+ "LowerUDF",
+ ClassNotFoundException.class,
+ () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false,
userClassLoader));
+
+ ResourceUri resourceUri = new ResourceUri(ResourceType.JAR,
udfJar.getPath());
+ // register the same jar repeatedly
+ resourceManager.declareFunctionResources(
+ new HashSet<>(Arrays.asList(resourceUri, resourceUri)));
+
+ // test class loading after register function resource
+ CommonTestUtils.assertThrows(
+ "LowerUDF",
+ ClassNotFoundException.class,
+ () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false,
userClassLoader));
+
+ URLClassLoader functionClassLoader =
+
resourceManager.createUserClassLoader(Arrays.asList(resourceUri, resourceUri));
+ // test load class
+ final Class<?> clazz1 =
+ Class.forName(GENERATED_LOWER_UDF_CLASS, false,
functionClassLoader);
+ final Class<?> clazz2 =
+ Class.forName(GENERATED_LOWER_UDF_CLASS, false,
functionClassLoader);
+
+ assertEquals(clazz1, clazz2);
+ }
+
@MethodSource("provideResource")
@ParameterizedTest
public void testDownloadResource(String pathString, boolean executable)
throws Exception {
@@ -315,6 +366,71 @@ public class ResourceManagerTest {
assertThat(FileUtils.readFileUtf8(new File(targetUri))).isEqualTo("Bye
Bye");
}
+ @Test
+ void testRegisterFunctionWithResource() {
+ ResourceUri resourceUri = new ResourceUri(ResourceType.JAR,
udfJar.getPath());
+ List<ResourceUri> resourceUris =
Collections.singletonList(resourceUri);
+
+ Configuration configuration = new Configuration();
+ FunctionCatalog functionCatalog =
+ new FunctionCatalog(
+ configuration,
+ resourceManager,
+ CatalogManagerMocks.preparedCatalogManager()
+ .defaultCatalog(
+ DEFAULT_CATALOG,
+ new GenericInMemoryCatalog(
+ DEFAULT_CATALOG,
DEFAULT_DATABASE))
+ .build(),
+ new ModuleManager());
+
+ functionCatalog.registerCatalogFunction(
+ FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS,
resourceUris, false);
+
+ Map<ResourceUri, ResourceManager.ResourceCounter>
functionResourceInfos =
+ resourceManager.functionResourceInfos();
+ // Register catalog function will not register its resource to
function resources.
+ assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse();
+ functionCatalog.dropCatalogFunction(FULL_UNRESOLVED_IDENTIFIER1,
false);
+
+ // Register catalog function again to validate that unregister catalog
function will not
+ // decrease the reference count of resourceUris.
+ functionCatalog.registerCatalogFunction(
+ FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS,
resourceUris, false);
+ functionCatalog.registerTemporaryCatalogFunction(
+ FULL_UNRESOLVED_IDENTIFIER2,
+ new CatalogFunctionImpl(
+ GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA,
resourceUris),
+ false);
+ functionCatalog.registerTemporaryCatalogFunction(
+ FULL_UNRESOLVED_IDENTIFIER3,
+ new CatalogFunctionImpl(
+ GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA,
resourceUris),
+ false);
+ functionCatalog.registerTemporarySystemFunction(
+ GENERATED_LOWER_UDF_CLASS,
+ new CatalogFunctionImpl(
+ GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA,
resourceUris),
+ false);
+
+ // There will be three resources for temporary and system functions
without catalog
+ // function.
+
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(3);
+ // Drop catalog function will not decrease the reference count of
resourceUris.
+ functionCatalog.dropCatalogFunction(FULL_UNRESOLVED_IDENTIFIER1,
false);
+ // There will be three resources for temporary and system functions.
+
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(3);
+
+
functionCatalog.dropTemporaryCatalogFunction(FULL_UNRESOLVED_IDENTIFIER2,
false);
+
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(2);
+
+
functionCatalog.dropTemporaryCatalogFunction(FULL_UNRESOLVED_IDENTIFIER3,
false);
+
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(1);
+
+ functionCatalog.dropTemporarySystemFunction(GENERATED_LOWER_UDF_CLASS,
false);
+ assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse();
+ }
+
@Test
public void testCloseResourceManagerCleanDownloadedResources() throws
Exception {
resourceManager.close();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 0daa554d6e6..e94d12d7ee9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -229,7 +230,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testCreateTemporarySystemFunctionByUsingJar() {
+ public void testCreateTemporarySystemFunctionByUsingJar() throws Exception
{
String ddl =
String.format(
"CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING
JAR '%s'",
@@ -237,6 +238,10 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql(ddl);
assertThat(Arrays.asList(tEnv().listFunctions())).contains("f10");
+ try (CloseableIterator<Row> itor = tEnv().executeSql("SHOW
JARS").collect()) {
+ assertThat(itor.hasNext()).isFalse();
+ }
+
tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f10");
}