wuchong commented on a change in pull request #15925:
URL: https://github.com/apache/flink/pull/15925#discussion_r638603628
##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -167,6 +167,37 @@ execution.shutdown-on-attached-exit=false
execution.target=remote
jobmanager.rpc.address=$VAR_JOBMANAGER_RPC_ADDRESS
pipeline.classpaths=
-pipeline.jars=$VAR_PIPELINE_JARS
+pipeline.jars=
+rest.port=$VAR_REST_PORT
+!ok
+
+# test add jar
Review comment:
```suggestion
# test "reset" can work with "add jar"
```
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -275,4 +322,33 @@ private void
resetSessionConfigurationToDefault(Configuration defaultConf) {
}
sessionConfiguration.addAll(defaultConf);
}
+
+ private void buildClassLoaderAndUpdateDependencies(Collection<URL>
newDependencies) {
Review comment:
nit: the method name `buildClassLoader...` sounds like it returns a new
classloader object, but it does not. It would be better to rename it to
`rebuildClassLoader...`.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -202,15 +240,32 @@ private SessionContext createSessionContext() throws
Exception {
return SessionContext.create(defaultContext, "test-session");
}
- private Map<String, String> getConfigurationMap(SessionContext context) {
- return context.getExecutionContext()
+ private Map<String, String> getConfigurationMap() {
+ return sessionContext
+ .getExecutionContext()
.getTableEnvironment()
.getConfig()
.getConfiguration()
.toMap();
}
- private Configuration getConfiguration(SessionContext context) {
- return
context.getExecutionContext().getTableEnvironment().getConfig().getConfiguration();
+ private Configuration getConfiguration() {
+ return sessionContext
+ .getExecutionContext()
+ .getTableEnvironment()
+ .getConfig()
+ .getConfiguration();
+ }
+
+ private void validateAddJarException(String jarPath, String errorMessages)
{
+ Set<URL> originDependencies = sessionContext.getDependencies();
+ try {
+ sessionContext.addJar(jarPath);
Review comment:
Please add `fail("should fail");`, otherwise the test will pass if no
exception is thrown.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
##########
@@ -106,7 +106,8 @@ public static void setup() throws IOException {
historyPath = tempFolder.newFile("history").toPath();
replaceVars = new HashMap<>();
- replaceVars.put("$VAR_PIPELINE_JARS", udfDependency.toString());
+ replaceVars.put("$VAR_UDF_JAR_PATH", udfDependency.getPath());
Review comment:
`udfDependency` can be a local variable now.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -126,61 +140,87 @@ public void testSetAndResetKeyInConfigOptions() throws
Exception {
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");
- assertEquals("hive",
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
- assertEquals(128,
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
- assertEquals("test", getConfiguration(sessionContext).getString(NAME));
- assertFalse(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+ assertEquals("hive", getConfiguration().getString(TABLE_SQL_DIALECT));
+ assertEquals(128, getConfiguration().getInteger(MAX_PARALLELISM));
+ assertEquals("test", getConfiguration().getString(NAME));
+ assertFalse(getConfiguration().getBoolean(OBJECT_REUSE));
sessionContext.reset(TABLE_SQL_DIALECT.key());
- assertEquals("default",
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
+ assertEquals("default",
getConfiguration().getString(TABLE_SQL_DIALECT));
sessionContext.reset(MAX_PARALLELISM.key());
- assertEquals(16,
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
+ assertEquals(16, getConfiguration().getInteger(MAX_PARALLELISM));
sessionContext.reset(NAME.key());
- assertNull(getConfiguration(sessionContext).get(NAME));
+ assertNull(getConfiguration().get(NAME));
sessionContext.reset(OBJECT_REUSE.key());
- assertTrue(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+ assertTrue(getConfiguration().getBoolean(OBJECT_REUSE));
}
@Test
- public void testSetWithConfigOptionAndResetWithYamlKey() throws Exception {
- SessionContext sessionContext = createSessionContext();
+ public void testSetWithConfigOptionAndResetWithYamlKey() {
// runtime config option and has deprecated key
sessionContext.set(TABLE_PLANNER.key(), "blink");
- assertEquals(
- "blink",
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
+ assertEquals("blink",
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
sessionContext.reset(TABLE_PLANNER.key());
- assertEquals(
- "old",
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
- assertEquals(
- "old",
getConfigurationMap(sessionContext).get("execution.planner").toLowerCase());
+ assertEquals("old",
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
+ assertEquals("old",
getConfigurationMap().get("execution.planner").toLowerCase());
}
@Test
- public void testSetAndResetKeyNotInYaml() throws Exception {
- SessionContext sessionContext = createSessionContext();
+ public void testSetAndResetKeyNotInYaml() {
// other property not in yaml and flink-conf
sessionContext.set("aa", "11");
sessionContext.set("bb", "22");
- assertEquals("11", getConfigurationMap(sessionContext).get("aa"));
- assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+ assertEquals("11", getConfigurationMap().get("aa"));
+ assertEquals("22", getConfigurationMap().get("bb"));
sessionContext.reset("aa");
- assertNull(getConfigurationMap(sessionContext).get("aa"));
- assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+ assertNull(getConfigurationMap().get("aa"));
+ assertEquals("22", getConfigurationMap().get("bb"));
sessionContext.reset("bb");
- assertNull(getConfigurationMap(sessionContext).get("bb"));
+ assertNull(getConfigurationMap().get("bb"));
+ }
+
+ @Test
+ public void testAddJar() throws IOException {
+ sessionContext.addJar(udfJar.getPath());
Review comment:
Please also add tests in which the jar path is prefixed with `file://` and
in which it is a relative path such as `./xxx`.
You can use `new
File(".").getAbsoluteFile().toPath().relativize(udfJar.toPath())` to get a
relative path to the current working dir.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -202,15 +240,32 @@ private SessionContext createSessionContext() throws
Exception {
return SessionContext.create(defaultContext, "test-session");
}
- private Map<String, String> getConfigurationMap(SessionContext context) {
- return context.getExecutionContext()
+ private Map<String, String> getConfigurationMap() {
+ return sessionContext
+ .getExecutionContext()
.getTableEnvironment()
.getConfig()
.getConfiguration()
.toMap();
}
- private Configuration getConfiguration(SessionContext context) {
- return
context.getExecutionContext().getTableEnvironment().getConfig().getConfiguration();
+ private Configuration getConfiguration() {
+ return sessionContext
+ .getExecutionContext()
+ .getTableEnvironment()
+ .getConfig()
+ .getConfiguration();
+ }
+
+ private void validateAddJarException(String jarPath, String errorMessages)
{
+ Set<URL> originDependencies = sessionContext.getDependencies();
+ try {
+ sessionContext.addJar(jarPath);
+ } catch (Exception e) {
+ containsCause(e, IOException.class);
+ assertTrue(e.getCause().getMessage().contains(errorMessages));
Review comment:
You can use `assertThat(e,
org.apache.flink.core.testutils.FlinkMatchers#containsMessage(errorMessages))`
to verify the nested exception message. You can also use
`org.apache.flink.core.testutils.FlinkMatchers#containsCause(java.lang.Throwable)`
to verify the nested exception directly.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -111,7 +133,8 @@ public void reset() {
// If rebuild a new Configuration, it loses control of the
SessionState if users wants to
// modify the configuration
resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
- this.executionContext = new ExecutionContext(executionContext);
+ buildClassLoaderAndUpdateDependencies(dependencies);
Review comment:
Please add a comment to describe why we need to rebuild the classloader
here (the new classloader is actually the same as before).
IIUC, the main reason is that the `pipeline.jars` option has been
reverted, but `pipeline.jars` should include the `dependencies` jars; otherwise
the UDF jars can't work during runtime execution.
##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -167,6 +167,37 @@ execution.shutdown-on-attached-exit=false
execution.target=remote
jobmanager.rpc.address=$VAR_JOBMANAGER_RPC_ADDRESS
pipeline.classpaths=
-pipeline.jars=$VAR_PIPELINE_JARS
+pipeline.jars=
+rest.port=$VAR_REST_PORT
+!ok
+
+# test add jar
+ADD JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is added into session classloader.
+!info
+
+set;
+execution.attached=true
+execution.savepoint.ignore-unclaimed-state=false
+execution.shutdown-on-attached-exit=false
+execution.target=remote
+jobmanager.rpc.address=localhost
+pipeline.classpaths=
+pipeline.jars=$VAR_PIPELINE_JARS_URL
+rest.port=$VAR_REST_PORT
+!ok
+
+reset;
+[INFO] All session properties have been set to their default values.
+!info
Review comment:
It would be better to call `create function func1 as 'LowerUDF' LANGUAGE
JAVA;` and `SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id,
str);` to verify that the added jar still works.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -126,61 +140,87 @@ public void testSetAndResetKeyInConfigOptions() throws
Exception {
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");
- assertEquals("hive",
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
- assertEquals(128,
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
- assertEquals("test", getConfiguration(sessionContext).getString(NAME));
- assertFalse(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+ assertEquals("hive", getConfiguration().getString(TABLE_SQL_DIALECT));
+ assertEquals(128, getConfiguration().getInteger(MAX_PARALLELISM));
+ assertEquals("test", getConfiguration().getString(NAME));
+ assertFalse(getConfiguration().getBoolean(OBJECT_REUSE));
sessionContext.reset(TABLE_SQL_DIALECT.key());
- assertEquals("default",
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
+ assertEquals("default",
getConfiguration().getString(TABLE_SQL_DIALECT));
sessionContext.reset(MAX_PARALLELISM.key());
- assertEquals(16,
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
+ assertEquals(16, getConfiguration().getInteger(MAX_PARALLELISM));
sessionContext.reset(NAME.key());
- assertNull(getConfiguration(sessionContext).get(NAME));
+ assertNull(getConfiguration().get(NAME));
sessionContext.reset(OBJECT_REUSE.key());
- assertTrue(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+ assertTrue(getConfiguration().getBoolean(OBJECT_REUSE));
}
@Test
- public void testSetWithConfigOptionAndResetWithYamlKey() throws Exception {
- SessionContext sessionContext = createSessionContext();
+ public void testSetWithConfigOptionAndResetWithYamlKey() {
// runtime config option and has deprecated key
sessionContext.set(TABLE_PLANNER.key(), "blink");
- assertEquals(
- "blink",
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
+ assertEquals("blink",
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
sessionContext.reset(TABLE_PLANNER.key());
- assertEquals(
- "old",
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
- assertEquals(
- "old",
getConfigurationMap(sessionContext).get("execution.planner").toLowerCase());
+ assertEquals("old",
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
+ assertEquals("old",
getConfigurationMap().get("execution.planner").toLowerCase());
}
@Test
- public void testSetAndResetKeyNotInYaml() throws Exception {
- SessionContext sessionContext = createSessionContext();
+ public void testSetAndResetKeyNotInYaml() {
// other property not in yaml and flink-conf
sessionContext.set("aa", "11");
sessionContext.set("bb", "22");
- assertEquals("11", getConfigurationMap(sessionContext).get("aa"));
- assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+ assertEquals("11", getConfigurationMap().get("aa"));
+ assertEquals("22", getConfigurationMap().get("bb"));
sessionContext.reset("aa");
- assertNull(getConfigurationMap(sessionContext).get("aa"));
- assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+ assertNull(getConfigurationMap().get("aa"));
+ assertEquals("22", getConfigurationMap().get("bb"));
sessionContext.reset("bb");
- assertNull(getConfigurationMap(sessionContext).get("bb"));
+ assertNull(getConfigurationMap().get("bb"));
+ }
+
+ @Test
+ public void testAddJar() throws IOException {
+ sessionContext.addJar(udfJar.getPath());
+ assertEquals(
+ Collections.singletonList(udfJar.toURI().toURL().toString()),
+ getConfiguration().get(JARS));
+
+ // reset to the default classloader
+ sessionContext.reset();
+ assertEquals(
+ Collections.singletonList(udfJar.toURI().toURL().toString()),
+ getConfiguration().get(JARS));
+ }
+
+ @Test
+ public void testAddIllegalJar() {
+ validateAddJarException("/path/to/illegal.jar", "JAR file does not
exist");
+ }
+
+ @Test
+ public void testRemoteJar() {
+ validateAddJarException("hdfs://remote:10080/remote.jar", "JAR file
does not exist");
Review comment:
I think this exception message is misleading: the jar actually exists
there. We just don't support remote jars.
We should check the protocol/scheme of the URL; we only support `file` now.
You can use `URI.create(jarPath).getScheme()` to get the scheme and throw a
meaningful message if it is not `file`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]