lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r934269500


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java:
##########
@@ -29,10 +31,13 @@ public class SQLJobSubmission {
 
     private final List<String> sqlLines;
     private final List<String> jars;
+    private final Consumer<Map<String, String>> envProcessor;

Review Comment:
   Revert these changes?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java:
##########
@@ -220,6 +220,7 @@ public void submitSQLJob(SQLJobSubmission job, Duration 
timeout) throws IOExcept
         AutoClosableProcess.create(commands.toArray(new String[0]))
                 .setStdInputs(job.getSqlLines().toArray(new String[0]))
                 .setStdoutProcessor(LOG::info) // logging the SQL statements 
and error message
+                .setEnv(job.getEnvProcessor())

Review Comment:
   This change is not needed anymore, right? If so, please revert it.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -155,6 +164,23 @@ class TableEnvironmentTest {
     verifyTableEnvironmentExecutionExplain(tEnv)
   }
 
+  @Test
+  def testAddAndShowJar(): Unit = {
+    val jarPath = UserClassLoaderJarTestUtils
+      .createJarFile(
+        tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID)),
+        "test-classloader-udf.jar",
+        GENERATED_LOWER_UDF_CLASS,
+        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS)
+      )
+      .getPath
+
+    tableEnv.executeSql("add JAR '" + jarPath + "'")

Review Comment:
   Please use `String.format` here, and use either upper or lower case uniformly for the SQL keywords.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -274,6 +272,7 @@ public void removeJar(String jarPath) {
         classLoader.removeURL(jarURL);
     }
 
+    // TODO: Only related to removeJar in test. Remove it once it has no use.

