wuchong commented on a change in pull request #15925:
URL: https://github.com/apache/flink/pull/15925#discussion_r636161868



##########
File path: flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
##########
@@ -128,6 +129,7 @@
     "IF"
     "INPUTFORMAT"
     "ITEMS"
+    "JAR"

Review comment:
       Is it possible to add this to the non-reserved keywords? Otherwise, 
queries that use `jar` as a field name will stop working after upgrading to the new version.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
##########
@@ -223,6 +223,7 @@ public TableResult executeModifyOperations(String 
sessionId, List<ModifyOperatio
         final TableEnvironmentInternal tEnv =
                 (TableEnvironmentInternal) context.getTableEnvironment();
         try {
+

Review comment:
       revert

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -111,7 +127,8 @@ public void reset() {
         // If rebuild a new Configuration, it loses control of the 
SessionState if users wants to
         // modify the configuration
         resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
-        this.executionContext = new ExecutionContext(executionContext);
+        
buildClassLoaderAndUpdateDependencies(defaultContext.getDependencies());

Review comment:
       Is it on purpose that `RESET` will also reset added jars? IIRC, `RESET` 
only resets configurations.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
##########
@@ -225,6 +225,8 @@ private CliStrings() {
 
     public static final String MESSAGE_EXECUTE_STATEMENT = "Execute statement 
succeed.";
 
+    public static final String MESSAGE_ADD_JAR_STATEMENT = "The specified jar 
is added.";

Review comment:
       nit: 
   
   ```suggestion
       public static final String MESSAGE_ADD_JAR_STATEMENT = "The specified 
jar is added into session classloader.";
   ```

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -275,4 +315,33 @@ private void 
resetSessionConfigurationToDefault(Configuration defaultConf) {
         }
         sessionConfiguration.addAll(defaultConf);
     }
+
+    private void buildClassLoaderAndUpdateDependencies(Collection<URL> 
newDependencies) {
+        // merge the jar in config with the jar maintained in session
+        Set<URL> jarsInConfig;
+        try {
+            jarsInConfig =
+                    new HashSet<>(
+                            ConfigUtils.decodeListFromConfig(
+                                    sessionConfiguration, 
PipelineOptions.JARS, URL::new));
+        } catch (MalformedURLException e) {
+            throw new SqlExecutionException(
+                    "Failed to parse the option `JARS` in configuration.", e);

Review comment:
       Please use the value of `PipelineOptions.JARS` instead of `JARS`. 

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/function.q
##########
@@ -15,6 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+ADD JAR '$VAR_PIPELINE_JARS_PATH';

Review comment:
       I think the variable name is not suitable here, because it refers to the 
`pipeline.jars` configuration value. It would be better to use `$VAR_UDF_JAR_PATH` 
or something similar to make the meaning clear.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -244,6 +261,29 @@ public static SessionContext create(DefaultContext 
defaultContext, String sessio
                 executionContext);
     }
 
+    public void addJar(String jarPath) {
+        // check the jar path is legal
+        URL jar;
+        try {
+            jar = fromLocalFile(new 
File(jarPath).getAbsoluteFile()).toUri().toURL();

Review comment:
       Please add some negative tests, e.g. a jar from a remote URL.

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -126,61 +133,77 @@ public void testSetAndResetKeyInConfigOptions() throws 
Exception {
         // runtime config from flink-conf
         sessionContext.set(OBJECT_REUSE.key(), "false");
 
-        assertEquals("hive", 
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
-        assertEquals(128, 
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
-        assertEquals("test", getConfiguration(sessionContext).getString(NAME));
-        assertFalse(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+        assertEquals("hive", getConfiguration().getString(TABLE_SQL_DIALECT));
+        assertEquals(128, getConfiguration().getInteger(MAX_PARALLELISM));
+        assertEquals("test", getConfiguration().getString(NAME));
+        assertFalse(getConfiguration().getBoolean(OBJECT_REUSE));
 
         sessionContext.reset(TABLE_SQL_DIALECT.key());
-        assertEquals("default", 
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
+        assertEquals("default", 
getConfiguration().getString(TABLE_SQL_DIALECT));
 
         sessionContext.reset(MAX_PARALLELISM.key());
-        assertEquals(16, 
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
+        assertEquals(16, getConfiguration().getInteger(MAX_PARALLELISM));
 
         sessionContext.reset(NAME.key());
-        assertNull(getConfiguration(sessionContext).get(NAME));
+        assertNull(getConfiguration().get(NAME));
 
         sessionContext.reset(OBJECT_REUSE.key());
-        assertTrue(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+        assertTrue(getConfiguration().getBoolean(OBJECT_REUSE));
     }
 
     @Test
-    public void testSetWithConfigOptionAndResetWithYamlKey() throws Exception {
-        SessionContext sessionContext = createSessionContext();
+    public void testSetWithConfigOptionAndResetWithYamlKey() {
         // runtime config option and has deprecated key
         sessionContext.set(TABLE_PLANNER.key(), "blink");
-        assertEquals(
-                "blink", 
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
+        assertEquals("blink", 
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
 
         sessionContext.reset(TABLE_PLANNER.key());
-        assertEquals(
-                "old", 
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
-        assertEquals(
-                "old", 
getConfigurationMap(sessionContext).get("execution.planner").toLowerCase());
+        assertEquals("old", 
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
+        assertEquals("old", 
getConfigurationMap().get("execution.planner").toLowerCase());
     }
 
     @Test
-    public void testSetAndResetKeyNotInYaml() throws Exception {
-        SessionContext sessionContext = createSessionContext();
+    public void testSetAndResetKeyNotInYaml() {
         // other property not in yaml and flink-conf
         sessionContext.set("aa", "11");
         sessionContext.set("bb", "22");
 
-        assertEquals("11", getConfigurationMap(sessionContext).get("aa"));
-        assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+        assertEquals("11", getConfigurationMap().get("aa"));
+        assertEquals("22", getConfigurationMap().get("bb"));
 
         sessionContext.reset("aa");
-        assertNull(getConfigurationMap(sessionContext).get("aa"));
-        assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+        assertNull(getConfigurationMap().get("aa"));
+        assertEquals("22", getConfigurationMap().get("bb"));
 
         sessionContext.reset("bb");
-        assertNull(getConfigurationMap(sessionContext).get("bb"));
+        assertNull(getConfigurationMap().get("bb"));
+    }
+
+    @Test
+    public void testAddJar() throws IOException {
+        File udfJar =
+                TestUserClassLoaderJar.createJarFile(
+                        tempFolder.newFolder("test-jar"), 
"test-classloader-udf.jar");
+
+        sessionContext.addJar(udfJar.getPath());
+        assertEquals(
+                Collections.singletonList(udfJar.toURI().toURL().toString()),
+                getConfiguration().get(JARS));
+
+        // reset to the default classloader
+        sessionContext.reset();
+        assertEquals(Collections.emptyList(), getConfiguration().get(JARS));
+    }
+
+    @Test
+    public void testAddIllegalJar() {
+        exception.expect(SqlExecutionException.class);

Review comment:
       Please always verify the exception message when testing an exception.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -244,6 +261,29 @@ public static SessionContext create(DefaultContext 
defaultContext, String sessio
                 executionContext);
     }
 
+    public void addJar(String jarPath) {
+        // check the jar path is legal
+        URL jar;
+        try {
+            jar = fromLocalFile(new 
File(jarPath).getAbsoluteFile()).toUri().toURL();
+            JarUtils.checkJarFile(jar);
+        } catch (IOException e) {
+            throw new SqlExecutionException(
+                    String.format("Failed to get the jar file with specified 
path: %s.", jarPath));

Review comment:
       Please don't lose the original `IOException` when wrapping it in a new 
exception.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to