This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new f44affdad minor improvement
f44affdad is described below
commit f44affdad6368a7566ba694762765e606cac8435
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 28 15:18:11 2023 +0800
minor improvement
---
.../apache/streampark/flink/client/FlinkClient.scala | 14 --------------
.../streampark/flink/proxy/FlinkShimsProxy.scala | 18 ++++++------------
2 files changed, 6 insertions(+), 26 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index 14d063e88..c2ea9ba48 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -18,13 +18,10 @@
package org.apache.streampark.flink.client
import org.apache.streampark.common.conf.FlinkVersion
-import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.proxy.FlinkShimsProxy
-import java.net.URL
-
import scala.language.{implicitConversions, reflectiveCalls}
import scala.reflect.ClassTag
@@ -72,20 +69,9 @@ object FlinkClient extends Logger {
request: Object,
flinkVersion: FlinkVersion,
requestBody: (String, String)): T = {
-
- val classpath = request match {
- case request: SubmitRequest =>
- request.executionMode match {
- case ExecutionMode.REMOTE => List(request.userJarFile.toURI.toURL)
- case _ => List.empty[URL]
- }
- case _ => List.empty[URL]
- }
-
flinkVersion.checkVersion()
FlinkShimsProxy.proxy(
flinkVersion,
- classpath,
(classLoader: ClassLoader) => {
val submitClass =
classLoader.loadClass(FLINK_CLIENT_HANDLER_CLASS_NAME)
val requestClass = classLoader.loadClass(requestBody._1)
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 016c51873..0307c125f 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -22,7 +22,6 @@ import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
import java.net.URL
-import java.sql.DriverManager
import java.util.function.{Function => JavaFunc}
import java.util.regex.Pattern
@@ -56,8 +55,8 @@ object FlinkShimsProxy extends Logger {
* @tparam T
* @return
*/
- def proxy[T](flinkVersion: FlinkVersion, classPaths: List[URL], func: ClassLoader => T): T = {
- val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion, classPaths)
+ def proxy[T](flinkVersion: FlinkVersion, func: ClassLoader => T): T = {
+ val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
@@ -71,11 +70,8 @@ object FlinkShimsProxy extends Logger {
* @tparam T
* @return
*/
- def proxy[T](
- flinkVersion: FlinkVersion,
- classPaths: List[URL],
- func: JavaFunc[ClassLoader, T]): T = {
- val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion, classPaths)
+ def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
+ val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
@@ -161,15 +157,13 @@ object FlinkShimsProxy extends Logger {
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
- private[this] def getFlinkShimsClassLoader(
- flinkVersion: FlinkVersion,
- classPaths: List[URL]): ClassLoader = {
+ private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
logInfo(s"add flink shims urls classloader,flink version: $flinkVersion")
SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate(
s"${flinkVersion.fullVersion}", {
// 1) flink/lib
val libURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", !_.getName.startsWith("log4j"))
- val shimsUrls = ListBuffer[URL](libURL: _*) ++ classPaths
+ val shimsUrls = ListBuffer[URL](libURL: _*)
// 2) add all shims jar
addShimsUrls(