Review Comment:
   Ditto



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = 
"add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello 
world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends 
org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+
+    private static final Path hadoopClasspath = 
TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new 
LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without 
extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new 
File("./target/hdfs/hdfsTest").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+            MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+            hdfsCluster = builder.build();
+
+            hdPath = new org.apache.hadoop.fs.Path("/test.jar");
+            hdfs = hdPath.getFileSystem(hdConf);
+
+            File localUdfJar =
+                    UserClassLoaderJarTestUtils.createJarFile(
+                            tempFolder.newFolder(String.format("test-jar-%s", 
UUID.randomUUID())),
+                            "test-classloader-udf.jar",
+                            GENERATED_LOWER_UDF_CLASS,
+                            String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS));
+            hdfs.copyFromLocalFile(new 
org.apache.hadoop.fs.Path(localUdfJar.toURI()), hdPath);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail("Test failed " + e.getMessage());
+        }
+
+        Path tmpPath = tempFolder.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve("result");
+    }
+
+    @After
+    public void destroyHDFS() {
+        try {
+            hdfs.delete(hdPath, false);
+            hdfsCluster.shutdown();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testAddRemoteJar() throws Exception {
+        String remoteJarPath =
+                "hdfs://"
+                        + hdfsCluster.getURI().getHost()
+                        + ":"
+                        + hdfsCluster.getNameNodePort()
+                        + "/"
+                        + hdPath;
+
+        try (ClusterController clusterController = flink.startCluster(1)) {
+            Map<String, String> varsMap = new HashMap<>();
+            varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
+            varsMap.put("$MODE", this.executionMode);
+            varsMap.put("$JAR_PATH", remoteJarPath);
+
+            List<String> sqlLines = initializeSqlLines(varsMap);
+
+            executeSqlStatements(clusterController, sqlLines);
+
+            checkJsonResultFile();
+        }
+    }
+
+    private List<String> initializeSqlLines(Map<String, String> vars) throws 
IOException {

Review Comment:
   There are many methods here duplicated from `PlannerScalaFreeITCase`, so I think we can 
extract a base class to reuse the common methods.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = 
"add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello 
world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends 
org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+
+    private static final Path hadoopClasspath = 
TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new 
LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without 
extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new 
File("./target/hdfs/hdfsTest").getAbsoluteFile();

Review Comment:
   Could we use a sub-directory of the current test directory directly instead?



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = 
"add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello 
world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends 
org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+
+    private static final Path hadoopClasspath = 
TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new 
LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(

Review Comment:
   Why doesn't HDFS work on Windows?



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = 
"add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello 
world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends 
org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+
+    private static final Path hadoopClasspath = 
TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new 
LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;

Review Comment:
   Could we create the HDFS cluster in the same way as `HBaseResource`? I think this 
would also benefit other HDFS-related tests.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1132,6 +1155,10 @@ public TableResultInternal executeInternal(Operation 
operation) {
             return dropSystemFunction((DropTempSystemFunctionOperation) 
operation);
         } else if (operation instanceof AlterCatalogFunctionOperation) {
             return alterCatalogFunction((AlterCatalogFunctionOperation) 
operation);
+        } else if (operation instanceof AddJarOperation) {
+            return addJar((AddJarOperation) operation);
+        } else if (operation instanceof ShowJarsOperation) {
+            return buildShowResult("jars", listJars());

Review Comment:
   After investigating Hive[1] and Spark[2], I think we don't need to return the 
title "jars"; keeping the original behavior and just returning the jar list is enough.
   
   1. 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveCLI
   2. 
https://spark.apache.org/docs/latest/sql-ref-syntax-aux-resource-mgmt-list-jar.html
   
   However, TableEnvironment requires us to specify the schema and data when we 
need to return a result to the client, so here I think we have to set the schema 
column name to `jars`.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AddAndShowJarITCase.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Add and show jar ITCase. */
+public class AddAndShowJarITCase extends StreamingTestBase {
+    private String jarPath;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                TEMPORARY_FOLDER.newFolder(
+                                        String.format("test-jar-%s", 
UUID.randomUUID())),
+                                "test-classloader-udf.jar",
+                                GENERATED_LOWER_UDF_CLASS,
+                                String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS))
+                        .getPath();
+    }
+
+    @Test
+    public void testAddAndShowJar() {

Review Comment:
   Ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -155,6 +164,23 @@ class TableEnvironmentTest {
     verifyTableEnvironmentExecutionExplain(tEnv)
   }
 
+  @Test
+  def testAddAndShowJar(): Unit = {
+    val jarPath = UserClassLoaderJarTestUtils
+      .createJarFile(
+        tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID)),
+        "test-classloader-udf.jar",
+        GENERATED_LOWER_UDF_CLASS,
+        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS)
+      )
+      .getPath
+
+    tableEnv.executeSql("add JAR '" + jarPath + "'")
+    val tableResult = tableEnv.executeSql("SHOW JARS")
+
+    checkData(util.Arrays.asList(Row.of(jarPath)).iterator(), 
tableResult.collect())

Review Comment:
   Could you convert the result to a list first and then assert it? E.g. 
`CollectionUtil.iteratorToList(tableResult.collect())`.
   Moreover, please also assert the `ResultKind`, e.g. 
`assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)`.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {

Review Comment:
   Please use JUnit 5 for newly added tests; refer to 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java:
##########
@@ -156,7 +156,8 @@ public void testAddJarWithRelativePath() throws IOException 
{
 
     @Test
     public void testAddIllegalJar() {
-        validateAddJarWithException("/path/to/illegal.jar", "JAR file does not 
exist");
+        validateAddJarWithException(

Review Comment:
   Please port the related `add jar` and `show jars` tests so they are backed by 
TableEnvironment, because SessionContext doesn't support this now.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -192,31 +192,32 @@ private void checkJarResources(List<ResourceUri> 
resourceUris) throws IOExceptio
         }
 
         for (ResourceUri resourceUri : resourceUris) {
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
-            // file name should end with .jar suffix
-            String fileExtension = Files.getFileExtension(path.getName());
-            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
-                throw new ValidationException(
-                        String.format(
-                                "The registering jar resource [%s] must ends 
with '.jar' suffix.",
-                                path));
-            }
+            checkJarPath(new Path(resourceUri.getUri()));
+        }
+    }
 
-            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
-            // check resource exists firstly
-            if (!fs.exists(path)) {
-                throw new FileNotFoundException(
-                        String.format("Jar resource [%s] not found.", path));
-            }
+    protected void checkJarPath(Path path) throws IOException {
+        // file name should end with .jar suffix
+        String fileExtension = Files.getFileExtension(path.getName());
+        if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+            throw new ValidationException(
+                    String.format(
+                            "The registering jar resource [%s] must ends with 
'.jar' suffix.",
+                            path));
+        }
 
-            // register directory is not allowed for resource
-            if (fs.getFileStatus(path).isDir()) {
-                throw new ValidationException(
-                        String.format(
-                                "The registering jar resource [%s] is a 
directory that is not allowed.",
-                                path));
-            }
+        FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+        // check resource exists firstly
+        if (!fs.exists(path)) {
+            throw new FileNotFoundException(String.format("Jar resource [%s] 
not found.", path));
+        }
+
+        // register directory is not allowed for resource
+        if (fs.getFileStatus(path).isDir()) {
+            throw new ValidationException(
+                    String.format(
+                            "The registering jar resource [%s] is a directory 
that is not allowed.",

Review Comment:
   This method is also called by the `unregisterJarResource` method, so this 
message is not correct now; change it to `registering or unregistering`?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -248,6 +248,7 @@ public static SessionContext create(DefaultContext 
defaultContext, String sessio
                 executionContext);
     }
 
+    // TODO: Only related to removeJar in test. Remove it once it has no use.

Review Comment:
   I think we should remove this method and the related tests. Since we ported 
the add jar implementation to TableEnvironment, we should also port the tests 
to be backed by TableEnvironment.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -192,31 +192,32 @@ private void checkJarResources(List<ResourceUri> 
resourceUris) throws IOExceptio
         }
 
         for (ResourceUri resourceUri : resourceUris) {
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
-            // file name should end with .jar suffix
-            String fileExtension = Files.getFileExtension(path.getName());
-            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
-                throw new ValidationException(
-                        String.format(
-                                "The registering jar resource [%s] must ends 
with '.jar' suffix.",
-                                path));
-            }
+            checkJarPath(new Path(resourceUri.getUri()));
+        }
+    }
 
-            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
-            // check resource exists firstly
-            if (!fs.exists(path)) {
-                throw new FileNotFoundException(
-                        String.format("Jar resource [%s] not found.", path));
-            }
+    protected void checkJarPath(Path path) throws IOException {
+        // file name should end with .jar suffix
+        String fileExtension = Files.getFileExtension(path.getName());
+        if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+            throw new ValidationException(
+                    String.format(
+                            "The registering jar resource [%s] must ends with 
'.jar' suffix.",

Review Comment:
   This method is also called by the `unregisterJarResource` method, so this 
message is not correct now; change it to `registering or unregistering`?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AddAndShowJarITCase.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Add and show jar ITCase. */
+public class AddAndShowJarITCase extends BatchTestBase {
+    private String jarPath;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                TEMPORARY_FOLDER.newFolder(
+                                        String.format("test-jar-%s", 
UUID.randomUUID())),
+                                "test-classloader-udf.jar",
+                                GENERATED_LOWER_UDF_CLASS,
+                                String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS))
+                        .getPath();
+    }
+
+    @Test
+    public void testAddAndShowJar() {

Review Comment:
   The purpose of this test is not just to exercise `add jar`; we also want to 
verify that the class in the jar works from a query, so we should test a query 
that refers to the UDF class in the jar. 
   I think we don't need to introduce this test class; move this test to 
`FunctionITCase`.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = 
"add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello 
world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =

Review Comment:
   Please implement this UDF in the **flink-sql-client-test** module; it will 
be packaged into SqlToolbox.jar, and then we can use it directly.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {

Review Comment:
   Change this class name to `UsingRemoteJarITCase`? We will also add `create 
function ... using jar ...` related tests in the future.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = 
"add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello 
world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends 
org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+
+    private static final Path hadoopClasspath = 
TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new 
LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without 
extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new 
File("./target/hdfs/hdfsTest").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+            MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+            hdfsCluster = builder.build();
+
+            hdPath = new org.apache.hadoop.fs.Path("/test.jar");
+            hdfs = hdPath.getFileSystem(hdConf);
+
+            File localUdfJar =
+                    UserClassLoaderJarTestUtils.createJarFile(
+                            tempFolder.newFolder(String.format("test-jar-%s", 
UUID.randomUUID())),
+                            "test-classloader-udf.jar",
+                            GENERATED_LOWER_UDF_CLASS,
+                            String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS));
+            hdfs.copyFromLocalFile(new 
org.apache.hadoop.fs.Path(localUdfJar.toURI()), hdPath);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail("Test failed " + e.getMessage());
+        }
+
+        Path tmpPath = tempFolder.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve("result");
+    }
+
+    @After
+    public void destroyHDFS() {
+        try {
+            hdfs.delete(hdPath, false);
+            hdfsCluster.shutdown();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testAddRemoteJar() throws Exception {
+        String remoteJarPath =
+                "hdfs://"

Review Comment:
   Please use String.format() here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to