This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new f44affdad minor improvement
f44affdad is described below

commit f44affdad6368a7566ba694762765e606cac8435
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 28 15:18:11 2023 +0800

    minor improvement
---
 .../apache/streampark/flink/client/FlinkClient.scala   | 14 --------------
 .../streampark/flink/proxy/FlinkShimsProxy.scala       | 18 ++++++------------
 2 files changed, 6 insertions(+), 26 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index 14d063e88..c2ea9ba48 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -18,13 +18,10 @@
 package org.apache.streampark.flink.client
 
 import org.apache.streampark.common.conf.FlinkVersion
-import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.proxy.FlinkShimsProxy
 
-import java.net.URL
-
 import scala.language.{implicitConversions, reflectiveCalls}
 import scala.reflect.ClassTag
 
@@ -72,20 +69,9 @@ object FlinkClient extends Logger {
       request: Object,
       flinkVersion: FlinkVersion,
       requestBody: (String, String)): T = {
-
-    val classpath = request match {
-      case request: SubmitRequest =>
-        request.executionMode match {
-          case ExecutionMode.REMOTE => List(request.userJarFile.toURI.toURL)
-          case _ => List.empty[URL]
-        }
-      case _ => List.empty[URL]
-    }
-
     flinkVersion.checkVersion()
     FlinkShimsProxy.proxy(
       flinkVersion,
-      classpath,
       (classLoader: ClassLoader) => {
         val submitClass = 
classLoader.loadClass(FLINK_CLIENT_HANDLER_CLASS_NAME)
         val requestClass = classLoader.loadClass(requestBody._1)
diff --git 
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
 
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 016c51873..0307c125f 100644
--- 
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ 
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -22,7 +22,6 @@ import org.apache.streampark.common.util.{ClassLoaderUtils, 
Logger, Utils}
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, 
ObjectOutputStream}
 import java.net.URL
-import java.sql.DriverManager
 import java.util.function.{Function => JavaFunc}
 import java.util.regex.Pattern
 
@@ -56,8 +55,8 @@ object FlinkShimsProxy extends Logger {
    * @tparam T
    * @return
    */
-  def proxy[T](flinkVersion: FlinkVersion, classPaths: List[URL], func: 
ClassLoader => T): T = {
-    val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion, classPaths)
+  def proxy[T](flinkVersion: FlinkVersion, func: ClassLoader => T): T = {
+    val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
     ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => 
func(shimsClassLoader))
   }
 
@@ -71,11 +70,8 @@ object FlinkShimsProxy extends Logger {
    * @tparam T
    * @return
    */
-  def proxy[T](
-      flinkVersion: FlinkVersion,
-      classPaths: List[URL],
-      func: JavaFunc[ClassLoader, T]): T = {
-    val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion, classPaths)
+  def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T 
= {
+    val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
     ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => 
func(shimsClassLoader))
   }
 
@@ -161,15 +157,13 @@ object FlinkShimsProxy extends Logger {
     ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => 
func(shimsClassLoader))
   }
 
-  private[this] def getFlinkShimsClassLoader(
-      flinkVersion: FlinkVersion,
-      classPaths: List[URL]): ClassLoader = {
+  private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): 
ClassLoader = {
     logInfo(s"add flink shims urls classloader,flink version: $flinkVersion")
     SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate(
       s"${flinkVersion.fullVersion}", {
         // 1) flink/lib
         val libURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", 
!_.getName.startsWith("log4j"))
-        val shimsUrls = ListBuffer[URL](libURL: _*) ++ classPaths
+        val shimsUrls = ListBuffer[URL](libURL: _*)
 
         // 2) add all shims jar
         addShimsUrls(

Reply via email to