This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1b041db [FLINK-26618][sql-client] Fix 'remove jar' statement is not
aligned with pipeline jars
1b041db is described below
commit 1b041dbd9a781ef7a3927df45149be4f09e9ea7b
Author: Paul Lin <[email protected]>
AuthorDate: Sat Mar 19 11:50:57 2022 +0800
[FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with
pipeline jars
This closes #19070.
---
.../client/gateway/context/SessionContext.java | 37 ++++++++++++++--------
.../flink-sql-client/src/test/resources/sql/set.q | 8 +++++
2 files changed, 32 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index d392460..c490823 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -262,7 +262,11 @@ public class SessionContext {
return;
}
+ // merge the jars in config with the jars maintained in session
+ Set<URL> jarsInConfig = getJarsInConfig();
+
Set<URL> newDependencies = new HashSet<>(dependencies);
+ newDependencies.addAll(jarsInConfig);
newDependencies.add(jarURL);
updateClassLoaderAndDependencies(newDependencies);
@@ -281,7 +285,11 @@ public class SessionContext {
}
Set<URL> newDependencies = new HashSet<>(dependencies);
+ // merge the jars in config with the jars maintained in session
+ Set<URL> jarsInConfig = getJarsInConfig();
+ newDependencies.addAll(jarsInConfig);
newDependencies.remove(jarURL);
+
updateClassLoaderAndDependencies(newDependencies);
// renew the execution context
@@ -325,22 +333,11 @@ public class SessionContext {
}
private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
- // merge the jar in config with the jar maintained in session
- Set<URL> jarsInConfig;
- try {
- jarsInConfig =
- new HashSet<>(
- ConfigUtils.decodeListFromConfig(
- sessionConfiguration, PipelineOptions.JARS, URL::new));
- } catch (MalformedURLException e) {
- throw new SqlExecutionException(
- "Failed to parse the option `pipeline.jars` in configuration.", e);
- }
- jarsInConfig.addAll(newDependencies);
+ // replace jars with the new dependencies
ConfigUtils.encodeCollectionToConfig(
sessionConfiguration,
PipelineOptions.JARS,
- new ArrayList<>(jarsInConfig),
+ new ArrayList<>(newDependencies),
URL::toString);
// TODO: update the classloader in CatalogManager.
@@ -375,4 +372,18 @@ public class SessionContext {
e);
}
}
+
+ private Set<URL> getJarsInConfig() {
+ Set<URL> jarsInConfig;
+ try {
+ jarsInConfig =
+ new HashSet<>(
+ ConfigUtils.decodeListFromConfig(
+ sessionConfiguration, PipelineOptions.JARS, URL::new));
+ } catch (MalformedURLException e) {
+ throw new SqlExecutionException(
+ "Failed to parse the option `pipeline.jars` in configuration.", e);
+ }
+ return jarsInConfig;
+ }
}
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index 5da0b88..5d214ef 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -170,3 +170,11 @@ SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id, str) ;
+----+-------------+--------------------------------+
Received a total of 1 row
!ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok