wuchong commented on a change in pull request #15925:
URL: https://github.com/apache/flink/pull/15925#discussion_r636161868
##########
File path: flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
##########
@@ -128,6 +129,7 @@
"IF"
"INPUTFORMAT"
"ITEMS"
+ "JAR"
Review comment:
It is possible to add this into non-reserved keywords? Otherwise, users
use `jar` as field names will not work when upgrading to new version.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
##########
@@ -223,6 +223,7 @@ public TableResult executeModifyOperations(String
sessionId, List<ModifyOperatio
final TableEnvironmentInternal tEnv =
(TableEnvironmentInternal) context.getTableEnvironment();
try {
+
Review comment:
revert
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -111,7 +127,8 @@ public void reset() {
// If rebuild a new Configuration, it loses control of the
SessionState if users wants to
// modify the configuration
resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
- this.executionContext = new ExecutionContext(executionContext);
+
buildClassLoaderAndUpdateDependencies(defaultContext.getDependencies());
Review comment:
Is it on purpose that `RESET` will also reset added jars? IIRC, `RESET`
only reset configurations.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
##########
@@ -225,6 +225,8 @@ private CliStrings() {
public static final String MESSAGE_EXECUTE_STATEMENT = "Execute statement
succeed.";
+ public static final String MESSAGE_ADD_JAR_STATEMENT = "The specified jar
is added.";
Review comment:
nit:
```suggestion
public static final String MESSAGE_ADD_JAR_STATEMENT = "The specified
jar is added into session classloader.";
```
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -275,4 +315,33 @@ private void
resetSessionConfigurationToDefault(Configuration defaultConf) {
}
sessionConfiguration.addAll(defaultConf);
}
+
+ private void buildClassLoaderAndUpdateDependencies(Collection<URL>
newDependencies) {
+ // merge the jar in config with the jar maintained in session
+ Set<URL> jarsInConfig;
+ try {
+ jarsInConfig =
+ new HashSet<>(
+ ConfigUtils.decodeListFromConfig(
+ sessionConfiguration,
PipelineOptions.JARS, URL::new));
+ } catch (MalformedURLException e) {
+ throw new SqlExecutionException(
+ "Failed to parse the option `JARS` in configuration.", e);
Review comment:
Please use the value of `PipelineOptions.JARS` instead of `JARS`.
##########
File path: flink-table/flink-sql-client/src/test/resources/sql/function.q
##########
@@ -15,6 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+ADD JAR '$VAR_PIPELINE_JARS_PATH';
Review comment:
I think the variable is not suitable here, because it refers to the
`pipeline.jars` configuration value. Would be better to use `$VAR_UDF_JAR_PATH`
or something similar to make the meaning clear.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -244,6 +261,29 @@ public static SessionContext create(DefaultContext
defaultContext, String sessio
executionContext);
}
+ public void addJar(String jarPath) {
+ // check the jar path is legal
+ URL jar;
+ try {
+ jar = fromLocalFile(new
File(jarPath).getAbsoluteFile()).toUri().toURL();
Review comment:
Please add some negative tests, e.g. jar from remote url .
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -126,61 +133,77 @@ public void testSetAndResetKeyInConfigOptions() throws
Exception {
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");
- assertEquals("hive",
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
- assertEquals(128,
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
- assertEquals("test", getConfiguration(sessionContext).getString(NAME));
- assertFalse(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+ assertEquals("hive", getConfiguration().getString(TABLE_SQL_DIALECT));
+ assertEquals(128, getConfiguration().getInteger(MAX_PARALLELISM));
+ assertEquals("test", getConfiguration().getString(NAME));
+ assertFalse(getConfiguration().getBoolean(OBJECT_REUSE));
sessionContext.reset(TABLE_SQL_DIALECT.key());
- assertEquals("default",
getConfiguration(sessionContext).getString(TABLE_SQL_DIALECT));
+ assertEquals("default",
getConfiguration().getString(TABLE_SQL_DIALECT));
sessionContext.reset(MAX_PARALLELISM.key());
- assertEquals(16,
getConfiguration(sessionContext).getInteger(MAX_PARALLELISM));
+ assertEquals(16, getConfiguration().getInteger(MAX_PARALLELISM));
sessionContext.reset(NAME.key());
- assertNull(getConfiguration(sessionContext).get(NAME));
+ assertNull(getConfiguration().get(NAME));
sessionContext.reset(OBJECT_REUSE.key());
- assertTrue(getConfiguration(sessionContext).getBoolean(OBJECT_REUSE));
+ assertTrue(getConfiguration().getBoolean(OBJECT_REUSE));
}
@Test
- public void testSetWithConfigOptionAndResetWithYamlKey() throws Exception {
- SessionContext sessionContext = createSessionContext();
+ public void testSetWithConfigOptionAndResetWithYamlKey() {
// runtime config option and has deprecated key
sessionContext.set(TABLE_PLANNER.key(), "blink");
- assertEquals(
- "blink",
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
+ assertEquals("blink",
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
sessionContext.reset(TABLE_PLANNER.key());
- assertEquals(
- "old",
getConfiguration(sessionContext).get(TABLE_PLANNER).name().toLowerCase());
- assertEquals(
- "old",
getConfigurationMap(sessionContext).get("execution.planner").toLowerCase());
+ assertEquals("old",
getConfiguration().get(TABLE_PLANNER).name().toLowerCase());
+ assertEquals("old",
getConfigurationMap().get("execution.planner").toLowerCase());
}
@Test
- public void testSetAndResetKeyNotInYaml() throws Exception {
- SessionContext sessionContext = createSessionContext();
+ public void testSetAndResetKeyNotInYaml() {
// other property not in yaml and flink-conf
sessionContext.set("aa", "11");
sessionContext.set("bb", "22");
- assertEquals("11", getConfigurationMap(sessionContext).get("aa"));
- assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+ assertEquals("11", getConfigurationMap().get("aa"));
+ assertEquals("22", getConfigurationMap().get("bb"));
sessionContext.reset("aa");
- assertNull(getConfigurationMap(sessionContext).get("aa"));
- assertEquals("22", getConfigurationMap(sessionContext).get("bb"));
+ assertNull(getConfigurationMap().get("aa"));
+ assertEquals("22", getConfigurationMap().get("bb"));
sessionContext.reset("bb");
- assertNull(getConfigurationMap(sessionContext).get("bb"));
+ assertNull(getConfigurationMap().get("bb"));
+ }
+
+ @Test
+ public void testAddJar() throws IOException {
+ File udfJar =
+ TestUserClassLoaderJar.createJarFile(
+ tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar");
+
+ sessionContext.addJar(udfJar.getPath());
+ assertEquals(
+ Collections.singletonList(udfJar.toURI().toURL().toString()),
+ getConfiguration().get(JARS));
+
+ // reset to the default classloader
+ sessionContext.reset();
+ assertEquals(Collections.emptyList(), getConfiguration().get(JARS));
+ }
+
+ @Test
+ public void testAddIllegalJar() {
+ exception.expect(SqlExecutionException.class);
Review comment:
Please always verify exception message when testing an exception.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -244,6 +261,29 @@ public static SessionContext create(DefaultContext
defaultContext, String sessio
executionContext);
}
+ public void addJar(String jarPath) {
+ // check the jar path is legal
+ URL jar;
+ try {
+ jar = fromLocalFile(new
File(jarPath).getAbsoluteFile()).toUri().toURL();
+ JarUtils.checkJarFile(jar);
+ } catch (IOException e) {
+ throw new SqlExecutionException(
+ String.format("Failed to get the jar file with specified
path: %s.", jarPath));
Review comment:
Please don't lose the original `IOException` when you wrapping a new
exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]