wuchong commented on a change in pull request #15925:
URL: https://github.com/apache/flink/pull/15925#discussion_r638603628



##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -167,6 +167,37 @@ execution.shutdown-on-attached-exit=false
 execution.target=remote
 jobmanager.rpc.address=$VAR_JOBMANAGER_RPC_ADDRESS
 pipeline.classpaths=
-pipeline.jars=$VAR_PIPELINE_JARS
+pipeline.jars=
+rest.port=$VAR_REST_PORT
+!ok
+
+# test add jar

Review comment:
       ```suggestion
   # test "reset" can work with "add jar"
   ```

##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -275,4 +322,33 @@ private void resetSessionConfigurationToDefault(Configuration defaultConf) {
         }
         sessionConfiguration.addAll(defaultConf);
     }
+
+    private void buildClassLoaderAndUpdateDependencies(Collection<URL> newDependencies) {

Review comment:
       nit: the method name `buildClassLoader...` sounds like it returns a new
classloader object, but it does not. It would be better to rename it to
`rebuildClassLoader...`.
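   For illustration, the rename applied to the signature from the diff (body elided; the point is that the method mutates the session's classloader state in place rather than returning one):

```java
private void rebuildClassLoaderAndUpdateDependencies(Collection<URL> newDependencies) {
    // same body as before: rebuilds the wrapped user classloader and records the new dependencies
}
```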

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -202,15 +240,32 @@ private SessionContext createSessionContext() throws Exception {
         return SessionContext.create(defaultContext, "test-session");
     }
 
-    private Map<String, String> getConfigurationMap(SessionContext context) {
-        return context.getExecutionContext()
+    private Map<String, String> getConfigurationMap() {
+        return sessionContext
+                .getExecutionContext()
                 .getTableEnvironment()
                 .getConfig()
                 .getConfiguration()
                 .toMap();
     }
 
-    private Configuration getConfiguration(SessionContext context) {
-        return context.getExecutionContext().getTableEnvironment().getConfig().getConfiguration();
+    private Configuration getConfiguration() {
+        return sessionContext
+                .getExecutionContext()
+                .getTableEnvironment()
+                .getConfig()
+                .getConfiguration();
+    }
+
+    private void validateAddJarException(String jarPath, String errorMessages) {
+        Set<URL> originDependencies = sessionContext.getDependencies();
+        try {
+            sessionContext.addJar(jarPath);

Review comment:
       Please add `fail("should fail");`, otherwise the test will pass if no 
exception is thrown. 
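   For illustration, the relevant part of the helper with the guard added (a sketch only; the existing assertions in the catch block stay unchanged):

```java
try {
    sessionContext.addJar(jarPath);
    // org.junit.Assert.fail: without this, the test passes silently when
    // addJar unexpectedly succeeds and no exception is thrown.
    fail("should fail");
} catch (Exception e) {
    // the existing cause/message assertions remain here
}
```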

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
##########
@@ -106,7 +106,8 @@ public static void setup() throws IOException {
         historyPath = tempFolder.newFile("history").toPath();
 
         replaceVars = new HashMap<>();
-        replaceVars.put("$VAR_PIPELINE_JARS", udfDependency.toString());
+        replaceVars.put("$VAR_UDF_JAR_PATH", udfDependency.getPath());

Review comment:
       `udfDependency` can be a local variable now. 

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -126,61 +140,87 @@ public void testSetAndResetKeyInConfigOptions() throws Exception {
         // runtime config from flink-conf
         sessionContext.set(OBJECT_REUSE.key(), "false");
 
-        assertEquals("hive", getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
-        assertEquals(128, getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
-        assertEquals("test", getConfiguration(sessionContext).getString(NAME));
-        assertFalse(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+        assertEquals("hive", getConfiguration().getString(TABLE_SQL_DIALECT));
+        assertEquals(128, getConfiguration().getInteger(MAX_PARALLELISM));
+        assertEquals("test", getConfiguration().getString(NAME));
+        assertFalse(getConfiguration().getBoolean(OBJECT_REUSE));
 
         sessionContext.reset(TABLE_SQL_DIALECT.key());
-        assertEquals("default", getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
+        assertEquals("default", getConfiguration().getString(TABLE_SQL_DIALECT));
 
         sessionContext.reset(MAX_PARALLELISM.key());
-        assertEquals(16, getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
+        assertEquals(16, getConfiguration().getInteger(MAX_PARALLELISM));
 
         sessionContext.reset(NAME.key());
-        assertNull(getConfiguration(sessionContext).get(NAME));
+        assertNull(getConfiguration().get(NAME));
 
         sessionContext.reset(OBJECT_REUSE.key());
-        assertTrue(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+        assertTrue(getConfiguration().getBoolean(OBJECT_REUSE));
     }
 
     @Test
-    public void testSetWithConfigOptionAndResetWithYamlKey() throws Exception {
-        SessionContext sessionContext = createSessionContext();
+    public void testSetWithConfigOptionAndResetWithYamlKey() {
         // runtime config option and has deprecated key
         sessionContext.set(TABLE_PLANNER.key(), "blink");
-        assertEquals(
-                "blink", getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
+        assertEquals("blink", getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
 
         sessionContext.reset(TABLE_PLANNER.key());
-        assertEquals(
-                "old", getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
-        assertEquals(
-                "old", getConfigurationMap(sessionContext).get("execution.planner").toLowerCase());
+        assertEquals("old", getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
+        assertEquals("old", getConfigurationMap().get("execution.planner").toLowerCase());
     }
 
     @Test
-    public void testSetAndResetKeyNotInYaml() throws Exception {
-        SessionContext sessionContext = createSessionContext();
+    public void testSetAndResetKeyNotInYaml() {
         // other property not in yaml and flink-conf
         sessionContext.set("aa", "11");
         sessionContext.set("bb", "22");
 
-        assertEquals("11", getConfigurationMap(sessionContext).get("aa"));
-        assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+        assertEquals("11", getConfigurationMap().get("aa"));
+        assertEquals("22", getConfigurationMap().get("bb"));
 
         sessionContext.reset("aa");
-        assertNull(getConfigurationMap(sessionContext).get("aa"));
-        assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+        assertNull(getConfigurationMap().get("aa"));
+        assertEquals("22", getConfigurationMap().get("bb"));
 
         sessionContext.reset("bb");
-        assertNull(getConfigurationMap(sessionContext).get("bb"));
+        assertNull(getConfigurationMap().get("bb"));
+    }
+
+    @Test
+    public void testAddJar() throws IOException {
+        sessionContext.addJar(udfJar.getPath());

Review comment:
       Please also add tests in which the jar path is prefixed with `file://`,
and tests that use a relative path such as `./xxx`.
   
   You can use `new File(".").getAbsoluteFile().toPath().relativize(udfJar.toPath())`
to get a path relative to the current working directory.
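   For example, something along these lines could be added to `SessionContextTest` (test names and the exact `JARS` assertion are illustrative; `udfJar`, `sessionContext`, and `getConfiguration()` come from the diff above):

```java
@Test
public void testAddJarWithFileScheme() throws IOException {
    // "file://" + absolute path, e.g. file:///tmp/udf.jar
    sessionContext.addJar("file://" + udfJar.getPath());
    // assumption: addJar normalizes the entry to an absolute file URL,
    // matching the assertion in testAddJar above
    assertEquals(
            Collections.singletonList(udfJar.toURI().toURL().toString()),
            getConfiguration().get(JARS));
}

@Test
public void testAddJarWithRelativePath() throws IOException {
    // relative path to the current working directory, as suggested above
    String relativePath =
            new File(".").getAbsoluteFile().toPath().relativize(udfJar.toPath()).toString();
    sessionContext.addJar(relativePath);
    assertEquals(
            Collections.singletonList(udfJar.toURI().toURL().toString()),
            getConfiguration().get(JARS));
}
```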

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -202,15 +240,32 @@ private SessionContext createSessionContext() throws Exception {
         return SessionContext.create(defaultContext, "test-session");
     }
 
-    private Map<String, String> getConfigurationMap(SessionContext context) {
-        return context.getExecutionContext()
+    private Map<String, String> getConfigurationMap() {
+        return sessionContext
+                .getExecutionContext()
                 .getTableEnvironment()
                 .getConfig()
                 .getConfiguration()
                 .toMap();
     }
 
-    private Configuration getConfiguration(SessionContext context) {
-        return context.getExecutionContext().getTableEnvironment().getConfig().getConfiguration();
+    private Configuration getConfiguration() {
+        return sessionContext
+                .getExecutionContext()
+                .getTableEnvironment()
+                .getConfig()
+                .getConfiguration();
+    }
+
+    private void validateAddJarException(String jarPath, String errorMessages) {
+        Set<URL> originDependencies = sessionContext.getDependencies();
+        try {
+            sessionContext.addJar(jarPath);
+        } catch (Exception e) {
+            containsCause(e, IOException.class);
+            assertTrue(e.getCause().getMessage().contains(errorMessages));

Review comment:
       You can use
`assertThat(e, org.apache.flink.core.testutils.FlinkMatchers#containsMessage(errorMessages))`
to verify the nested exception message. You can also use
`org.apache.flink.core.testutils.FlinkMatchers#containsCause(java.lang.Throwable)`
to verify the nested exception directly.
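   A possible shape for the catch block with that matcher (a sketch only; static imports of `org.hamcrest.MatcherAssert.assertThat`, `org.junit.Assert.fail`, and `org.apache.flink.core.testutils.FlinkMatchers.containsMessage` are assumed):

```java
try {
    sessionContext.addJar(jarPath);
    fail("should fail");
} catch (Exception e) {
    // checks the message anywhere in the exception chain
    assertThat(e, containsMessage(errorMessages));
}
```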

##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -111,7 +133,8 @@ public void reset() {
         // If rebuild a new Configuration, it loses control of the SessionState if users wants to
         // modify the configuration
         resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
-        this.executionContext = new ExecutionContext(executionContext);
+        buildClassLoaderAndUpdateDependencies(dependencies);

Review comment:
       Please add a comment describing why we need to rebuild the classloader
here (the new classloader is actually the same as before).
   
   IIUC, the main reason is that the `pipeline.jars` option has been reverted
by the reset, but `pipeline.jars` should still include the `dependencies` jars,
otherwise the UDF jars won't work during runtime execution.
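   For example, the requested comment could read roughly like this (wording is only a suggestion; the surrounding calls are from the diff above):

```java
resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
// Rebuild the ExecutionContext with the same classloader content: reset()
// reverts "pipeline.jars" to its default value, so the option has to be
// re-populated from `dependencies`, otherwise jars added via ADD JAR would be
// missing from the job configuration at runtime even though the session
// classloader itself is unchanged.
buildClassLoaderAndUpdateDependencies(dependencies);
```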

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -167,6 +167,37 @@ execution.shutdown-on-attached-exit=false
 execution.target=remote
 jobmanager.rpc.address=$VAR_JOBMANAGER_RPC_ADDRESS
 pipeline.classpaths=
-pipeline.jars=$VAR_PIPELINE_JARS
+pipeline.jars=
+rest.port=$VAR_REST_PORT
+!ok
+
+# test add jar
+ADD JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is added into session classloader.
+!info
+
+set;
+execution.attached=true
+execution.savepoint.ignore-unclaimed-state=false
+execution.shutdown-on-attached-exit=false
+execution.target=remote
+jobmanager.rpc.address=localhost
+pipeline.classpaths=
+pipeline.jars=$VAR_PIPELINE_JARS_URL
+rest.port=$VAR_REST_PORT
+!ok
+
+reset;
+[INFO] All session properties have been set to their default values.
+!info

Review comment:
       It would be better to call `create function func1 as 'LowerUDF' LANGUAGE JAVA;`
and `SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id, str);` to
verify that the added jar still works.

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -126,61 +140,87 @@ public void testSetAndResetKeyInConfigOptions() throws Exception {
         // runtime config from flink-conf
         sessionContext.set(OBJECT_REUSE.key(), "false");
 
-        assertEquals("hive", getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
-        assertEquals(128, getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
-        assertEquals("test", getConfiguration(sessionContext).getString(NAME));
-        assertFalse(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+        assertEquals("hive", getConfiguration().getString(TABLE_SQL_DIALECT));
+        assertEquals(128, getConfiguration().getInteger(MAX_PARALLELISM));
+        assertEquals("test", getConfiguration().getString(NAME));
+        assertFalse(getConfiguration().getBoolean(OBJECT_REUSE));
 
         sessionContext.reset(TABLE_SQL_DIALECT.key());
-        assertEquals("default", getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
+        assertEquals("default", getConfiguration().getString(TABLE_SQL_DIALECT));
 
         sessionContext.reset(MAX_PARALLELISM.key());
-        assertEquals(16, getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
+        assertEquals(16, getConfiguration().getInteger(MAX_PARALLELISM));
 
         sessionContext.reset(NAME.key());
-        assertNull(getConfiguration(sessionContext).get(NAME));
+        assertNull(getConfiguration().get(NAME));
 
         sessionContext.reset(OBJECT_REUSE.key());
-        assertTrue(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+        assertTrue(getConfiguration().getBoolean(OBJECT_REUSE));
     }
 
     @Test
-    public void testSetWithConfigOptionAndResetWithYamlKey() throws Exception {
-        SessionContext sessionContext = createSessionContext();
+    public void testSetWithConfigOptionAndResetWithYamlKey() {
         // runtime config option and has deprecated key
         sessionContext.set(TABLE_PLANNER.key(), "blink");
-        assertEquals(
-                "blink", getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
+        assertEquals("blink", getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
 
         sessionContext.reset(TABLE_PLANNER.key());
-        assertEquals(
-                "old", getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
-        assertEquals(
-                "old", getConfigurationMap(sessionContext).get("execution.planner").toLowerCase());
+        assertEquals("old", getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
+        assertEquals("old", getConfigurationMap().get("execution.planner").toLowerCase());
     }
 
     @Test
-    public void testSetAndResetKeyNotInYaml() throws Exception {
-        SessionContext sessionContext = createSessionContext();
+    public void testSetAndResetKeyNotInYaml() {
         // other property not in yaml and flink-conf
         sessionContext.set("aa", "11");
         sessionContext.set("bb", "22");
 
-        assertEquals("11", getConfigurationMap(sessionContext).get("aa"));
-        assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+        assertEquals("11", getConfigurationMap().get("aa"));
+        assertEquals("22", getConfigurationMap().get("bb"));
 
         sessionContext.reset("aa");
-        assertNull(getConfigurationMap(sessionContext).get("aa"));
-        assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+        assertNull(getConfigurationMap().get("aa"));
+        assertEquals("22", getConfigurationMap().get("bb"));
 
         sessionContext.reset("bb");
-        assertNull(getConfigurationMap(sessionContext).get("bb"));
+        assertNull(getConfigurationMap().get("bb"));
+    }
+
+    @Test
+    public void testAddJar() throws IOException {
+        sessionContext.addJar(udfJar.getPath());
+        assertEquals(
+                Collections.singletonList(udfJar.toURI().toURL().toString()),
+                getConfiguration().get(JARS));
+
+        // reset to the default classloader
+        sessionContext.reset();
+        assertEquals(
+                Collections.singletonList(udfJar.toURI().toURL().toString()),
+                getConfiguration().get(JARS));
+    }
+
+    @Test
+    public void testAddIllegalJar() {
+        validateAddJarException("/path/to/illegal.jar", "JAR file does not 
exist");
+    }
+
+    @Test
+    public void testRemoteJar() {
+        validateAddJarException("hdfs://remote:10080/remote.jar", "JAR file 
does not exist");

Review comment:
       I think this exception message is misleading: the jar actually exists
there, we just don't support remote jars.
   
   We should check the protocol/scheme of the URL, because only `file` is
supported now. You can use `URI.create(jarPath).getScheme()` to get the scheme
and throw a meaningful message if it is not `file`.
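   A minimal sketch of that check (helper name, exception type, and message wording are illustrative only; the real implementation might prefer a SQL-client-specific exception):

```java
// Reject anything that is not a local file before touching the file system.
private static void checkJarPath(String jarPath) {
    String scheme = java.net.URI.create(jarPath).getScheme();
    // a plain path or "./xxx" has no scheme and is treated as a local file
    if (scheme != null && !"file".equals(scheme)) {
        throw new IllegalArgumentException(
                String.format(
                        "Only local JAR files are supported, but '%s' uses the '%s' scheme.",
                        jarPath, scheme));
    }
}
```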




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

