This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new cd9db377e [Improve] spark-submit improvements (#3900)
cd9db377e is described below
commit cd9db377e9157774a38f051a7b49735f3d645ba2
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 20 14:20:39 2024 +0800
[Improve] spark-submit improvements (#3900)
---
.../common/util}/ChildFirstClassLoader.scala | 60 ++-------
.../util}/ClassLoaderObjectInputStream.scala | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 2 +-
.../streampark/flink/proxy/FlinkShimsProxy.scala | 28 +++-
.../spark/client/bean/SubmitRequest.scala | 36 +++--
.../spark/client/conf/SparkConfiguration.scala | 9 +-
.../spark/client/proxy/ChildFirstClassLoader.scala | 132 -------------------
.../proxy/ClassLoaderObjectInputStream.scala | 82 ------------
.../spark/client/proxy/SparkShimsProxy.scala | 39 ++++--
.../spark/client/impl/YarnApplicationClient.scala | 146 +++++++++------------
.../spark/client/trait/SparkClientTrait.scala | 38 ++++--
11 files changed, 175 insertions(+), 399 deletions(-)
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala
similarity index 64%
rename from
streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
rename to
streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala
index ec8f360b5..fabe9e73b 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala
@@ -15,13 +15,11 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.proxy
+package org.apache.streampark.common.util
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util
-import java.util.function.Consumer
-import java.util.regex.Pattern
import scala.util.Try
@@ -36,40 +34,12 @@ import scala.util.Try
class ChildFirstClassLoader(
urls: Array[URL],
parent: ClassLoader,
- flinkResourcePattern: Pattern,
- classLoadingExceptionHandler: Consumer[Throwable])
+ parentFirstClasses: List[String],
+ loadJarFilter: String => Boolean)
extends URLClassLoader(urls, parent) {
ClassLoader.registerAsParallelCapable()
- def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern:
Pattern) {
- this(
- urls,
- parent,
- flinkResourcePattern,
- (t: Throwable) => throw t)
- }
-
- ClassLoader.registerAsParallelCapable()
-
- private val FLINK_PATTERN =
- Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE |
Pattern.DOTALL)
-
- private val JAR_PROTOCOL = "jar"
-
- private val PARENT_FIRST_PATTERNS = List(
- "java.",
- "javax.xml",
- "org.slf4j",
- "org.apache.log4j",
- "org.apache.logging",
- "org.apache.commons.logging",
- "org.apache.commons.cli",
- "ch.qos.logback",
- "org.xml",
- "org.w3c",
- "org.apache.hadoop")
-
@throws[ClassNotFoundException]
override def loadClass(name: String, resolve: Boolean): Class[_] = {
try {
@@ -78,7 +48,7 @@ class ChildFirstClassLoader(
super.findLoadedClass(name) match {
case null =>
// check whether the class should go parent-first
- PARENT_FIRST_PATTERNS.find(name.startsWith) match {
+ parentFirstClasses.find(name.startsWith) match {
case Some(_) => super.loadClass(name, resolve)
case _ => Try(findClass(name)).getOrElse(super.loadClass(name,
resolve))
}
@@ -90,9 +60,7 @@ class ChildFirstClassLoader(
}
}
} catch {
- case e: Throwable =>
- classLoadingExceptionHandler.accept(e)
- null
+ case e: Throwable => throw e
}
}
@@ -105,20 +73,14 @@ class ChildFirstClassLoader(
}
/**
- * e.g. flinkResourcePattern: flink-1.12 <p> flink-1.12.jar/resource
flink-1.14.jar/resource
- * other.jar/resource \=> after filterFlinkShimsResource \=>
flink-1.12.jar/resource
- * other.jar/resource
- *
* @param urlClassLoaderResource
* @return
*/
- private def filterFlinkShimsResource(urlClassLoaderResource: URL): URL = {
- if (urlClassLoaderResource != null && JAR_PROTOCOL ==
urlClassLoaderResource.getProtocol) {
+ private def filterResource(urlClassLoaderResource: URL): URL = {
+ if (urlClassLoaderResource != null && "jar" ==
urlClassLoaderResource.getProtocol) {
val spec = urlClassLoaderResource.getFile
- val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
- val matchState =
- FLINK_PATTERN.matcher(filename).matches &&
!flinkResourcePattern.matcher(filename).matches
- if (matchState) {
+ val jarName = new File(spec.substring(0, spec.indexOf("!/"))).getName
+ if (loadJarFilter(jarName)) {
return null
}
}
@@ -127,7 +89,7 @@ class ChildFirstClassLoader(
private def addResources(result: util.List[URL], resources:
util.Enumeration[URL]) = {
while (resources.hasMoreElements) {
- val urlClassLoaderResource =
filterFlinkShimsResource(resources.nextElement)
+ val urlClassLoaderResource = filterResource(resources.nextElement)
if (urlClassLoaderResource != null) {
result.add(urlClassLoaderResource)
}
@@ -145,7 +107,7 @@ class ChildFirstClassLoader(
addResources(result, parent.getResources(name))
}
new util.Enumeration[URL]() {
- final private[proxy] val iter = result.iterator
+ private[this] val iter = result.iterator
override def hasMoreElements: Boolean = iter.hasNext
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala
similarity index 98%
rename from
streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala
rename to
streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala
index ae0866292..574c7d51a 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.proxy
+package org.apache.streampark.common.util
import java.io.{InputStream, IOException, ObjectInputStream, ObjectStreamClass}
import java.lang.reflect.Proxy
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 1d328207a..da9187426 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -64,7 +64,7 @@ trait FlinkClientTrait extends Logger {
|--------------------------------------- flink job start
---------------------------------------
| userFlinkHome : ${submitRequest.flinkVersion.flinkHome}
| flinkVersion : ${submitRequest.flinkVersion.version}
- | appName : ${submitRequest.appName}
+ | appName : ${submitRequest.effectiveAppName}
| devMode : ${submitRequest.developmentMode.name()}
| execMode : ${submitRequest.executionMode.name()}
| k8sNamespace : ${submitRequest.kubernetesNamespace}
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 1d95303ac..5dd25ec5f 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.proxy
import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion}
-import org.apache.streampark.common.util.{ClassLoaderUtils, Logger}
+import org.apache.streampark.common.util.{ChildFirstClassLoader,
ClassLoaderObjectInputStream, ClassLoaderUtils, Logger}
import org.apache.streampark.common.util.ImplicitsUtils._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File,
ObjectOutputStream}
@@ -35,6 +35,8 @@ object FlinkShimsProxy extends Logger {
private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String,
ClassLoader]()
+ private[this] val FLINK_JAR_PATTERN = Pattern.compile("flink-(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+
private[this] val INCLUDE_PATTERN: Pattern =
Pattern.compile("(streampark-shaded-jackson-)(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
private[this] def getFlinkShimsResourcePattern(majorVersion: String) =
@@ -42,6 +44,19 @@ object FlinkShimsProxy extends Logger {
private[this] lazy val FLINK_SHIMS_PREFIX = "streampark-flink-shims_flink"
+ private[this] lazy val PARENT_FIRST_PATTERNS = List(
+ "java.",
+ "javax.xml",
+ "org.slf4j",
+ "org.apache.log4j",
+ "org.apache.logging",
+ "org.apache.commons.logging",
+ "org.apache.commons.cli",
+ "ch.qos.logback",
+ "org.xml",
+ "org.w3c",
+ "org.apache.hadoop")
+
/**
* Get shimsClassLoader to execute for scala API
*
@@ -97,10 +112,16 @@ object FlinkShimsProxy extends Logger {
new ChildFirstClassLoader(
shimsUrls.toArray,
Thread.currentThread().getContextClassLoader,
- getFlinkShimsResourcePattern(flinkVersion.majorVersion))
+ PARENT_FIRST_PATTERNS,
+ jarName => loadJarFilter(jarName, flinkVersion))
})
}
+ private def loadJarFilter(jarName: String, flinkVersion: FlinkVersion):
Boolean = {
+ val childFirstPattern =
getFlinkShimsResourcePattern(flinkVersion.majorVersion)
+ FLINK_JAR_PATTERN.matcher(jarName).matches &&
!childFirstPattern.matcher(jarName).matches
+ }
+
private def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File =>
Unit): Unit = {
val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME)
require(
@@ -177,7 +198,8 @@ object FlinkShimsProxy extends Logger {
new ChildFirstClassLoader(
shimsUrls.toArray,
Thread.currentThread().getContextClassLoader,
- getFlinkShimsResourcePattern(flinkVersion.majorVersion))
+ PARENT_FIRST_PATTERNS,
+ jarName => loadJarFilter(jarName, flinkVersion))
})
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
index 146ccb5d3..2feca4b0f 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
@@ -52,18 +52,25 @@ case class SubmitRequest(
@Nullable buildResult: BuildResult,
@Nullable extraParameter: JavaMap[String, Any]) {
+ val DEFAULT_SUBMIT_PARAM = Map[String, Any](
+ "spark.driver.cores" -> "1",
+ "spark.driver.memory" -> "1g",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "1g")
+
private[this] lazy val appProperties: Map[String, String] = getParameterMap(
KEY_SPARK_PROPERTY_PREFIX)
lazy val appMain: String = this.developmentMode match {
- case FlinkDevelopmentMode.FLINK_SQL =>
- Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
+ case FlinkDevelopmentMode.FLINK_SQL =>
Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
}
- lazy val effectiveAppName: String =
- if (this.appName == null) appProperties(KEY_FLINK_APP_NAME)
- else this.appName
+ lazy val effectiveAppName: String = if (this.appName == null) {
+ appProperties(KEY_FLINK_APP_NAME)
+ } else {
+ this.appName
+ }
lazy val libs: List[URL] = {
val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
@@ -71,22 +78,11 @@ case class SubmitRequest(
.getOrElse(List.empty[URL])
}
- lazy val classPaths: List[URL] = sparkVersion.sparkLibs ++ libs
-
- lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
-
- lazy val userJarFile: File = {
+ lazy val userJarPath: String = {
executionMode match {
case _ =>
checkBuildResult()
- new File(buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath)
- }
- }
-
- lazy val safePackageProgram: Boolean = {
- sparkVersion.version.split("\\.").map(_.trim.toInt) match {
- case Array(a, b, c) if a >= 3 => b > 1
- case _ => false
+ buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath
}
}
@@ -133,7 +129,7 @@ case class SubmitRequest(
}
@throws[IOException]
- def isSymlink(file: File): Boolean = {
+ private def isSymlink(file: File): Boolean = {
if (file == null) throw new NullPointerException("File must not be null")
Files.isSymbolicLink(file.toPath)
}
@@ -163,7 +159,7 @@ case class SubmitRequest(
}
@throws[Exception]
- def checkBuildResult(): Unit = {
+ private def checkBuildResult(): Unit = {
executionMode match {
case _ =>
if (this.buildResult == null) {
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
index 99e97d3b6..adf067974 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
@@ -17,11 +17,4 @@
package org.apache.streampark.spark.client.conf
-object SparkConfiguration {
- val defaultParameters = Map[String, Any](
- "spark.driver.cores" -> "1",
- "spark.driver.memory" -> "1g",
- "spark.executor.cores" -> "1",
- "spark.executor.memory" -> "1g")
-
-}
+object SparkConfiguration {}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala
deleted file mode 100644
index 7ca8886cb..000000000
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.spark.client.proxy
-
-import java.io.{File, IOException}
-import java.net.{URL, URLClassLoader}
-import java.util
-import java.util.regex.Pattern
-
-import scala.language.existentials
-import scala.util.Try
-
-/**
- * A variant of the URLClassLoader that first loads from the URLs and only
after that from the
- * parent.
- *
- * <p>{@link # getResourceAsStream ( String )} uses {@link # getResource (
String )} internally so
- * we don't override that.
- */
-
-class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader,
resourcePattern: Pattern)
- extends URLClassLoader(urls, parent) {
-
- ClassLoader.registerAsParallelCapable()
-
- private val SPARK_PATTERN =
- Pattern.compile("spark-(.*).jar", Pattern.CASE_INSENSITIVE |
Pattern.DOTALL)
-
- private val JAR_PROTOCOL = "jar"
-
- private val PARENT_FIRST_PATTERNS = List(
- "java.",
- "javax.xml",
- "org.slf4j",
- "org.apache.log4j",
- "org.apache.logging",
- "org.apache.commons.logging",
- "ch.qos.logback",
- "org.xml",
- "org.w3c",
- "org.apache.hadoop")
-
- @throws[ClassNotFoundException]
- override def loadClass(name: String, resolve: Boolean): Class[_] = {
- this.synchronized {
- // First, check if the class has already been loaded
- val clazz = super.findLoadedClass(name) match {
- case null =>
- // check whether the class should go parent-first
- for (parentFirstPattern <- PARENT_FIRST_PATTERNS) {
- if (name.startsWith(parentFirstPattern)) {
- super.loadClass(name, resolve)
- }
- }
- Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
- case c: Class[_] =>
- if (resolve) {
- resolveClass(c)
- }
- c
- }
- clazz
- }
- }
-
- override def getResource(name: String): URL = {
- // first, try and find it via the URLClassloader
- val urlClassLoaderResource = findResource(name)
- if (urlClassLoaderResource != null) return urlClassLoaderResource
- // delegate to super
- super.getResource(name)
- }
-
- private def filterShimsResource(urlClassLoaderResource: URL): URL = {
- if (urlClassLoaderResource != null && JAR_PROTOCOL ==
urlClassLoaderResource.getProtocol) {
- val spec = urlClassLoaderResource.getFile
- val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
- val matchState =
- SPARK_PATTERN.matcher(filename).matches && !resourcePattern
- .matcher(filename)
- .matches
- if (matchState) {
- return null
- }
- }
- urlClassLoaderResource
- }
-
- private def addResources(result: util.List[URL], resources:
util.Enumeration[URL]) = {
- while (resources.hasMoreElements) {
- val urlClassLoaderResource = filterShimsResource(resources.nextElement)
- if (urlClassLoaderResource != null) {
- result.add(urlClassLoaderResource)
- }
- }
- result
- }
-
- @throws[IOException]
- override def getResources(name: String): util.Enumeration[URL] = {
- // first get resources from URLClassloader
- val result = addResources(new util.ArrayList[URL], findResources(name))
- val parent = getParent
- if (parent != null) {
- // get parent urls
- addResources(result, parent.getResources(name))
- }
- new util.Enumeration[URL]() {
- final private[proxy] val iter = result.iterator
-
- override def hasMoreElements: Boolean = iter.hasNext
-
- override def nextElement: URL = iter.next
- }
- }
-
-}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala
deleted file mode 100644
index a03c97018..000000000
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.spark.client.proxy
-
-import java.io.{InputStream, IOException, ObjectInputStream, ObjectStreamClass}
-import java.lang.reflect.Proxy
-
-import scala.util.Try
-
-/**
- * A special ObjectInputStream that loads a class based on a specified
<code>ClassLoader</code>
- * rather than the system default. <p> This is useful in dynamic container
environments.
- *
- * @since 1.1
- */
-class ClassLoaderObjectInputStream(classLoader: ClassLoader, inputStream:
InputStream)
- extends ObjectInputStream(inputStream) {
-
- /**
- * Resolve a class specified by the descriptor using the specified
ClassLoader or the super
- * ClassLoader.
- *
- * @param objectStreamClass
- * descriptor of the class
- * @return
- * the Class object described by the ObjectStreamClass
- * @throws IOException
- * in case of an I/O error
- * @throws ClassNotFoundException
- * if the Class cannot be found
- */
- @throws[IOException]
- @throws[ClassNotFoundException]
- override protected def resolveClass(objectStreamClass: ObjectStreamClass):
Class[_] = {
- // delegate to super class loader which can resolve primitives
- Try(Class.forName(objectStreamClass.getName, false, classLoader))
- .getOrElse(super.resolveClass(objectStreamClass))
- }
-
- /**
- * Create a proxy class that implements the specified interfaces using the
specified ClassLoader
- * or the super ClassLoader.
- *
- * @param interfaces
- * the interfaces to implement
- * @return
- * a proxy class implementing the interfaces
- * @throws IOException
- * in case of an I/O error
- * @throws ClassNotFoundException
- * if the Class cannot be found
- * @see
- * ObjectInputStream#resolveProxyClass(String[])
- * @since 2.1
- */
- @throws[IOException]
- @throws[ClassNotFoundException]
- override protected def resolveProxyClass(interfaces: Array[String]):
Class[_] = {
- val interfaceClasses = new Array[Class[_]](interfaces.length)
- for (i <- interfaces.indices) {
- interfaceClasses(i) = Class.forName(interfaces(i), false, classLoader)
- }
- Try(Proxy.getProxyClass(classLoader, interfaceClasses: _*))
- .getOrElse(super.resolveProxyClass(interfaces))
- }
-
-}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index 5cdfb9063..97e749e2b 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.spark.client.proxy
import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{ConfigKeys, SparkVersion}
-import org.apache.streampark.common.util.{ClassLoaderUtils, Logger}
+import org.apache.streampark.common.util.{ChildFirstClassLoader,
ClassLoaderObjectInputStream, ClassLoaderUtils, Logger}
import org.apache.streampark.common.util.ImplicitsUtils._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File,
ObjectOutputStream}
@@ -33,21 +33,31 @@ object SparkShimsProxy extends Logger {
private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String,
ClassLoader]()
- private[this] val VERIFY_SQL_CLASS_LOADER_CACHE =
- MutableMap[String, ClassLoader]()
+ private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String,
ClassLoader]()
- private[this] val INCLUDE_PATTERN: Pattern =
- Pattern.compile(
- "(streampark-shaded-jackson-)(.*).jar",
- Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+ private[this] val INCLUDE_PATTERN: Pattern =
Pattern.compile("(streampark-shaded-jackson-)(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
private[this] def getSparkShimsResourcePattern(sparkLargeVersion: String) =
Pattern.compile(
s"spark-(.*)-$sparkLargeVersion(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+ private[this] lazy val SPARK_JAR_PATTERN = Pattern.compile("spark-(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+
private[this] lazy val SPARK_SHIMS_PREFIX = "streampark-spark-shims_spark"
+ private[this] lazy val PARENT_FIRST_PATTERNS = List(
+ "java.",
+ "javax.xml",
+ "org.slf4j",
+ "org.apache.log4j",
+ "org.apache.logging",
+ "org.apache.commons.logging",
+ "ch.qos.logback",
+ "org.xml",
+ "org.w3c",
+ "org.apache.hadoop")
+
def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = {
val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
ClassLoaderUtils
@@ -61,7 +71,7 @@ object SparkShimsProxy extends Logger {
}
// need to load all spark-table dependencies compatible with different
versions
- def getVerifySqlLibClassLoader(sparkVersion: SparkVersion): ClassLoader = {
+ private def getVerifySqlLibClassLoader(sparkVersion: SparkVersion):
ClassLoader = {
logInfo(s"Add verify sql lib,spark version: $sparkVersion")
VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
s"${sparkVersion.fullVersion}", {
@@ -87,11 +97,17 @@ object SparkShimsProxy extends Logger {
new ChildFirstClassLoader(
shimsUrls.toArray,
Thread.currentThread().getContextClassLoader,
- getSparkShimsResourcePattern(sparkVersion.majorVersion))
+ PARENT_FIRST_PATTERNS,
+ jarName => loadJarFilter(jarName, sparkVersion))
})
}
- def addShimsUrls(sparkVersion: SparkVersion, addShimUrl: File => Unit): Unit
= {
+ private def loadJarFilter(jarName: String, sparkVersion: SparkVersion):
Boolean = {
+ val childFirstPattern =
getSparkShimsResourcePattern(sparkVersion.majorVersion)
+ SPARK_JAR_PATTERN.matcher(jarName).matches &&
!childFirstPattern.matcher(jarName).matches
+ }
+
+ private def addShimsUrls(sparkVersion: SparkVersion, addShimUrl: File =>
Unit): Unit = {
val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME)
require(
appHome != null,
@@ -155,7 +171,8 @@ object SparkShimsProxy extends Logger {
new ChildFirstClassLoader(
shimsUrls.toArray,
Thread.currentThread().getContextClassLoader,
- getSparkShimsResourcePattern(sparkVersion.majorVersion))
+ PARENT_FIRST_PATTERNS,
+ jarName => loadJarFilter(jarName, sparkVersion))
})
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
index 9870799c0..af5776f18 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
@@ -17,27 +17,23 @@
package org.apache.streampark.spark.client.impl
-import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.SparkExecutionMode
import org.apache.streampark.common.util.HadoopUtils
-import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.streampark.spark.client.`trait`.SparkClientTrait
import org.apache.streampark.spark.client.bean._
-import org.apache.streampark.spark.client.conf.SparkConfiguration
import org.apache.commons.collections.MapUtils
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
-import java.util.concurrent.{CountDownLatch, Executors, ExecutorService}
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.convert.ImplicitConversions._
+import scala.util.{Failure, Success, Try}
/** yarn application mode submit */
object YarnApplicationClient extends SparkClientTrait {
- private val threadPool: ExecutorService = Executors.newFixedThreadPool(1)
-
- private[this] lazy val workspace = Workspace.remote
-
override def doStop(stopRequest: StopRequest): StopResponse = {
HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId))
null
@@ -46,96 +42,80 @@ object YarnApplicationClient extends SparkClientTrait {
override def setConfig(submitRequest: SubmitRequest): Unit = {}
override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = {
- launch(submitRequest)
+ // 1) prepare sparkLauncher
+ val launcher: SparkLauncher = prepareSparkLauncher(submitRequest)
+
+ // 2) set spark config
+ setSparkConfig(submitRequest, launcher)
+
+ // 3) launch
+ Try(launch(launcher)) match {
+ case Success(handle: SparkAppHandle) =>
+ logger.info(s"[StreamPark][YarnApplicationClient] spark job:
${submitRequest.effectiveAppName} is submit successful, " +
+ s"appid: ${handle.getAppId}, " +
+ s"state: ${handle.getState}")
+ SubmitResponse(null, null, handle.getAppId)
+ case Failure(e) => throw e
+ }
+ }
+
+ private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = {
+ logger.info("[StreamPark][YarnApplicationClient] The spark task start")
+ val submitFinished: CountDownLatch = new CountDownLatch(1)
+ val sparkAppHandle = sparkLauncher.startApplication(new
SparkAppHandle.Listener() {
+ override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
+ override def stateChanged(handle: SparkAppHandle): Unit = {
+ if (handle.getAppId != null) {
+ logger.info("{} stateChanged :{}", Array(handle.getAppId,
handle.getState.toString))
+ } else {
+ logger.info("stateChanged :{}", handle.getState.toString)
+ }
+ if (SparkAppHandle.State.FAILED == handle.getState) {
+ logger.error("Task run failure stateChanged :{}",
handle.getState.toString)
+ }
+ if (handle.getState.isFinal) {
+ submitFinished.countDown()
+ }
+ }
+ })
+ submitFinished.await()
+ sparkAppHandle
}
- private def launch(submitRequest: SubmitRequest): SubmitResponse = {
- val launcher: SparkLauncher = new SparkLauncher()
+ private def prepareSparkLauncher(submitRequest: SubmitRequest) = {
+ new SparkLauncher()
.setSparkHome(submitRequest.sparkVersion.sparkHome)
- .setAppResource(submitRequest.buildResult
- .asInstanceOf[ShadedBuildResponse]
- .shadedJarPath)
+ .setAppResource(submitRequest.userJarPath)
.setMainClass(submitRequest.appMain)
+ .setAppName(submitRequest.effectiveAppName)
+ .setConf(
+ "spark.yarn.jars",
+ submitRequest.hdfsWorkspace.sparkLib + "/*.jar")
+ .setVerbose(true)
.setMaster("yarn")
.setDeployMode(submitRequest.executionMode match {
case SparkExecutionMode.YARN_CLIENT => "client"
case SparkExecutionMode.YARN_CLUSTER => "cluster"
case _ =>
- throw new IllegalArgumentException(
+ throw new UnsupportedOperationException(
"[StreamPark][YarnApplicationClient] Yarn mode only support
\"client\" and \"cluster\".")
-
})
- .setAppName(submitRequest.appName)
- .setConf(
- "spark.yarn.jars",
- submitRequest
- .hdfsWorkspace
- .sparkLib + "/*.jar")
- .setVerbose(true)
-
- import scala.collection.JavaConverters._
- setDynamicProperties(launcher, submitRequest.properties.asScala.toMap)
+ }
- // TODO: Adds command line arguments for the application.
- // launcher.addAppArgs()
+ private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher:
SparkLauncher): Unit = {
+ logger.info("[StreamPark][SparkClient][YarnApplicationClient] set spark
configuration.")
+ // 1) set spark conf
+ submitRequest.properties.foreach(prop => {
+ val k = prop._1
+ val v = prop._2.toString
+ logInfo(s"| $k : $v")
+ sparkLauncher.setConf(k, v)
+ })
+ // 2) appArgs...
if (MapUtils.isNotEmpty(submitRequest.extraParameter) &&
submitRequest.extraParameter
.containsKey("sql")) {
- launcher.addAppArgs("--sql",
submitRequest.extraParameter.get("sql").toString)
- }
-
- logger.info("[StreamPark][YarnApplicationClient] The spark task start")
- val cdlForApplicationId: CountDownLatch = new CountDownLatch(1)
-
- var sparkAppHandle: SparkAppHandle = null
- threadPool.execute(new Runnable {
- override def run(): Unit = {
- try {
- val countDownLatch: CountDownLatch = new CountDownLatch(1)
- sparkAppHandle = launcher.startApplication(new
SparkAppHandle.Listener() {
- override def stateChanged(handle: SparkAppHandle): Unit = {
- if (handle.getAppId != null) {
- if (cdlForApplicationId.getCount != 0) {
- cdlForApplicationId.countDown()
- }
- logger.info("{} stateChanged :{}", Array(handle.getAppId,
handle.getState.toString))
- } else logger.info("stateChanged :{}", handle.getState.toString)
-
- if (SparkAppHandle.State.FAILED.toString ==
handle.getState.toString) {
- logger.error("Task run failure stateChanged :{}",
handle.getState.toString)
- }
-
- if (handle.getState.isFinal) {
- countDownLatch.countDown()
- }
- }
-
- override def infoChanged(handle: SparkAppHandle): Unit = {}
- })
- countDownLatch.await()
- } catch {
- case e: Exception =>
- logger.error(e.getMessage, e)
- }
- }
- })
-
- cdlForApplicationId.await()
- logger.info(
- "[StreamPark][YarnApplicationClient] The task is executing, handle
current state is {}, appid is {}",
- Array(sparkAppHandle.getState.toString, sparkAppHandle.getAppId))
- SubmitResponse(null, null, sparkAppHandle.getAppId)
- }
-
- private def setDynamicProperties(sparkLauncher: SparkLauncher, properties:
Map[String, Any]): Unit = {
- logger.info("[StreamPark][YarnApplicationClient] Spark launcher start
configuration.")
- val finalProperties: Map[String, Any] =
SparkConfiguration.defaultParameters ++ properties
- for ((k, v) <- finalProperties) {
- if (k.startsWith("spark.")) {
- sparkLauncher.setConf(k, v.toString)
- } else {
- logger.info("[StreamPark][YarnApplicationClient] \"{}\" doesn't start
with \"spark.\". Skip it.", k)
- }
+ sparkLauncher.addAppArgs("--sql",
submitRequest.extraParameter.get("sql").toString)
}
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index 93f32aad0..af7778ebc 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util._
import org.apache.streampark.spark.client.bean._
import scala.collection.convert.ImplicitConversions._
+import scala.util.{Failure, Success, Try}
trait SparkClientTrait extends Logger {
@@ -31,7 +32,7 @@ trait SparkClientTrait extends Logger {
|--------------------------------------- spark job start
-----------------------------------
| userSparkHome : ${submitRequest.sparkVersion.sparkHome}
| sparkVersion : ${submitRequest.sparkVersion.version}
- | appName : ${submitRequest.appName}
+ | appName : ${submitRequest.effectiveAppName}
| devMode : ${submitRequest.developmentMode.name()}
| execMode : ${submitRequest.executionMode.name()}
| applicationType : ${submitRequest.applicationType.getName}
@@ -41,17 +42,19 @@ trait SparkClientTrait extends Logger {
|-------------------------------------------------------------------------------------------
|""".stripMargin)
- submitRequest.developmentMode match {
- case _ =>
- if (submitRequest.userJarFile != null) {
- val uri = submitRequest.userJarFile.getAbsolutePath
- }
- }
+ prepareConfig(submitRequest)
setConfig(submitRequest)
- doSubmit(submitRequest)
-
+ Try(doSubmit(submitRequest)) match {
+ case Success(resp) => resp
+ case Failure(e) =>
+ logError(
+ s"spark job ${submitRequest.effectiveAppName} start failed, " +
+ s"executionMode: ${submitRequest.executionMode.getName}, " +
+ s"detail: ${ExceptionUtils.stringifyException(e)}")
+ throw e
+ }
}
def setConfig(submitRequest: SubmitRequest): Unit
@@ -78,4 +81,21 @@ trait SparkClientTrait extends Logger {
@throws[Exception]
def doStop(stopRequest: StopRequest): StopResponse
+ private def prepareConfig(submitRequest: SubmitRequest): Unit = {
+ // 1) set default config
+ val userConfig = submitRequest.properties.filter(c => {
+ val k = c._1
+ if (k.startsWith("spark.")) {
+ true
+ } else {
+ logger.warn("[StreamPark] config {} doesn't start with \"spark.\" Skip
it.", k)
+ false
+ }
+ })
+ val defaultConfig = submitRequest.DEFAULT_SUBMIT_PARAM.filter(c =>
!userConfig.containsKey(c._1))
+ submitRequest.properties.clear()
+ submitRequest.properties.putAll(userConfig)
+ submitRequest.properties.putAll(defaultConfig)
+ }
+
}