This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new cd9db377e [Improve] spark-submit improvements (#3900)
cd9db377e is described below

commit cd9db377e9157774a38f051a7b49735f3d645ba2
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 20 14:20:39 2024 +0800

    [Improve] spark-submit improvements (#3900)
---
 .../common/util}/ChildFirstClassLoader.scala       |  60 ++-------
 .../util}/ClassLoaderObjectInputStream.scala       |   2 +-
 .../flink/client/trait/FlinkClientTrait.scala      |   2 +-
 .../streampark/flink/proxy/FlinkShimsProxy.scala   |  28 +++-
 .../spark/client/bean/SubmitRequest.scala          |  36 +++--
 .../spark/client/conf/SparkConfiguration.scala     |   9 +-
 .../spark/client/proxy/ChildFirstClassLoader.scala | 132 -------------------
 .../proxy/ClassLoaderObjectInputStream.scala       |  82 ------------
 .../spark/client/proxy/SparkShimsProxy.scala       |  39 ++++--
 .../spark/client/impl/YarnApplicationClient.scala  | 146 +++++++++------------
 .../spark/client/trait/SparkClientTrait.scala      |  38 ++++--
 11 files changed, 175 insertions(+), 399 deletions(-)

diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala
similarity index 64%
rename from streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
rename to streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala
index ec8f360b5..fabe9e73b 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.proxy
+package org.apache.streampark.common.util
 
 import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.util
-import java.util.function.Consumer
-import java.util.regex.Pattern
 
 import scala.util.Try
 
@@ -36,40 +34,12 @@ import scala.util.Try
 class ChildFirstClassLoader(
     urls: Array[URL],
     parent: ClassLoader,
-    flinkResourcePattern: Pattern,
-    classLoadingExceptionHandler: Consumer[Throwable])
+    parentFirstClasses: List[String],
+    loadJarFilter: String => Boolean)
   extends URLClassLoader(urls, parent) {
 
   ClassLoader.registerAsParallelCapable()
 
-  def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern) {
-    this(
-      urls,
-      parent,
-      flinkResourcePattern,
-      (t: Throwable) => throw t)
-  }
-
-  ClassLoader.registerAsParallelCapable()
-
-  private val FLINK_PATTERN =
-    Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
-
-  private val JAR_PROTOCOL = "jar"
-
-  private val PARENT_FIRST_PATTERNS = List(
-    "java.",
-    "javax.xml",
-    "org.slf4j",
-    "org.apache.log4j",
-    "org.apache.logging",
-    "org.apache.commons.logging",
-    "org.apache.commons.cli",
-    "ch.qos.logback",
-    "org.xml",
-    "org.w3c",
-    "org.apache.hadoop")
-
   @throws[ClassNotFoundException]
   override def loadClass(name: String, resolve: Boolean): Class[_] = {
     try {
@@ -78,7 +48,7 @@ class ChildFirstClassLoader(
         super.findLoadedClass(name) match {
           case null =>
             // check whether the class should go parent-first
-            PARENT_FIRST_PATTERNS.find(name.startsWith) match {
+            parentFirstClasses.find(name.startsWith) match {
               case Some(_) => super.loadClass(name, resolve)
              case _ => Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
             }
@@ -90,9 +60,7 @@ class ChildFirstClassLoader(
         }
       }
     } catch {
-      case e: Throwable =>
-        classLoadingExceptionHandler.accept(e)
-        null
+      case e: Throwable => throw e
     }
   }
 
@@ -105,20 +73,14 @@ class ChildFirstClassLoader(
   }
 
   /**
-   * e.g. flinkResourcePattern: flink-1.12 <p> flink-1.12.jar/resource flink-1.14.jar/resource
-   * other.jar/resource \=> after filterFlinkShimsResource \=> flink-1.12.jar/resource
-   * other.jar/resource
-   *
    * @param urlClassLoaderResource
    * @return
    */
-  private def filterFlinkShimsResource(urlClassLoaderResource: URL): URL = {
-    if (urlClassLoaderResource != null && JAR_PROTOCOL == urlClassLoaderResource.getProtocol) {
+  private def filterResource(urlClassLoaderResource: URL): URL = {
+    if (urlClassLoaderResource != null && "jar" == urlClassLoaderResource.getProtocol) {
       val spec = urlClassLoaderResource.getFile
-      val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
-      val matchState =
-        FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern.matcher(filename).matches
-      if (matchState) {
+      val jarName = new File(spec.substring(0, spec.indexOf("!/"))).getName
+      if (loadJarFilter(jarName)) {
         return null
       }
     }
@@ -127,7 +89,7 @@ class ChildFirstClassLoader(
 
   private def addResources(result: util.List[URL], resources: util.Enumeration[URL]) = {
     while (resources.hasMoreElements) {
-      val urlClassLoaderResource = filterFlinkShimsResource(resources.nextElement)
+      val urlClassLoaderResource = filterResource(resources.nextElement)
       if (urlClassLoaderResource != null) {
         result.add(urlClassLoaderResource)
       }
@@ -145,7 +107,7 @@ class ChildFirstClassLoader(
       addResources(result, parent.getResources(name))
     }
     new util.Enumeration[URL]() {
-      final private[proxy] val iter = result.iterator
+      private[this] val iter = result.iterator
 
       override def hasMoreElements: Boolean = iter.hasNext
 
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala
similarity index 98%
rename from streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala
rename to streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala
index ae0866292..574c7d51a 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.proxy
+package org.apache.streampark.common.util
 
 import java.io.{InputStream, IOException, ObjectInputStream, ObjectStreamClass}
 import java.lang.reflect.Proxy
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 1d328207a..da9187426 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -64,7 +64,7 @@ trait FlinkClientTrait extends Logger {
         |--------------------------------------- flink job start ---------------------------------------
          |    userFlinkHome    : ${submitRequest.flinkVersion.flinkHome}
          |    flinkVersion     : ${submitRequest.flinkVersion.version}
-         |    appName          : ${submitRequest.appName}
+         |    appName          : ${submitRequest.effectiveAppName}
          |    devMode          : ${submitRequest.developmentMode.name()}
          |    execMode         : ${submitRequest.executionMode.name()}
          |    k8sNamespace     : ${submitRequest.kubernetesNamespace}
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 1d95303ac..5dd25ec5f 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.proxy
 
 import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion}
-import org.apache.streampark.common.util.{ClassLoaderUtils, Logger}
+import org.apache.streampark.common.util.{ChildFirstClassLoader, ClassLoaderObjectInputStream, ClassLoaderUtils, Logger}
 import org.apache.streampark.common.util.ImplicitsUtils._
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
@@ -35,6 +35,8 @@ object FlinkShimsProxy extends Logger {
 
   private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
 
+  private[this] val FLINK_JAR_PATTERN = Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+
   private[this] val INCLUDE_PATTERN: Pattern = Pattern.compile("(streampark-shaded-jackson-)(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
 
   private[this] def getFlinkShimsResourcePattern(majorVersion: String) =
@@ -42,6 +44,19 @@ object FlinkShimsProxy extends Logger {
 
   private[this] lazy val FLINK_SHIMS_PREFIX = "streampark-flink-shims_flink"
 
+  private[this] lazy val PARENT_FIRST_PATTERNS = List(
+    "java.",
+    "javax.xml",
+    "org.slf4j",
+    "org.apache.log4j",
+    "org.apache.logging",
+    "org.apache.commons.logging",
+    "org.apache.commons.cli",
+    "ch.qos.logback",
+    "org.xml",
+    "org.w3c",
+    "org.apache.hadoop")
+
   /**
    * Get shimsClassLoader to execute for scala API
    *
@@ -97,10 +112,16 @@ object FlinkShimsProxy extends Logger {
         new ChildFirstClassLoader(
           shimsUrls.toArray,
           Thread.currentThread().getContextClassLoader,
-          getFlinkShimsResourcePattern(flinkVersion.majorVersion))
+          PARENT_FIRST_PATTERNS,
+          jarName => loadJarFilter(jarName, flinkVersion))
       })
   }
 
+  private def loadJarFilter(jarName: String, flinkVersion: FlinkVersion): Boolean = {
+    val childFirstPattern = getFlinkShimsResourcePattern(flinkVersion.majorVersion)
+    FLINK_JAR_PATTERN.matcher(jarName).matches && !childFirstPattern.matcher(jarName).matches
+  }
+
   private def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = {
     val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME)
     require(
@@ -177,7 +198,8 @@ object FlinkShimsProxy extends Logger {
         new ChildFirstClassLoader(
           shimsUrls.toArray,
           Thread.currentThread().getContextClassLoader,
-          getFlinkShimsResourcePattern(flinkVersion.majorVersion))
+          PARENT_FIRST_PATTERNS,
+          jarName => loadJarFilter(jarName, flinkVersion))
       })
   }
 
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
index 146ccb5d3..2feca4b0f 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
@@ -52,18 +52,25 @@ case class SubmitRequest(
     @Nullable buildResult: BuildResult,
     @Nullable extraParameter: JavaMap[String, Any]) {
 
+  val DEFAULT_SUBMIT_PARAM = Map[String, Any](
+    "spark.driver.cores" -> "1",
+    "spark.driver.memory" -> "1g",
+    "spark.executor.cores" -> "1",
+    "spark.executor.memory" -> "1g")
+
   private[this] lazy val appProperties: Map[String, String] = getParameterMap(
     KEY_SPARK_PROPERTY_PREFIX)
 
   lazy val appMain: String = this.developmentMode match {
-    case FlinkDevelopmentMode.FLINK_SQL =>
-      Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
+    case FlinkDevelopmentMode.FLINK_SQL => Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
     case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
   }
 
-  lazy val effectiveAppName: String =
-    if (this.appName == null) appProperties(KEY_FLINK_APP_NAME)
-    else this.appName
+  lazy val effectiveAppName: String = if (this.appName == null) {
+    appProperties(KEY_FLINK_APP_NAME)
+  } else {
+    this.appName
+  }
 
   lazy val libs: List[URL] = {
     val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
@@ -71,22 +78,11 @@ case class SubmitRequest(
       .getOrElse(List.empty[URL])
   }
 
-  lazy val classPaths: List[URL] = sparkVersion.sparkLibs ++ libs
-
-  lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
-
-  lazy val userJarFile: File = {
+  lazy val userJarPath: String = {
     executionMode match {
       case _ =>
         checkBuildResult()
-        new File(buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath)
-    }
-  }
-
-  lazy val safePackageProgram: Boolean = {
-    sparkVersion.version.split("\\.").map(_.trim.toInt) match {
-      case Array(a, b, c) if a >= 3 => b > 1
-      case _ => false
+        buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath
     }
   }
 
@@ -133,7 +129,7 @@ case class SubmitRequest(
   }
 
   @throws[IOException]
-  def isSymlink(file: File): Boolean = {
+  private def isSymlink(file: File): Boolean = {
     if (file == null) throw new NullPointerException("File must not be null")
     Files.isSymbolicLink(file.toPath)
   }
@@ -163,7 +159,7 @@ case class SubmitRequest(
   }
 
   @throws[Exception]
-  def checkBuildResult(): Unit = {
+  private def checkBuildResult(): Unit = {
     executionMode match {
       case _ =>
         if (this.buildResult == null) {
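
A note on effectiveAppName, which the submit logs in both client traits now use: it prefers the explicit appName and falls back to the configured property only when appName is null. A reduced, self-contained model of that fallback (the key "app.name" stands in for the real KEY_FLINK_APP_NAME):

    // reduced model of SubmitRequest.effectiveAppName
    case class Req(appName: String, appProperties: Map[String, String]) {
      lazy val effectiveAppName: String =
        if (appName == null) appProperties("app.name") else appName
    }

    assert(Req(null, Map("app.name" -> "nightly-etl")).effectiveAppName == "nightly-etl")
    assert(Req("ad-hoc-job", Map.empty).effectiveAppName == "ad-hoc-job")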
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
index 99e97d3b6..adf067974 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
@@ -17,11 +17,4 @@
 
 package org.apache.streampark.spark.client.conf
 
-object SparkConfiguration {
-  val defaultParameters = Map[String, Any](
-    "spark.driver.cores" -> "1",
-    "spark.driver.memory" -> "1g",
-    "spark.executor.cores" -> "1",
-    "spark.executor.memory" -> "1g")
-
-}
+object SparkConfiguration {}
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala
deleted file mode 100644
index 7ca8886cb..000000000
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.spark.client.proxy
-
-import java.io.{File, IOException}
-import java.net.{URL, URLClassLoader}
-import java.util
-import java.util.regex.Pattern
-
-import scala.language.existentials
-import scala.util.Try
-
-/**
- * A variant of the URLClassLoader that first loads from the URLs and only after that from the
- * parent.
- *
- * <p>{@link # getResourceAsStream ( String )} uses {@link # getResource ( String )} internally so
- * we don't override that.
- */
-
-class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, resourcePattern: Pattern)
-  extends URLClassLoader(urls, parent) {
-
-  ClassLoader.registerAsParallelCapable()
-
-  private val SPARK_PATTERN =
-    Pattern.compile("spark-(.*).jar", Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL)
-
-  private val JAR_PROTOCOL = "jar"
-
-  private val PARENT_FIRST_PATTERNS = List(
-    "java.",
-    "javax.xml",
-    "org.slf4j",
-    "org.apache.log4j",
-    "org.apache.logging",
-    "org.apache.commons.logging",
-    "ch.qos.logback",
-    "org.xml",
-    "org.w3c",
-    "org.apache.hadoop")
-
-  @throws[ClassNotFoundException]
-  override def loadClass(name: String, resolve: Boolean): Class[_] = {
-    this.synchronized {
-      // First, check if the class has already been loaded
-      val clazz = super.findLoadedClass(name) match {
-        case null =>
-          // check whether the class should go parent-first
-          for (parentFirstPattern <- PARENT_FIRST_PATTERNS) {
-            if (name.startsWith(parentFirstPattern)) {
-              super.loadClass(name, resolve)
-            }
-          }
-          Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
-        case c: Class[_] =>
-          if (resolve) {
-            resolveClass(c)
-          }
-          c
-      }
-      clazz
-    }
-  }
-
-  override def getResource(name: String): URL = {
-    // first, try and find it via the URLClassloader
-    val urlClassLoaderResource = findResource(name)
-    if (urlClassLoaderResource != null) return urlClassLoaderResource
-    // delegate to super
-    super.getResource(name)
-  }
-
-  private def filterShimsResource(urlClassLoaderResource: URL): URL = {
-    if (urlClassLoaderResource != null && JAR_PROTOCOL == urlClassLoaderResource.getProtocol) {
-      val spec = urlClassLoaderResource.getFile
-      val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
-      val matchState =
-        SPARK_PATTERN.matcher(filename).matches && !resourcePattern
-          .matcher(filename)
-          .matches
-      if (matchState) {
-        return null
-      }
-    }
-    urlClassLoaderResource
-  }
-
-  private def addResources(result: util.List[URL], resources: util.Enumeration[URL]) = {
-    while (resources.hasMoreElements) {
-      val urlClassLoaderResource = filterShimsResource(resources.nextElement)
-      if (urlClassLoaderResource != null) {
-        result.add(urlClassLoaderResource)
-      }
-    }
-    result
-  }
-
-  @throws[IOException]
-  override def getResources(name: String): util.Enumeration[URL] = {
-    // first get resources from URLClassloader
-    val result = addResources(new util.ArrayList[URL], findResources(name))
-    val parent = getParent
-    if (parent != null) {
-      // get parent urls
-      addResources(result, parent.getResources(name))
-    }
-    new util.Enumeration[URL]() {
-      final private[proxy] val iter = result.iterator
-
-      override def hasMoreElements: Boolean = iter.hasNext
-
-      override def nextElement: URL = iter.next
-    }
-  }
-
-}
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala
deleted file mode 100644
index a03c97018..000000000
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.spark.client.proxy
-
-import java.io.{InputStream, IOException, ObjectInputStream, ObjectStreamClass}
-import java.lang.reflect.Proxy
-
-import scala.util.Try
-
-/**
- * A special ObjectInputStream that loads a class based on a specified <code>ClassLoader</code>
- * rather than the system default. <p> This is useful in dynamic container environments.
- *
- * @since 1.1
- */
-class ClassLoaderObjectInputStream(classLoader: ClassLoader, inputStream: InputStream)
-  extends ObjectInputStream(inputStream) {
-
-  /**
-   * Resolve a class specified by the descriptor using the specified ClassLoader or the super
-   * ClassLoader.
-   *
-   * @param objectStreamClass
-   *   descriptor of the class
-   * @return
-   *   the Class object described by the ObjectStreamClass
-   * @throws IOException
-   *   in case of an I/O error
-   * @throws ClassNotFoundException
-   *   if the Class cannot be found
-   */
-  @throws[IOException]
-  @throws[ClassNotFoundException]
-  override protected def resolveClass(objectStreamClass: ObjectStreamClass): Class[_] = {
-    // delegate to super class loader which can resolve primitives
-    Try(Class.forName(objectStreamClass.getName, false, classLoader))
-      .getOrElse(super.resolveClass(objectStreamClass))
-  }
-
-  /**
-   * Create a proxy class that implements the specified interfaces using the specified ClassLoader
-   * or the super ClassLoader.
-   *
-   * @param interfaces
-   *   the interfaces to implement
-   * @return
-   *   a proxy class implementing the interfaces
-   * @throws IOException
-   *   in case of an I/O error
-   * @throws ClassNotFoundException
-   *   if the Class cannot be found
-   * @see
-   *   ObjectInputStream#resolveProxyClass(String[])
-   * @since 2.1
-   */
-  @throws[IOException]
-  @throws[ClassNotFoundException]
-  override protected def resolveProxyClass(interfaces: Array[String]): Class[_] = {
-    val interfaceClasses = new Array[Class[_]](interfaces.length)
-    for (i <- interfaces.indices) {
-      interfaceClasses(i) = Class.forName(interfaces(i), false, classLoader)
-    }
-    Try(Proxy.getProxyClass(classLoader, interfaceClasses: _*))
-      .getOrElse(super.resolveProxyClass(interfaces))
-  }
-
-}
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index 5cdfb9063..97e749e2b 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.spark.client.proxy
 
 import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{ConfigKeys, SparkVersion}
-import org.apache.streampark.common.util.{ClassLoaderUtils, Logger}
+import org.apache.streampark.common.util.{ChildFirstClassLoader, ClassLoaderObjectInputStream, ClassLoaderUtils, Logger}
 import org.apache.streampark.common.util.ImplicitsUtils._
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
@@ -33,21 +33,31 @@ object SparkShimsProxy extends Logger {
 
   private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
 
-  private[this] val VERIFY_SQL_CLASS_LOADER_CACHE =
-    MutableMap[String, ClassLoader]()
+  private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
 
-  private[this] val INCLUDE_PATTERN: Pattern =
-    Pattern.compile(
-      "(streampark-shaded-jackson-)(.*).jar",
-      Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+  private[this] val INCLUDE_PATTERN: Pattern = Pattern.compile("(streampark-shaded-jackson-)(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
 
   private[this] def getSparkShimsResourcePattern(sparkLargeVersion: String) =
     Pattern.compile(
       s"spark-(.*)-$sparkLargeVersion(.*).jar",
       Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
 
+  private[this] lazy val SPARK_JAR_PATTERN = Pattern.compile("spark-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+
   private[this] lazy val SPARK_SHIMS_PREFIX = "streampark-spark-shims_spark"
 
+  private[this] lazy val PARENT_FIRST_PATTERNS = List(
+    "java.",
+    "javax.xml",
+    "org.slf4j",
+    "org.apache.log4j",
+    "org.apache.logging",
+    "org.apache.commons.logging",
+    "ch.qos.logback",
+    "org.xml",
+    "org.w3c",
+    "org.apache.hadoop")
+
   def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = {
     val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
     ClassLoaderUtils
@@ -61,7 +71,7 @@ object SparkShimsProxy extends Logger {
   }
 
   // need to load all spark-table dependencies compatible with different versions
-  def getVerifySqlLibClassLoader(sparkVersion: SparkVersion): ClassLoader = {
+  private def getVerifySqlLibClassLoader(sparkVersion: SparkVersion): ClassLoader = {
     logInfo(s"Add verify sql lib,spark version: $sparkVersion")
     VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
       s"${sparkVersion.fullVersion}", {
@@ -87,11 +97,17 @@ object SparkShimsProxy extends Logger {
         new ChildFirstClassLoader(
           shimsUrls.toArray,
           Thread.currentThread().getContextClassLoader,
-          getSparkShimsResourcePattern(sparkVersion.majorVersion))
+          PARENT_FIRST_PATTERNS,
+          jarName => loadJarFilter(jarName, sparkVersion))
       })
   }
 
-  def addShimsUrls(sparkVersion: SparkVersion, addShimUrl: File => Unit): Unit = {
+  private def loadJarFilter(jarName: String, sparkVersion: SparkVersion): Boolean = {
+    val childFirstPattern = getSparkShimsResourcePattern(sparkVersion.majorVersion)
+    SPARK_JAR_PATTERN.matcher(jarName).matches && !childFirstPattern.matcher(jarName).matches
+  }
+
+  private def addShimsUrls(sparkVersion: SparkVersion, addShimUrl: File => Unit): Unit = {
     val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME)
     require(
       appHome != null,
@@ -155,7 +171,8 @@ object SparkShimsProxy extends Logger {
         new ChildFirstClassLoader(
           shimsUrls.toArray,
           Thread.currentThread().getContextClassLoader,
-          getSparkShimsResourcePattern(sparkVersion.majorVersion))
+          PARENT_FIRST_PATTERNS,
+          jarName => loadJarFilter(jarName, sparkVersion))
       })
   }
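
The public entry point proxy[T] is unchanged: callers pass a function that is executed under the shims classloader. A hypothetical usage sketch (the class looked up inside func is illustrative; sparkVersion is assumed to be in scope):

    // runs `func` against the version-matched spark shims classpath
    val launcherAvailable: Boolean = SparkShimsProxy.proxy(
      sparkVersion,
      (loader: ClassLoader) =>
        loader.loadClass("org.apache.spark.launcher.SparkLauncher") != null)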
 
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
index 9870799c0..af5776f18 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
@@ -17,27 +17,23 @@
 
 package org.apache.streampark.spark.client.impl
 
-import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.SparkExecutionMode
 import org.apache.streampark.common.util.HadoopUtils
-import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 import org.apache.streampark.spark.client.`trait`.SparkClientTrait
 import org.apache.streampark.spark.client.bean._
-import org.apache.streampark.spark.client.conf.SparkConfiguration
 
 import org.apache.commons.collections.MapUtils
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
 
-import java.util.concurrent.{CountDownLatch, Executors, ExecutorService}
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.convert.ImplicitConversions._
+import scala.util.{Failure, Success, Try}
 
 /** yarn application mode submit */
 object YarnApplicationClient extends SparkClientTrait {
 
-  private val threadPool: ExecutorService = Executors.newFixedThreadPool(1)
-
-  private[this] lazy val workspace = Workspace.remote
-
   override def doStop(stopRequest: StopRequest): StopResponse = {
     HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId))
     null
@@ -46,96 +42,80 @@ object YarnApplicationClient extends SparkClientTrait {
   override def setConfig(submitRequest: SubmitRequest): Unit = {}
 
   override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = {
-    launch(submitRequest)
+    // 1) prepare sparkLauncher
+    val launcher: SparkLauncher = prepareSparkLauncher(submitRequest)
+
+    // 2) set spark config
+    setSparkConfig(submitRequest, launcher)
+
+    // 3) launch
+    Try(launch(launcher)) match {
+      case Success(handle: SparkAppHandle) =>
+        logger.info(s"[StreamPark][YarnApplicationClient] spark job: 
${submitRequest.effectiveAppName} is submit successful, " +
+          s"appid: ${handle.getAppId}, " +
+          s"state: ${handle.getState}")
+        SubmitResponse(null, null, handle.getAppId)
+      case Failure(e) => throw e
+    }
+  }
+
+  private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = {
+    logger.info("[StreamPark][YarnApplicationClient] The spark task start")
+    val submitFinished: CountDownLatch = new CountDownLatch(1)
+    val sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() {
+      override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
+      override def stateChanged(handle: SparkAppHandle): Unit = {
+        if (handle.getAppId != null) {
+          logger.info("{} stateChanged :{}", Array(handle.getAppId, 
handle.getState.toString))
+        } else {
+          logger.info("stateChanged :{}", handle.getState.toString)
+        }
+        if (SparkAppHandle.State.FAILED == handle.getState) {
+          logger.error("Task run failure stateChanged :{}", 
handle.getState.toString)
+        }
+        if (handle.getState.isFinal) {
+          submitFinished.countDown()
+        }
+      }
+    })
+    submitFinished.await()
+    sparkAppHandle
   }
 
-  private def launch(submitRequest: SubmitRequest): SubmitResponse = {
-    val launcher: SparkLauncher = new SparkLauncher()
+  private def prepareSparkLauncher(submitRequest: SubmitRequest) = {
+    new SparkLauncher()
       .setSparkHome(submitRequest.sparkVersion.sparkHome)
-      .setAppResource(submitRequest.buildResult
-        .asInstanceOf[ShadedBuildResponse]
-        .shadedJarPath)
+      .setAppResource(submitRequest.userJarPath)
       .setMainClass(submitRequest.appMain)
+      .setAppName(submitRequest.effectiveAppName)
+      .setConf(
+        "spark.yarn.jars",
+        submitRequest.hdfsWorkspace.sparkLib + "/*.jar")
+      .setVerbose(true)
       .setMaster("yarn")
       .setDeployMode(submitRequest.executionMode match {
         case SparkExecutionMode.YARN_CLIENT => "client"
         case SparkExecutionMode.YARN_CLUSTER => "cluster"
         case _ =>
-          throw new IllegalArgumentException(
+          throw new UnsupportedOperationException(
             "[StreamPark][YarnApplicationClient] Yarn mode only support 
\"client\" and \"cluster\".")
-
       })
-      .setAppName(submitRequest.appName)
-      .setConf(
-        "spark.yarn.jars",
-        submitRequest
-          .hdfsWorkspace
-          .sparkLib + "/*.jar")
-      .setVerbose(true)
-
-    import scala.collection.JavaConverters._
-    setDynamicProperties(launcher, submitRequest.properties.asScala.toMap)
+  }
 
-    // TODO: Adds command line arguments for the application.
-    // launcher.addAppArgs()
+  private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: SparkLauncher): Unit = {
+    logger.info("[StreamPark][SparkClient][YarnApplicationClient] set spark configuration.")
+    // 1) set spark conf
+    submitRequest.properties.foreach(prop => {
+      val k = prop._1
+      val v = prop._2.toString
+      logInfo(s"| $k  : $v")
+      sparkLauncher.setConf(k, v)
+    })
 
+    // 2) appArgs...
     if (MapUtils.isNotEmpty(submitRequest.extraParameter) && submitRequest.extraParameter
         .containsKey("sql")) {
-      launcher.addAppArgs("--sql", 
submitRequest.extraParameter.get("sql").toString)
-    }
-
-    logger.info("[StreamPark][YarnApplicationClient] The spark task start")
-    val cdlForApplicationId: CountDownLatch = new CountDownLatch(1)
-
-    var sparkAppHandle: SparkAppHandle = null
-    threadPool.execute(new Runnable {
-      override def run(): Unit = {
-        try {
-          val countDownLatch: CountDownLatch = new CountDownLatch(1)
-          sparkAppHandle = launcher.startApplication(new SparkAppHandle.Listener() {
-            override def stateChanged(handle: SparkAppHandle): Unit = {
-              if (handle.getAppId != null) {
-                if (cdlForApplicationId.getCount != 0) {
-                  cdlForApplicationId.countDown()
-                }
-                logger.info("{} stateChanged :{}", Array(handle.getAppId, 
handle.getState.toString))
-              } else logger.info("stateChanged :{}", handle.getState.toString)
-
-              if (SparkAppHandle.State.FAILED.toString == handle.getState.toString) {
-                logger.error("Task run failure stateChanged :{}", handle.getState.toString)
-              }
-
-              if (handle.getState.isFinal) {
-                countDownLatch.countDown()
-              }
-            }
-
-            override def infoChanged(handle: SparkAppHandle): Unit = {}
-          })
-          countDownLatch.await()
-        } catch {
-          case e: Exception =>
-            logger.error(e.getMessage, e)
-        }
-      }
-    })
-
-    cdlForApplicationId.await()
-    logger.info(
-      "[StreamPark][YarnApplicationClient] The task is executing, handle 
current state is {}, appid is {}",
-      Array(sparkAppHandle.getState.toString, sparkAppHandle.getAppId))
-    SubmitResponse(null, null, sparkAppHandle.getAppId)
-  }
-
-  private def setDynamicProperties(sparkLauncher: SparkLauncher, properties: Map[String, Any]): Unit = {
-    logger.info("[StreamPark][YarnApplicationClient] Spark launcher start configuration.")
-    val finalProperties: Map[String, Any] = SparkConfiguration.defaultParameters ++ properties
-    for ((k, v) <- finalProperties) {
-      if (k.startsWith("spark.")) {
-        sparkLauncher.setConf(k, v.toString)
-      } else {
-        logger.info("[StreamPark][YarnApplicationClient] \"{}\" doesn't start 
with \"spark.\". Skip it.", k)
-      }
+      sparkLauncher.addAppArgs("--sql", 
submitRequest.extraParameter.get("sql").toString)
     }
   }
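
The reworked launch flow is worth seeing in isolation: startApplication returns immediately, so the client parks on a CountDownLatch that the listener releases once the handle reaches a final state. A minimal standalone sketch of the same pattern (spark home, jar, and main class are placeholders):

    import java.util.concurrent.CountDownLatch
    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    val finished = new CountDownLatch(1)

    val handle: SparkAppHandle = new SparkLauncher()
      .setSparkHome("/opt/spark")        // placeholder path
      .setAppResource("/tmp/my-app.jar") // placeholder jar
      .setMainClass("com.example.Main")  // placeholder main class
      .setMaster("yarn")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener {
        override def infoChanged(h: SparkAppHandle): Unit = {}
        override def stateChanged(h: SparkAppHandle): Unit = {
          // FINISHED, FAILED and KILLED are all final states
          if (h.getState.isFinal) finished.countDown()
        }
      })

    finished.await() // block until the application reaches a final state
    println(s"appId=${handle.getAppId}, state=${handle.getState}")

Note that the latch opens only on a final state, so doSubmit now blocks until the application itself terminates, not merely until an application id is assigned (the removed code used a separate latch for the id).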
 
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index 93f32aad0..af7778ebc 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util._
 import org.apache.streampark.spark.client.bean._
 
 import scala.collection.convert.ImplicitConversions._
+import scala.util.{Failure, Success, Try}
 
 trait SparkClientTrait extends Logger {
 
@@ -31,7 +32,7 @@ trait SparkClientTrait extends Logger {
         |--------------------------------------- spark job start -----------------------------------
          |    userSparkHome    : ${submitRequest.sparkVersion.sparkHome}
          |    sparkVersion     : ${submitRequest.sparkVersion.version}
-         |    appName          : ${submitRequest.appName}
+         |    appName          : ${submitRequest.effectiveAppName}
          |    devMode          : ${submitRequest.developmentMode.name()}
          |    execMode         : ${submitRequest.executionMode.name()}
          |    applicationType  : ${submitRequest.applicationType.getName}
@@ -41,17 +42,19 @@
         |-------------------------------------------------------------------------------------------
          |""".stripMargin)
 
-    submitRequest.developmentMode match {
-      case _ =>
-        if (submitRequest.userJarFile != null) {
-          val uri = submitRequest.userJarFile.getAbsolutePath
-        }
-    }
+    prepareConfig(submitRequest)
 
     setConfig(submitRequest)
 
-    doSubmit(submitRequest)
-
+    Try(doSubmit(submitRequest)) match {
+      case Success(resp) => resp
+      case Failure(e) =>
+        logError(
+          s"spark job ${submitRequest.appName} start failed, " +
+            s"executionMode: ${submitRequest.executionMode.getName}, " +
+            s"detail: ${ExceptionUtils.stringifyException(e)}")
+        throw e
+    }
   }
 
   def setConfig(submitRequest: SubmitRequest): Unit
@@ -78,4 +81,21 @@ trait SparkClientTrait extends Logger {
   @throws[Exception]
   def doStop(stopRequest: StopRequest): StopResponse
 
+  private def prepareConfig(submitRequest: SubmitRequest): Unit = {
+    // 1) set default config
+    val userConfig = submitRequest.properties.filter(c => {
+      val k = c._1
+      if (k.startsWith("spark.")) {
+        true
+      } else {
+        logger.warn("[StreamPark] config {} doesn't start with \"spark.\" Skip 
it.", k)
+        false
+      }
+    })
+    val defaultConfig = submitRequest.DEFAULT_SUBMIT_PARAM.filter(c => !userConfig.containsKey(c._1))
+    submitRequest.properties.clear()
+    submitRequest.properties.putAll(userConfig)
+    submitRequest.properties.putAll(defaultConfig)
+  }
+
 }
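
The net effect of prepareConfig: keys that don't start with "spark." are dropped with a warning, and the DEFAULT_SUBMIT_PARAM defaults apply only where the user set nothing. A pure-Scala sketch of the same merge (key names are from the commit; the user values are invented):

    val defaults = Map(
      "spark.driver.cores" -> "1",
      "spark.driver.memory" -> "1g",
      "spark.executor.cores" -> "1",
      "spark.executor.memory" -> "1g")

    // hypothetical user-supplied properties
    val userProps = Map(
      "spark.executor.memory" -> "4g", // keeps the user's value
      "yarn.queue" -> "prod")          // dropped: doesn't start with "spark."

    val userConfig = userProps.filter { case (k, _) => k.startsWith("spark.") }
    val merged = userConfig ++ defaults.filter { case (k, _) => !userConfig.contains(k) }
    // merged == Map("spark.executor.memory" -> "4g", "spark.driver.cores" -> "1",
    //               "spark.driver.memory" -> "1g", "spark.executor.cores" -> "1")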

