This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new e5f191a18 [Improve] shaded log4j dependencies (#2616)
e5f191a18 is described below

commit e5f191a18a4536dd966a2f0c36604a89990f90a8
Author: benjobs <[email protected]>
AuthorDate: Thu Apr 13 21:56:54 2023 +0800

    [Improve] shaded log4j dependencies (#2616)
    
    * [Improve] Flink CONF_DIR improvement
    
    * [Improve] shaded log4j dependencies
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 pom.xml                                            |  81 ++++-------
 streampark-common/pom.xml                          |   7 +
 .../streampark/common/conf/ConfigConst.scala       |   6 -
 .../streampark/common/util/ClassLoaderUtils.scala  |   2 -
 .../apache/streampark/common/util/FileUtils.scala  |   3 +-
 .../org/apache/streampark/common/util/Logger.scala | 123 +++++++++++++----
 .../streampark-console-service/pom.xml             |  33 +++++
 streampark-flink/pom.xml                           |   1 +
 .../impl/KubernetesNativeApplicationClient.scala   |  19 ++-
 .../flink/client/impl/YarnSessionClient.scala      |   3 +-
 .../client/trait/KubernetesNativeClientTrait.scala |  16 +--
 .../streampark-flink-shims-base/pom.xml            |   1 -
 {streampark-common => streampark-shade}/pom.xml    |  65 ++++++++-
 streampark-shaded/pom.xml                          |  36 +++++
 .../streampark-shaded-jackson}/pom.xml             | 148 +--------------------
 streampark-shaded/streampark-shaded-slf4j/pom.xml  | 135 +++++++++++++++++++
 streampark-spark/pom.xml                           |   6 +
 .../src/main/resources/application.properties      |  20 ---
 .../src/main/resources/spark-firemonitor           |  28 ----
 .../src/main/resources/spark-rpcdemo               |  28 ----
 .../src/main/resources/start-firemonitor-server.sh |  37 ------
 .../src/main/resources/start-rpcdemo-server.sh     |  37 ------
 .../src/main/resources/stop-firemonitor-server.sh  |  37 ------
 .../src/main/resources/stop-rpcdemo-server.sh      |  37 ------
 .../org/apache/streampark/spark/core/Spark.scala   | 109 +++++++++------
 .../apache/streampark/spark/core/SparkBatch.scala  |   4 +-
 26 files changed, 492 insertions(+), 530 deletions(-)
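
 Note for readers following the change: downstream modules keep using the existing
 org.apache.streampark.common.util.Logger trait; this commit only relocates the backing
 slf4j/logback/log4j classes under org.apache.streampark.shaded.* via the new
 streampark-shaded-slf4j module. A minimal usage sketch (the object name below is
 hypothetical and not part of this commit):

     import org.apache.streampark.common.util.Logger

     // Hypothetical consumer: mixing in Logger binds to the relocated
     // (shaded) slf4j/logback stack introduced by this commit.
     object ShadeDemo extends Logger {
       def main(args: Array[String]): Unit = {
         logInfo("logging through the shaded slf4j/logback stack")
       }
     }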

diff --git a/pom.xml b/pom.xml
index 1ddb914c9..f4414e15a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
 
     <modules>
         <module>streampark-common</module>
+        <module>streampark-shaded</module>
         <module>streampark-flink</module>
         <module>streampark-storage</module>
         <module>streampark-console</module>
@@ -101,13 +102,11 @@
         <hive.version>2.3.4</hive.version>
         <hadoop.version>2.10.2</hadoop.version>
         <hbase.version>2.1.10</hbase.version>
-        <zkclient.version>0.11</zkclient.version>
-        <curator.version>4.2.0</curator.version>
         <redis.version>3.3.0</redis.version>
         <es.version>6.2.3</es.version>
         <influxdb.version>2.17</influxdb.version>
         <protobuf.version>2.5.0</protobuf.version>
-        <slf4j.version>1.7.30</slf4j.version>
+        <slf4j.version>1.7.32</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
         <logback.version>1.2.11</logback.version>
         <grpc.version>1.15.0</grpc.version>
@@ -534,12 +533,6 @@
                 <version>${slf4j.version}</version>
             </dependency>
 
-            <dependency>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-simple</artifactId>
-                <version>${slf4j.version}</version>
-            </dependency>
-
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>log4j-over-slf4j</artifactId>
@@ -564,12 +557,32 @@
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
                 <version>${logback.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
 
             <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-core</artifactId>
                 <version>${logback.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
 
             <dependency>
@@ -637,60 +650,12 @@
             <scope>provided</scope>
         </dependency>
 
-        <!--log4j -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
+            <optional>true</optional>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-        </dependency>
-        <!--log4j end-->
-
-        <!-- logback -->
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- logback end -->
     </dependencies>
 
     <build>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index f072e6348..0f6a72f64 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -174,6 +174,13 @@
             <optional>true</optional>
         </dependency>
 
+        <!--log4j -->
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-shaded-slf4j</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 029b258f6..d31049166 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -83,12 +83,6 @@ object ConfigConst {
 
   // spark
 
-  val KEY_SPARK_USER_ARGS = "spark.user.args"
-
-  val KEY_SPARK_CONF = "spark.conf"
-
-  val KEY_SPARK_DEBUG_CONF = "spark.debug.conf"
-
   val KEY_SPARK_MAIN_CLASS = "spark.main.class"
 
   val KEY_SPARK_APP_NAME = "spark.app.name"
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index f0766e6c0..001d69c76 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -20,8 +20,6 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.util.function.Supplier
 
-import scala.util.Try
-
 object ClassLoaderUtils extends Logger {
 
  private[this] val originalClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 1676d1bff..dc39a1865 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -23,7 +23,7 @@ import java.util.Scanner
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
-object FileUtils extends org.apache.commons.io.FileUtils {
+object FileUtils {
 
   private[this] def bytesToHexString(src: Array[Byte]): String = {
     val stringBuilder = new mutable.StringBuilder
@@ -151,3 +151,4 @@ object FileUtils extends org.apache.commons.io.FileUtils {
   }
 
 }
+
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index b6281961d..da04653b8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -16,83 +16,160 @@
  */
 package org.apache.streampark.common.util
 
-import org.slf4j.{Logger => SlfLogger, LoggerFactory}
-import org.slf4j.impl.StaticLoggerBinder
+import org.apache.streampark.shaded.ch.qos.logback.classic.LoggerContext
+import org.apache.streampark.shaded.ch.qos.logback.classic.joran.JoranConfigurator
+import org.apache.streampark.shaded.ch.qos.logback.classic.util.ContextSelectorStaticBinder
+import org.apache.streampark.shaded.ch.qos.logback.classic.util.{ContextInitializer => LogBackContextInitializer}
+import org.apache.streampark.shaded.ch.qos.logback.core.{CoreConstants, LogbackException}
+import org.apache.streampark.shaded.ch.qos.logback.core.status.StatusUtil
+import org.apache.streampark.shaded.ch.qos.logback.core.util.StatusPrinter
+
+import org.apache.streampark.shaded.org.slf4j.{ILoggerFactory, Logger => Slf4JLogger}
+import org.apache.streampark.shaded.org.slf4j.spi.LoggerFactoryBinder
+
+import java.io.{ByteArrayInputStream, File}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import scala.util.Try
+import scala.util.Success
+import scala.util.Failure
 
 trait Logger {
 
-  @transient private[this] var _logger: SlfLogger = _
+  @transient private[this] var _logger: Slf4JLogger = _
 
   private[this] val prefix = "[StreamPark]"
 
   protected def logName: String = this.getClass.getName.stripSuffix("$")
 
-  protected def logger: SlfLogger = {
+  protected def logger: Slf4JLogger = {
     if (_logger == null) {
-      initializeLogIfNecessary(false)
-      _logger = LoggerFactory.getLogger(logName)
+      initializeLogging()
+      val factory = LoggerFactory.getLoggerFactory()
+      _logger = factory.getLogger(logName)
     }
     _logger
   }
 
   def logInfo(msg: => String) {
-    if (logger.isInfoEnabled) logger.info(s"$prefix $msg")
+    logger.info(s"$prefix $msg")
   }
 
   def logInfo(msg: => String, throwable: Throwable) {
-    if (logger.isInfoEnabled) logger.info(s"$prefix $msg", throwable)
+    logger.info(s"$prefix $msg", throwable)
   }
 
   def logDebug(msg: => String) {
-    if (logger.isDebugEnabled) logger.debug(s"$prefix $msg")
+    logger.debug(s"$prefix $msg")
   }
 
   def logDebug(msg: => String, throwable: Throwable) {
-    if (logger.isDebugEnabled) logger.debug(s"$prefix $msg", throwable)
+    logger.debug(s"$prefix $msg", throwable)
   }
 
   def logTrace(msg: => String) {
-    if (logger.isTraceEnabled) logger.trace(s"$prefix $msg")
+    logger.trace(s"$prefix $msg")
   }
 
   def logTrace(msg: => String, throwable: Throwable) {
-    if (logger.isTraceEnabled) logger.trace(s"$prefix $msg", throwable)
+    logger.trace(s"$prefix $msg", throwable)
   }
 
   def logWarn(msg: => String) {
-    if (logger.isWarnEnabled) logger.warn(s"$prefix $msg")
+    logger.warn(s"$prefix $msg")
   }
 
   def logWarn(msg: => String, throwable: Throwable) {
-    if (logger.isWarnEnabled) logger.warn(s"$prefix $msg", throwable)
+    logger.warn(s"$prefix $msg", throwable)
   }
 
   def logError(msg: => String) {
-    if (logger.isErrorEnabled) logger.error(s"$prefix $msg")
+    logger.error(s"$prefix $msg")
   }
 
   def logError(msg: => String, throwable: Throwable) {
-    if (logger.isErrorEnabled) logger.error(s"$prefix $msg", throwable)
+    logger.error(s"$prefix $msg", throwable)
   }
 
-  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
+  protected def initializeLogging(): Unit = {
     if (!Logger.initialized) {
       Logger.initLock.synchronized {
         if (!Logger.initialized) {
-          initializeLogging(isInterpreter)
+          Logger.initialized = true
+          logger
         }
       }
     }
   }
 
-  private def initializeLogging(isInterpreter: Boolean): Unit = {
-    StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
-    Logger.initialized = true
-    logger
-  }
 }
 
 private object Logger {
   @volatile private var initialized = false
   val initLock = new Object()
 }
+
+private[this] object LoggerFactory extends LoggerFactoryBinder {
+
+  private lazy val contextSelectorBinder: ContextSelectorStaticBinder = {
+    val defaultLoggerContext = new LoggerContext
+    Try(new ContextInitializer(defaultLoggerContext).autoConfig()) match {
+      case Success(s) => s
+      case Failure(e) =>
+        val msg = "Failed to auto configure default logger context"
+        // scalastyle:off println
+        System.err.println(msg)
+        // scalastyle:off println
+        System.err.println("Reported exception:")
+        e.printStackTrace()
+    }
+    if (!StatusUtil.contextHasStatusListener(defaultLoggerContext)) {
+      StatusPrinter.printInCaseOfErrorsOrWarnings(defaultLoggerContext)
+    }
+    val selectorBinder = new ContextSelectorStaticBinder()
+    selectorBinder.init(defaultLoggerContext, new Object())
+    selectorBinder
+  }
+
+  override def getLoggerFactory: ILoggerFactory = {
+    if (contextSelectorBinder.getContextSelector == null) {
+      throw new IllegalStateException("contextSelector cannot be null. See also " + CoreConstants.CODES_URL + "#null_CS")
+    }
+    contextSelectorBinder.getContextSelector.getLoggerContext
+  }
+
+  override def getLoggerFactoryClassStr: String = contextSelectorBinder.getClass.getName
+
+  private class ContextInitializer(loggerContext: LoggerContext) extends LogBackContextInitializer(loggerContext) {
+    override def configureByResource(url: URL): Unit = {
+      Utils.notNull(url, "URL argument cannot be null")
+      val path = url.getPath
+      if (path.endsWith("xml")) {
+        val configurator = new JoranConfigurator()
+        configurator.setContext(loggerContext)
+        val text = FileUtils.readString(new File(path))
+          .replaceAll(
+            "ch.qos.logback",
+            "org.apache.streampark.shaded.ch.qos.logback")
+          .replaceAll(
+            "org.slf4j",
+            "org.apache.streampark.shaded.org.slf4j")
+          .replaceAll(
+            "org.apache.log4j",
+            "org.apache.streampark.shaded.org.apache.log4j")
+          .replaceAll(
+            "org.apache.logging.log4j",
+            "org.apache.streampark.shaded.org.apache.logging.log4j")
+
+        val input = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8))
+        configurator.doConfigure(input)
+      } else throw {
+        new LogbackException("Unexpected filename extension of file [" + url.toString + "]. Should be .xml")
+      }
+    }
+
+  }
+
+}
+
+
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index 7e7e28324..71fdafb52 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -399,6 +399,39 @@
 
         <!--Test dependencies end.-->
 
+        <!--log4j -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <!--log4j end-->
+
+        <!-- logback -->
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 57fa8a337..33d75d9a6 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -62,6 +62,7 @@
             <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 
     <build>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 0a6208d0f..034c0ab1d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.{Configuration, DeploymentOptions, DeploymentOptionsInternal, PipelineOptions}
+import org.apache.flink.configuration.{Configuration, DeploymentOptions, PipelineOptions}
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 
@@ -36,6 +36,18 @@ import org.apache.streampark.flink.client.bean._
  */
 object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
 
+  override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
+    if (submitRequest.buildResult != null) {
+      val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
+        buildResult.podTemplatePaths.foreach(p => {
+          flinkConfig
+            .safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+            .safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+            .safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+        })
+    }
+  }
+
   @throws[Exception]
  override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
 
@@ -51,10 +63,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
 
     // add flink pipeline.jars configuration
    flinkConfig.safeSet(PipelineOptions.JARS, Lists.newArrayList(buildResult.dockerInnerMainJarPath))
-    // add flink conf configuration, mainly to set the log4j configuration
-    if (!flinkConfig.contains(DeploymentOptionsInternal.CONF_DIR)) {
-      flinkConfig.safeSet(DeploymentOptionsInternal.CONF_DIR, s"${submitRequest.flinkVersion.flinkHome}/conf")
-    }
+
     // add flink container image tag to flink configuration
    flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, buildResult.flinkImageTag)
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index eef7d75e7..5426fc6ca 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -87,8 +87,9 @@ object YarnSessionClient extends YarnClientTrait {
       .safeSet(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibs.asJava)
       // flinkDistJar
      .safeSet(YarnConfigOptions.FLINK_DIST_JAR, deployRequest.hdfsWorkspace.flinkDistJar)
-      //
+      // yarnDeployment Target
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+      // conf dir
      .safeSet(DeploymentOptionsInternal.CONF_DIR, s"${deployRequest.flinkVersion.flinkHome}/conf")
 
     logInfo(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index e40600472..c6a8710c2 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -30,9 +30,8 @@ import org.apache.flink.kubernetes.{KubernetesClusterClientFactory, KubernetesCl
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
 
-import org.apache.streampark.common.enums.{ExecutionMode, FlinkK8sRestExposedType}
+import org.apache.streampark.common.enums.FlinkK8sRestExposedType
 import org.apache.streampark.flink.kubernetes.IngressController
-import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 import org.apache.streampark.flink.client.bean._
 
 /**
@@ -49,16 +48,9 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
        KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
        covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType))
 
-    if (submitRequest.buildResult != null) {
-      if (submitRequest.executionMode == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
-        val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
-        buildResult.podTemplatePaths.foreach(p => {
-          flinkConfig
-            .safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
-            .safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
-            .safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
-        })
-      }
+    // add flink conf configuration, mainly to set the log4j configuration
+    if (!flinkConfig.contains(DeploymentOptionsInternal.CONF_DIR)) {
+      flinkConfig.safeSet(DeploymentOptionsInternal.CONF_DIR, s"${submitRequest.flinkVersion.flinkHome}/conf")
     }
 
     if (flinkConfig.get(KubernetesConfigOptions.NAMESPACE).isEmpty) {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
index 826179923..5cb843558 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
@@ -99,7 +99,6 @@
             <artifactId>hadoop-yarn-api</artifactId>
             <optional>true</optional>
         </dependency>
-
     </dependencies>
 
 </project>
diff --git a/streampark-common/pom.xml b/streampark-shade/pom.xml
similarity index 78%
copy from streampark-common/pom.xml
copy to streampark-shade/pom.xml
index f072e6348..8a64fd883 100644
--- a/streampark-common/pom.xml
+++ b/streampark-shade/pom.xml
@@ -174,6 +174,46 @@
             <optional>true</optional>
         </dependency>
 
+        <!--log4j -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <!--log4j end-->
+
+        <!-- logback -->
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <!-- logback end -->
+
     </dependencies>
 
     <build>
@@ -203,22 +243,35 @@
                             <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
                             <artifactSet>
                                 <includes>
+                                    <!--jackson-->
                                     <include>com.fasterxml.jackson.*:*</include>
                                     <include>com.beachape:*</include>
+                                    <!-- logback -->
+                                    <include>org.slf4j:*:*</include>
+                                    <include>org.apache.logging.log4j:*:*</include>
+                                    <include>ch.qos.logback:*:*</include>
                                 </includes>
                             </artifactSet>
                             <relocations>
                                 <relocation>
-                                    <pattern>com.fasterxml.jackson.code</pattern>
-                                    <shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson.code</shadedPattern>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.slf4j</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.org.slf4j</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>ch.qos.logback</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.ch.qos.logback</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>com.fasterxml.jackson.module</pattern>
-                                    <shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson.module</shadedPattern>
+                                    <pattern>org.apache.logging.log4j</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.org.apache.logging.log4j</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>com.fasterxml.jackson.databind</pattern>
-                                    <shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson.databind</shadedPattern>
+                                    <pattern>org.apache.log4j</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.org.apache.log4j</shadedPattern>
                                 </relocation>
                             </relocations>
                             <filters>
diff --git a/streampark-shaded/pom.xml b/streampark-shaded/pom.xml
new file mode 100644
index 000000000..bca520acc
--- /dev/null
+++ b/streampark-shaded/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark</artifactId>
+        <version>2.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-shaded</artifactId>
+    <name>StreamPark : Shaded Parent</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>streampark-shaded-slf4j</module>
+        <module>streampark-shaded-jackson</module>
+    </modules>
+
+</project>
diff --git a/streampark-common/pom.xml b/streampark-shaded/streampark-shaded-jackson/pom.xml
similarity index 50%
copy from streampark-common/pom.xml
copy to streampark-shaded/streampark-shaded-jackson/pom.xml
index f072e6348..18ff56cdd 100644
--- a/streampark-common/pom.xml
+++ b/streampark-shaded/streampark-shaded-jackson/pom.xml
@@ -20,74 +20,14 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.streampark</groupId>
-        <artifactId>streampark</artifactId>
+        <artifactId>streampark-shaded</artifactId>
         <version>2.1.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>streampark-common_${scala.binary.version}</artifactId>
-    <name>StreamPark : Common</name>
+    <artifactId>streampark-shaded-jackson</artifactId>
+    <name>StreamPark : Shaded Jackson </name>
 
     <dependencies>
-
-        <!-- test -->
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <version>${jupiter.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-params</artifactId>
-            <version>${jupiter.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- enumeratum -->
-        <dependency>
-            <groupId>com.beachape</groupId>
-            <artifactId>enumeratum_${scala.binary.version}</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.github.ben-manes.caffeine</groupId>
-            <artifactId>caffeine</artifactId>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>com.zaxxer</groupId>
-            <artifactId>HikariCP</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-api</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.yaml</groupId>
-            <artifactId>snakeyaml</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
@@ -99,96 +39,15 @@
             <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
             <optional>true</optional>
         </dependency>
-
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-
-        <!--hbase-->
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>junit</artifactId>
-                    <groupId>junit</groupId>
-                </exclusion>
-            </exclusions>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-client</artifactId>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.mongodb</groupId>
-            <artifactId>mongo-java-driver</artifactId>
-            <version>3.12.2</version>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>2.6.0</version>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.ivy</groupId>
-            <artifactId>ivy</artifactId>
-            <version>2.4.0</version>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-resolver</artifactId>
-            <version>4.1.65.Final</version>
-            <optional>true</optional>
-        </dependency>
-
     </dependencies>
 
     <build>
         <plugins>
-
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
             </plugin>
 
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-            </plugin>
-
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
@@ -204,7 +63,6 @@
                             <artifactSet>
                                <includes>
                                    <include>com.fasterxml.jackson.*:*</include>
-                                    <include>com.beachape:*</include>
                                 </includes>
                             </artifactSet>
                             <relocations>
diff --git a/streampark-shaded/streampark-shaded-slf4j/pom.xml b/streampark-shaded/streampark-shaded-slf4j/pom.xml
new file mode 100644
index 000000000..bd5d0e4cf
--- /dev/null
+++ b/streampark-shaded/streampark-shaded-slf4j/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-shaded</artifactId>
+        <version>2.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-shaded-slf4j</artifactId>
+    <name>StreamPark : Shaded Slf4j </name>
+
+    <dependencies>
+
+        <!--log4j -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <!--log4j end-->
+
+        <!-- logback -->
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <!-- logback end -->
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.slf4j:*:*</include>
+                                    <include>org.apache.logging.log4j:*:*</include>
+                                    <include>ch.qos.logback:*:*</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.slf4j</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.org.slf4j</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>ch.qos.logback</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.ch.qos.logback</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.logging.log4j</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.org.apache.logging.log4j</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.log4j</pattern>
+                                    <shadedPattern>${streampark.shaded.package}.org.apache.log4j</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>
diff --git a/streampark-spark/pom.xml b/streampark-spark/pom.xml
index 26994d236..5519ae06c 100644
--- a/streampark-spark/pom.xml
+++ b/streampark-spark/pom.xml
@@ -39,6 +39,7 @@
             <artifactId>streampark-common_2.12</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.12</artifactId>
@@ -51,28 +52,33 @@
             </exclusions>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
             <version>${spark.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-hadoop2-compat</artifactId>
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/application.properties b/streampark-spark/streampark-spark-core/src/main/resources/application.properties
deleted file mode 100644
index 0b35d3747..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/application.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-spark.monitor.congestion.send.api=https://oapi.dingtalk.com/robot/send?access_token=aaaaaaaaaaaa
-spark.monitor.kafka.metadata.broker.list=kafka1:9092,kafka2:9092,kafka3:9092
-spark.monitor.kafka.topic=streampark-spark-batchinfos
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/spark-firemonitor b/streampark-spark/streampark-spark-core/src/main/resources/spark-firemonitor
deleted file mode 100644
index 2d6e41d61..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/spark-firemonitor
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [ -z "${SPARK_HOME}" ]; then
-  # shellcheck disable=SC1090
-  source "$(dirname "$0")"/find-spark-home
-fi
-
-# disable randomized hash for string in Python 3.3+
-export PYTHONHASHSEED=0
-
-exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.YarnAppMonitorCli "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/spark-rpcdemo b/streampark-spark/streampark-spark-core/src/main/resources/spark-rpcdemo
deleted file mode 100644
index 120a425f8..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/spark-rpcdemo
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [ -z "${SPARK_HOME}" ]; then
-  # shellcheck disable=SC1090
-  source "$(dirname "$0")"/find-spark-home
-fi
-
-# disable randomized hash for string in Python 3.3+
-export PYTHONHASHSEED=0
-
-exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.rpcDemoCli "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/start-firemonitor-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/start-firemonitor-server.sh
deleted file mode 100644
index bdd97498b..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/start-firemonitor-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
-  # shellcheck disable=SC2155
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.YarnAppMonitorSer 1 "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/start-rpcdemo-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/start-rpcdemo-server.sh
deleted file mode 100644
index 5fc46fc48..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/start-rpcdemo-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
-  # shellcheck disable=SC2155
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.rpcDemo 1 "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/stop-firemonitor-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/stop-firemonitor-server.sh
deleted file mode 100644
index 362cb9d02..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/stop-firemonitor-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
-  # shellcheck disable=SC2155
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.YarnAppMonitorSer 1
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/stop-rpcdemo-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/stop-rpcdemo-server.sh
deleted file mode 100644
index d7d18346f..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/stop-rpcdemo-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
-  # shellcheck disable=SC2155
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.rpcDemo 1
diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 58800dec4..f1efef788 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -19,12 +19,10 @@ package org.apache.streampark.spark.core
 
 import scala.annotation.meta.getter
 import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
-
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.util.{PropertiesUtils, SystemPropertyUtils}
+import org.apache.streampark.common.util.{Logger, PropertiesUtils}
 
 /**
  * <b><code>Spark</code></b>
@@ -32,14 +30,15 @@ import org.apache.streampark.common.util.{PropertiesUtils, SystemPropertyUtils}
  * Spark Basic Traits
  * <p/>
  */
-trait Spark {
+trait Spark extends Logger {
 
-  @(transient @getter)
+  @(transient@getter)
   final protected lazy val sparkConf: SparkConf = new SparkConf()
-  @(transient @getter)
-  final protected val sparkListeners = new ArrayBuffer[String]()
 
-  @(transient @getter)
+  @(transient@getter)
+  private[this] final  val sparkListeners = new ArrayBuffer[String]()
+
+  @(transient@getter)
   final protected var sparkSession: SparkSession = _
 
   // Directory of checkpoint
@@ -52,12 +51,35 @@ trait Spark {
    * Entrance
    */
   def main(args: Array[String]): Unit = {
+
     init(args)
+
     config(sparkConf)
-    sparkSession = sparkConf.get("spark.enable.hive.support", "false").toLowerCase match {
-      case "true" => SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
-      case "false" => SparkSession.builder().config(sparkConf).getOrCreate()
+
+    // 1) system.properties
+    val sysProps = sparkConf.getAllWithPrefix("spark.config.system.properties")
+    if (sysProps != null) {
+      sysProps.foreach(x => {
+        System.getProperties.setProperty(x._1.drop(1), x._2)
+      })
+    }
+
+    val builder = SparkSession.builder().config(sparkConf)
+    val enableHive = sparkConf.getBoolean("spark.config.enable.hive.support", defaultValue = false)
+    if (enableHive) {
+      builder.enableHiveSupport()
+    }
+
+    sparkSession = builder.getOrCreate()
+
+    // 2) hive
+    val sparkSql = sparkConf.getAllWithPrefix("spark.config.spark.sql")
+    if (sparkSql != null) {
+      sparkSql.foreach(x => {
+        sparkSession.sparkContext.getConf.set(x._1.drop(1), x._2)
+      })
     }
+
     ready()
     handle()
     start()
@@ -67,39 +89,37 @@ trait Spark {
   /**
    * Initialize sparkConf according to user parameters
    */
-  final def init(args: Array[String]): Unit = {
+  private final def init(args: Array[String]): Unit = {
+
+    logDebug("init application config ....")
 
     var argv = args.toList
 
+    var conf: String = null
+
     while (argv.nonEmpty) {
       argv match {
-        case ("--checkpoint") :: value :: tail =>
+        case "--conf" :: value :: tail =>
+          conf = value
+          argv = tail
+        case "--checkpoint" :: value :: tail =>
           checkpoint = value
           argv = tail
-        case ("--createOnError") :: value :: tail =>
+        case "--createOnError" :: value :: tail =>
           createOnError = value.toBoolean
           argv = tail
         case Nil =>
         case tail =>
-          // scalastyle:off println
-          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+          logError(s"Unrecognized options: ${tail.mkString(" ")}")
           printUsageAndExit()
       }
     }
 
-    sparkConf.set(KEY_SPARK_USER_ARGS, args.mkString("|"))
-
-    // The default configuration file passed in through vm -Dspark.debug.conf is used as local debugging mode
-    val (isDebug, confPath) = SystemPropertyUtils.get(KEY_SPARK_CONF, "") match {
-      case "" => (true, sparkConf.get(KEY_SPARK_DEBUG_CONF))
-      case path => (false, path)
-      case _ => throw new IllegalArgumentException("[StreamPark] Usage:properties-file error")
-    }
-
-    val localConf = confPath.split("\\.").last match {
-      case "properties" => PropertiesUtils.fromPropertiesFile(confPath)
-      case "yaml" | "yml" => PropertiesUtils.fromYamlFile(confPath)
-      case _ => throw new IllegalArgumentException("[StreamPark] Usage:properties-file format error,must be properties or yml")
+    val localConf = conf.split("\\.").last match {
+      case "conf" => PropertiesUtils.fromHoconFile(conf)
+      case "properties" => PropertiesUtils.fromPropertiesFile(conf)
+      case "yaml" | "yml" => PropertiesUtils.fromYamlFile(conf)
+      case _ => throw new IllegalArgumentException("[StreamPark] Usage: config file error,must be [properties|yaml|conf]")
     }
 
     localConf.foreach(x => sparkConf.set(x._1, x._2))
@@ -107,19 +127,19 @@ trait Spark {
     val (appMain, appName) = sparkConf.get(KEY_SPARK_MAIN_CLASS, null) match {
       case null | "" => (null, null)
       case other => sparkConf.get(KEY_SPARK_APP_NAME, null) match {
-          case null | "" => (other, other)
-          case name => (other, name)
-        }
+        case null | "" => (other, other)
+        case name => (other, name)
+      }
     }
 
     if (appMain == null) {
-      // scalastyle:off println
-      System.err.println(s"[StreamPark] $KEY_SPARK_MAIN_CLASS must not be empty!")
+      logError(s"[StreamPark] $KEY_SPARK_MAIN_CLASS must not be empty!")
       System.exit(1)
     }
 
     // debug mode
-    if (isDebug) {
+    val localMode = sparkConf.get("spark.master", null) == "local"
+    if (localMode) {
       sparkConf.setAppName(s"[LocalDebug] $appName").setMaster("local[*]")
       sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
     }
@@ -127,23 +147,25 @@ trait Spark {
     sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
 
     val extraListeners = sparkListeners.mkString(",") + "," + sparkConf.get("spark.extraListeners", "")
-    if (extraListeners != "") sparkConf.set("spark.extraListeners", extraListeners)
+    if (extraListeners != "") {
+      sparkConf.set("spark.extraListeners", extraListeners)
+    }
   }
 
   /**
   * The purpose of the config phase is to allow the developer to set more parameters (other than the agreed
    * configuration file) by means of hooks.
    * Such as,
-   *  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-   *  conf.registerKryoClasses(Array(classOf[User], classOf[Order],...))
+   * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+   * conf.registerKryoClasses(Array(classOf[User], classOf[Order],...))
    */
-  def config(sparkConf: SparkConf): Unit
+  def config(sparkConf: SparkConf): Unit = {}
 
   /**
   * The ready phase is an entry point for the developer to do other actions after the parameters have been set,
    * and is done after initialization and before the program starts.
    */
-  def ready(): Unit
+  def ready(): Unit = {}
 
   /**
   * The handle phase is the entry point to the code written by the developer and is the most important phase.
@@ -153,7 +175,7 @@ trait Spark {
   /**
   * The start phase starts the task, which is executed automatically by the framework.
    */
-  def start(): Unit
+  def start(): Unit = {}
 
   /**
   * The destroy phase, is the last phase before jvm exits after the program has finished running,
@@ -164,9 +186,8 @@ trait Spark {
   /**
    * printUsageAndExit
    */
-  def printUsageAndExit(): Unit = {
-    // scalastyle:off println
-    System.err.println(
+  private[this] def printUsageAndExit(): Unit = {
+    logError(
       """
         |"Usage: Streaming [options]
         |
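
For orientation on how the reworked trait is meant to be consumed: config(), ready() and start() now default to no-ops, so an application only has to implement handle() (plus destroy(), which remains abstract). A minimal sketch; the object name, input path and job logic below are invented for illustration and are not part of this commit:

    import org.apache.spark.SparkConf
    import org.apache.streampark.spark.core.Spark

    // Hypothetical job: only handle() is mandatory now that the other hooks default to no-ops.
    object WordCountJob extends Spark {

      // Optional hook: tune the SparkConf before the SparkSession is built.
      override def config(sparkConf: SparkConf): Unit = {
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      }

      // Mandatory hook: the job logic, executed against the prepared sparkSession.
      override def handle(): Unit = {
        val spark = sparkSession // stable reference so the implicits can be imported
        import spark.implicits._
        spark.read.textFile("/tmp/words.txt") // illustrative path
          .flatMap(_.split("\\s+"))
          .groupBy("value")
          .count()
          .show()
      }

      // destroy() stays abstract in the trait, so resources are released here.
      override def destroy(): Unit = {
        sparkSession.stop()
      }
    }
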
diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala
index 8a6e4ea8b..b6a90b705 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala
@@ -19,8 +19,7 @@ package org.apache.streampark.spark.core
 
 import scala.annotation.meta.getter
 
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkContext
 
 /**
  * <b><code>SparkBatch</code></b>
@@ -37,4 +36,5 @@ trait SparkBatch extends Spark {
   override def destroy(): Unit = {
     context.stop()
   }
+
 }
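
To tie the Spark.scala changes together: init() now takes the application config only from the --conf argument (the file extension selects the parser: .conf for HOCON, .properties, or .yaml/.yml), and main() then copies every spark.config.system.properties.* entry into JVM system properties and enables Hive support when spark.config.enable.hive.support is true. A rough illustration of a launch line and a YAML config file; the file name and values are invented for the example, and the config must also carry the main-class/app-name entries referenced by KEY_SPARK_MAIN_CLASS and KEY_SPARK_APP_NAME (key names defined in ConfigConst):

    # submit arguments (illustrative)
    #   --conf /path/to/app.yml --checkpoint /tmp/ckpt --createOnError true

    # app.yml (illustrative values)
    spark.config.enable.hive.support: "true"
    spark.config.system.properties.user.timezone: "UTC"
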
