This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new e5f191a18 [Improve] shaded log4j dependencies (#2616)
e5f191a18 is described below
commit e5f191a18a4536dd966a2f0c36604a89990f90a8
Author: benjobs <[email protected]>
AuthorDate: Thu Apr 13 21:56:54 2023 +0800
[Improve] shaded log4j dependencies (#2616)
* [Improve] Flink CONF_DIR improvement
* [Improve] shaded log4j dependencies
---------
Co-authored-by: benjobs <[email protected]>
---
pom.xml | 81 ++++-------
streampark-common/pom.xml | 7 +
.../streampark/common/conf/ConfigConst.scala | 6 -
.../streampark/common/util/ClassLoaderUtils.scala | 2 -
.../apache/streampark/common/util/FileUtils.scala | 3 +-
.../org/apache/streampark/common/util/Logger.scala | 123 +++++++++++++----
.../streampark-console-service/pom.xml | 33 +++++
streampark-flink/pom.xml | 1 +
.../impl/KubernetesNativeApplicationClient.scala | 19 ++-
.../flink/client/impl/YarnSessionClient.scala | 3 +-
.../client/trait/KubernetesNativeClientTrait.scala | 16 +--
.../streampark-flink-shims-base/pom.xml | 1 -
{streampark-common => streampark-shade}/pom.xml | 65 ++++++++-
streampark-shaded/pom.xml | 36 +++++
.../streampark-shaded-jackson}/pom.xml | 148 +--------------------
streampark-shaded/streampark-shaded-slf4j/pom.xml | 135 +++++++++++++++++++
streampark-spark/pom.xml | 6 +
.../src/main/resources/application.properties | 20 ---
.../src/main/resources/spark-firemonitor | 28 ----
.../src/main/resources/spark-rpcdemo | 28 ----
.../src/main/resources/start-firemonitor-server.sh | 37 ------
.../src/main/resources/start-rpcdemo-server.sh | 37 ------
.../src/main/resources/stop-firemonitor-server.sh | 37 ------
.../src/main/resources/stop-rpcdemo-server.sh | 37 ------
.../org/apache/streampark/spark/core/Spark.scala | 109 +++++++++------
.../apache/streampark/spark/core/SparkBatch.scala | 4 +-
26 files changed, 492 insertions(+), 530 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1ddb914c9..f4414e15a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
<modules>
<module>streampark-common</module>
+ <module>streampark-shaded</module>
<module>streampark-flink</module>
<module>streampark-storage</module>
<module>streampark-console</module>
@@ -101,13 +102,11 @@
<hive.version>2.3.4</hive.version>
<hadoop.version>2.10.2</hadoop.version>
<hbase.version>2.1.10</hbase.version>
- <zkclient.version>0.11</zkclient.version>
- <curator.version>4.2.0</curator.version>
<redis.version>3.3.0</redis.version>
<es.version>6.2.3</es.version>
<influxdb.version>2.17</influxdb.version>
<protobuf.version>2.5.0</protobuf.version>
- <slf4j.version>1.7.30</slf4j.version>
+ <slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<logback.version>1.2.11</logback.version>
<grpc.version>1.15.0</grpc.version>
@@ -534,12 +533,6 @@
<version>${slf4j.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
@@ -564,12 +557,32 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -637,60 +650,12 @@
<scope>provided</scope>
</dependency>
- <!--log4j -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
+ <optional>true</optional>
</dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </dependency>
- <!--log4j end-->
-
- <!-- logback -->
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- logback end -->
</dependencies>
<build>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index f072e6348..0f6a72f64 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -174,6 +174,13 @@
<optional>true</optional>
</dependency>
+ <!--log4j -->
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-shaded-slf4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 029b258f6..d31049166 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -83,12 +83,6 @@ object ConfigConst {
// spark
- val KEY_SPARK_USER_ARGS = "spark.user.args"
-
- val KEY_SPARK_CONF = "spark.conf"
-
- val KEY_SPARK_DEBUG_CONF = "spark.debug.conf"
-
val KEY_SPARK_MAIN_CLASS = "spark.main.class"
val KEY_SPARK_APP_NAME = "spark.app.name"
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index f0766e6c0..001d69c76 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -20,8 +20,6 @@ import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.function.Supplier
-import scala.util.Try
-
object ClassLoaderUtils extends Logger {
private[this] val originalClassLoader: ClassLoader =
Thread.currentThread().getContextClassLoader
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 1676d1bff..dc39a1865 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -23,7 +23,7 @@ import java.util.Scanner
import scala.collection.JavaConversions._
import scala.collection.mutable
-object FileUtils extends org.apache.commons.io.FileUtils {
+object FileUtils {
private[this] def bytesToHexString(src: Array[Byte]): String = {
val stringBuilder = new mutable.StringBuilder
@@ -151,3 +151,4 @@ object FileUtils extends org.apache.commons.io.FileUtils {
}
}
+
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index b6281961d..da04653b8 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -16,83 +16,160 @@
*/
package org.apache.streampark.common.util
-import org.slf4j.{Logger => SlfLogger, LoggerFactory}
-import org.slf4j.impl.StaticLoggerBinder
+import org.apache.streampark.shaded.ch.qos.logback.classic.LoggerContext
+import org.apache.streampark.shaded.ch.qos.logback.classic.joran.JoranConfigurator
+import org.apache.streampark.shaded.ch.qos.logback.classic.util.ContextSelectorStaticBinder
+import org.apache.streampark.shaded.ch.qos.logback.classic.util.{ContextInitializer => LogBackContextInitializer}
+import org.apache.streampark.shaded.ch.qos.logback.core.{CoreConstants, LogbackException}
+import org.apache.streampark.shaded.ch.qos.logback.core.status.StatusUtil
+import org.apache.streampark.shaded.ch.qos.logback.core.util.StatusPrinter
+
+import org.apache.streampark.shaded.org.slf4j.{ILoggerFactory, Logger => Slf4JLogger}
+import org.apache.streampark.shaded.org.slf4j.spi.LoggerFactoryBinder
+
+import java.io.{ByteArrayInputStream, File}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import scala.util.Try
+import scala.util.Success
+import scala.util.Failure
trait Logger {
- @transient private[this] var _logger: SlfLogger = _
+ @transient private[this] var _logger: Slf4JLogger = _
private[this] val prefix = "[StreamPark]"
protected def logName: String = this.getClass.getName.stripSuffix("$")
- protected def logger: SlfLogger = {
+ protected def logger: Slf4JLogger = {
if (_logger == null) {
- initializeLogIfNecessary(false)
- _logger = LoggerFactory.getLogger(logName)
+ initializeLogging()
+ val factory = LoggerFactory.getLoggerFactory()
+ _logger = factory.getLogger(logName)
}
_logger
}
def logInfo(msg: => String) {
- if (logger.isInfoEnabled) logger.info(s"$prefix $msg")
+ logger.info(s"$prefix $msg")
}
def logInfo(msg: => String, throwable: Throwable) {
- if (logger.isInfoEnabled) logger.info(s"$prefix $msg", throwable)
+ logger.info(s"$prefix $msg", throwable)
}
def logDebug(msg: => String) {
- if (logger.isDebugEnabled) logger.debug(s"$prefix $msg")
+ logger.debug(s"$prefix $msg")
}
def logDebug(msg: => String, throwable: Throwable) {
- if (logger.isDebugEnabled) logger.debug(s"$prefix $msg", throwable)
+ logger.debug(s"$prefix $msg", throwable)
}
def logTrace(msg: => String) {
- if (logger.isTraceEnabled) logger.trace(s"$prefix $msg")
+ logger.trace(s"$prefix $msg")
}
def logTrace(msg: => String, throwable: Throwable) {
- if (logger.isTraceEnabled) logger.trace(s"$prefix $msg", throwable)
+ logger.trace(s"$prefix $msg", throwable)
}
def logWarn(msg: => String) {
- if (logger.isWarnEnabled) logger.warn(s"$prefix $msg")
+ logger.warn(s"$prefix $msg")
}
def logWarn(msg: => String, throwable: Throwable) {
- if (logger.isWarnEnabled) logger.warn(s"$prefix $msg", throwable)
+ logger.warn(s"$prefix $msg", throwable)
}
def logError(msg: => String) {
- if (logger.isErrorEnabled) logger.error(s"$prefix $msg")
+ logger.error(s"$prefix $msg")
}
def logError(msg: => String, throwable: Throwable) {
- if (logger.isErrorEnabled) logger.error(s"$prefix $msg", throwable)
+ logger.error(s"$prefix $msg", throwable)
}
- protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
+ protected def initializeLogging(): Unit = {
if (!Logger.initialized) {
Logger.initLock.synchronized {
if (!Logger.initialized) {
- initializeLogging(isInterpreter)
+ Logger.initialized = true
+ logger
}
}
}
}
- private def initializeLogging(isInterpreter: Boolean): Unit = {
- StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
- Logger.initialized = true
- logger
- }
}
private object Logger {
@volatile private var initialized = false
val initLock = new Object()
}
+
+private[this] object LoggerFactory extends LoggerFactoryBinder {
+
+ private lazy val contextSelectorBinder: ContextSelectorStaticBinder = {
+ val defaultLoggerContext = new LoggerContext
+ Try(new ContextInitializer(defaultLoggerContext).autoConfig()) match {
+ case Success(s) => s
+ case Failure(e) =>
+ val msg = "Failed to auto configure default logger context"
+ // scalastyle:off println
+ System.err.println(msg)
+ // scalastyle:off println
+ System.err.println("Reported exception:")
+ e.printStackTrace()
+ }
+ if (!StatusUtil.contextHasStatusListener(defaultLoggerContext)) {
+ StatusPrinter.printInCaseOfErrorsOrWarnings(defaultLoggerContext)
+ }
+ val selectorBinder = new ContextSelectorStaticBinder()
+ selectorBinder.init(defaultLoggerContext, new Object())
+ selectorBinder
+ }
+
+ override def getLoggerFactory: ILoggerFactory = {
+ if (contextSelectorBinder.getContextSelector == null) {
+ throw new IllegalStateException("contextSelector cannot be null. See also " + CoreConstants.CODES_URL + "#null_CS")
+ }
+ contextSelectorBinder.getContextSelector.getLoggerContext
+ }
+
+ override def getLoggerFactoryClassStr: String = contextSelectorBinder.getClass.getName
+
+ private class ContextInitializer(loggerContext: LoggerContext) extends LogBackContextInitializer(loggerContext) {
+ override def configureByResource(url: URL): Unit = {
+ Utils.notNull(url, "URL argument cannot be null")
+ val path = url.getPath
+ if (path.endsWith("xml")) {
+ val configurator = new JoranConfigurator()
+ configurator.setContext(loggerContext)
+ val text = FileUtils.readString(new File(path))
+ .replaceAll(
+ "ch.qos.logback",
+ "org.apache.streampark.shaded.ch.qos.logback")
+ .replaceAll(
+ "org.slf4j",
+ "org.apache.streampark.shaded.org.slf4j")
+ .replaceAll(
+ "org.apache.log4j",
+ "org.apache.streampark.shaded.org.apache.log4j")
+ .replaceAll(
+ "org.apache.logging.log4j",
+ "org.apache.streampark.shaded.org.apache.logging.log4j")
+
+ val input = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8))
+ configurator.doConfigure(input)
+ } else throw {
+ new LogbackException("Unexpected filename extension of file [" + url.toString + "]. Should be .xml")
+ }
+ }
+
+ }
+
+}
+
+
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index 7e7e28324..71fdafb52 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -399,6 +399,39 @@
<!--Test dependencies end.-->
+ <!--log4j -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <!--log4j end-->
+
+ <!-- logback -->
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 57fa8a337..33d75d9a6 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -62,6 +62,7 @@
<version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 0a6208d0f..034c0ab1d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.{Configuration, DeploymentOptions,
DeploymentOptionsInternal, PipelineOptions}
+import org.apache.flink.configuration.{Configuration, DeploymentOptions,
PipelineOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
@@ -36,6 +36,18 @@ import org.apache.streampark.flink.client.bean._
*/
object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
+ override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
+ if (submitRequest.buildResult != null) {
+ val buildResult =
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
+ buildResult.podTemplatePaths.foreach(p => {
+ flinkConfig
+ .safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+ .safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+ .safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+ })
+ }
+ }
+
@throws[Exception]
override def doSubmit(submitRequest: SubmitRequest, flinkConfig:
Configuration): SubmitResponse = {
@@ -51,10 +63,7 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
// add flink pipeline.jars configuration
flinkConfig.safeSet(PipelineOptions.JARS,
Lists.newArrayList(buildResult.dockerInnerMainJarPath))
- // add flink conf configuration, mainly to set the log4j configuration
- if (!flinkConfig.contains(DeploymentOptionsInternal.CONF_DIR)) {
- flinkConfig.safeSet(DeploymentOptionsInternal.CONF_DIR,
s"${submitRequest.flinkVersion.flinkHome}/conf")
- }
+
// add flink container image tag to flink configuration
flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
buildResult.flinkImageTag)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index eef7d75e7..5426fc6ca 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -87,8 +87,9 @@ object YarnSessionClient extends YarnClientTrait {
.safeSet(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibs.asJava)
// flinkDistJar
.safeSet(YarnConfigOptions.FLINK_DIST_JAR,
deployRequest.hdfsWorkspace.flinkDistJar)
- //
+ // yarnDeployment Target
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+ // conf dir
.safeSet(DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf")
logInfo(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index e40600472..c6a8710c2 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -30,9 +30,8 @@ import
org.apache.flink.kubernetes.{KubernetesClusterClientFactory, KubernetesCl
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
-import org.apache.streampark.common.enums.{ExecutionMode,
FlinkK8sRestExposedType}
+import org.apache.streampark.common.enums.FlinkK8sRestExposedType
import org.apache.streampark.flink.kubernetes.IngressController
-import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import org.apache.streampark.flink.client.bean._
/**
@@ -49,16 +48,9 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType))
- if (submitRequest.buildResult != null) {
- if (submitRequest.executionMode ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
- val buildResult =
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
- buildResult.podTemplatePaths.foreach(p => {
- flinkConfig
- .safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
- .safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
- .safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
- })
- }
+ // add flink conf configuration, mainly to set the log4j configuration
+ if (!flinkConfig.contains(DeploymentOptionsInternal.CONF_DIR)) {
+ flinkConfig.safeSet(DeploymentOptionsInternal.CONF_DIR,
s"${submitRequest.flinkVersion.flinkHome}/conf")
}
if (flinkConfig.get(KubernetesConfigOptions.NAMESPACE).isEmpty) {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
index 826179923..5cb843558 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
@@ -99,7 +99,6 @@
<artifactId>hadoop-yarn-api</artifactId>
<optional>true</optional>
</dependency>
-
</dependencies>
</project>
diff --git a/streampark-common/pom.xml b/streampark-shade/pom.xml
similarity index 78%
copy from streampark-common/pom.xml
copy to streampark-shade/pom.xml
index f072e6348..8a64fd883 100644
--- a/streampark-common/pom.xml
+++ b/streampark-shade/pom.xml
@@ -174,6 +174,46 @@
<optional>true</optional>
</dependency>
+ <!--log4j -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <!--log4j end-->
+
+ <!-- logback -->
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <!-- logback end -->
+
</dependencies>
<build>
@@ -203,22 +243,35 @@
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
<artifactSet>
<includes>
+ <!--jackson-->
<include>com.fasterxml.jackson.*:*</include>
<include>com.beachape:*</include>
+ <!-- logback -->
+ <include>org.slf4j:*:*</include>
+
<include>org.apache.logging.log4j:*:*</include>
+ <include>ch.qos.logback:*:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
-
<pattern>com.fasterxml.jackson.code</pattern>
-
<shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson.code</shadedPattern>
+ <pattern>com.fasterxml.jackson</pattern>
+
<shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.slf4j</pattern>
+
<shadedPattern>${streampark.shaded.package}.org.slf4j</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>ch.qos.logback</pattern>
+
<shadedPattern>${streampark.shaded.package}.ch.qos.logback</shadedPattern>
</relocation>
<relocation>
-
<pattern>com.fasterxml.jackson.module</pattern>
-
<shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson.module</shadedPattern>
+ <pattern>org.apache.logging.log4j</pattern>
+
<shadedPattern>${streampark.shaded.package}.org.apache.logging.log4j</shadedPattern>
</relocation>
<relocation>
-
<pattern>com.fasterxml.jackson.databind</pattern>
-
<shadedPattern>${streampark.shaded.package}.com.fasterxml.jackson.databind</shadedPattern>
+ <pattern>org.apache.log4j</pattern>
+
<shadedPattern>${streampark.shaded.package}.org.apache.log4j</shadedPattern>
</relocation>
</relocations>
<filters>
diff --git a/streampark-shaded/pom.xml b/streampark-shaded/pom.xml
new file mode 100644
index 000000000..bca520acc
--- /dev/null
+++ b/streampark-shaded/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-shaded</artifactId>
+ <name>StreamPark : Shaded Parent</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>streampark-shaded-slf4j</module>
+ <module>streampark-shaded-jackson</module>
+ </modules>
+
+</project>
diff --git a/streampark-common/pom.xml
b/streampark-shaded/streampark-shaded-jackson/pom.xml
similarity index 50%
copy from streampark-common/pom.xml
copy to streampark-shaded/streampark-shaded-jackson/pom.xml
index f072e6348..18ff56cdd 100644
--- a/streampark-common/pom.xml
+++ b/streampark-shaded/streampark-shaded-jackson/pom.xml
@@ -20,74 +20,14 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampark</groupId>
- <artifactId>streampark</artifactId>
+ <artifactId>streampark-shaded</artifactId>
<version>2.1.0-SNAPSHOT</version>
</parent>
- <artifactId>streampark-common_${scala.binary.version}</artifactId>
- <name>StreamPark : Common</name>
+ <artifactId>streampark-shaded-jackson</artifactId>
+ <name>StreamPark : Shaded Jackson </name>
<dependencies>
-
- <!-- test -->
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>${jupiter.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-params</artifactId>
- <version>${jupiter.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- enumeratum -->
- <dependency>
- <groupId>com.beachape</groupId>
- <artifactId>enumeratum_${scala.binary.version}</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- </dependency>
-
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -99,96 +39,15 @@
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<optional>true</optional>
</dependency>
-
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
-
- <!--hbase-->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>junit</artifactId>
- <groupId>junit</groupId>
- </exclusion>
- </exclusions>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>3.12.2</version>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>2.6.0</version>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ivy</groupId>
- <artifactId>ivy</artifactId>
- <version>2.4.0</version>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-resolver</artifactId>
- <version>4.1.65.Final</version>
- <optional>true</optional>
- </dependency>
-
</dependencies>
<build>
<plugins>
-
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- </plugin>
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -204,7 +63,6 @@
<artifactSet>
<includes>
<include>com.fasterxml.jackson.*:*</include>
- <include>com.beachape:*</include>
</includes>
</artifactSet>
<relocations>
diff --git a/streampark-shaded/streampark-shaded-slf4j/pom.xml
b/streampark-shaded/streampark-shaded-slf4j/pom.xml
new file mode 100644
index 000000000..bd5d0e4cf
--- /dev/null
+++ b/streampark-shaded/streampark-shaded-slf4j/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-shaded</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-shaded-slf4j</artifactId>
+ <name>StreamPark : Shaded Slf4j </name>
+
+ <dependencies>
+
+ <!--log4j -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <!--log4j end-->
+
+ <!-- logback -->
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <!-- logback end -->
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <artifactSet>
+ <includes>
+ <include>org.slf4j:*:*</include>
+ <include>org.apache.logging.log4j:*:*</include>
+ <include>ch.qos.logback:*:*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.slf4j</pattern>
+ <shadedPattern>${streampark.shaded.package}.org.slf4j</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>ch.qos.logback</pattern>
+ <shadedPattern>${streampark.shaded.package}.ch.qos.logback</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.logging.log4j</pattern>
+ <shadedPattern>${streampark.shaded.package}.org.apache.logging.log4j</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.log4j</pattern>
+ <shadedPattern>${streampark.shaded.package}.org.apache.log4j</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
diff --git a/streampark-spark/pom.xml b/streampark-spark/pom.xml
index 26994d236..5519ae06c 100644
--- a/streampark-spark/pom.xml
+++ b/streampark-spark/pom.xml
@@ -39,6 +39,7 @@
<artifactId>streampark-common_2.12</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
@@ -51,28 +52,33 @@
</exclusions>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/application.properties b/streampark-spark/streampark-spark-core/src/main/resources/application.properties
deleted file mode 100644
index 0b35d3747..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/application.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-spark.monitor.congestion.send.api=https://oapi.dingtalk.com/robot/send?access_token=aaaaaaaaaaaa
-spark.monitor.kafka.metadata.broker.list=kafka1:9092,kafka2:9092,kafka3:9092
-spark.monitor.kafka.topic=streampark-spark-batchinfos
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/spark-firemonitor b/streampark-spark/streampark-spark-core/src/main/resources/spark-firemonitor
deleted file mode 100644
index 2d6e41d61..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/spark-firemonitor
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [ -z "${SPARK_HOME}" ]; then
- # shellcheck disable=SC1090
- source "$(dirname "$0")"/find-spark-home
-fi
-
-# disable randomized hash for string in Python 3.3+
-export PYTHONHASHSEED=0
-
-exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.YarnAppMonitorCli "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/spark-rpcdemo b/streampark-spark/streampark-spark-core/src/main/resources/spark-rpcdemo
deleted file mode 100644
index 120a425f8..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/spark-rpcdemo
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [ -z "${SPARK_HOME}" ]; then
- # shellcheck disable=SC1090
- source "$(dirname "$0")"/find-spark-home
-fi
-
-# disable randomized hash for string in Python 3.3+
-export PYTHONHASHSEED=0
-
-exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.rpcDemoCli "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/start-firemonitor-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/start-firemonitor-server.sh
deleted file mode 100644
index bdd97498b..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/start-firemonitor-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
- # shellcheck disable=SC2155
- export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.YarnAppMonitorSer 1 "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/start-rpcdemo-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/start-rpcdemo-server.sh
deleted file mode 100644
index 5fc46fc48..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/start-rpcdemo-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
- # shellcheck disable=SC2155
- export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.rpcDemo 1 "$@"
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/stop-firemonitor-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/stop-firemonitor-server.sh
deleted file mode 100644
index 362cb9d02..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/stop-firemonitor-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
- # shellcheck disable=SC2155
- export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.YarnAppMonitorSer 1
diff --git a/streampark-spark/streampark-spark-core/src/main/resources/stop-rpcdemo-server.sh b/streampark-spark/streampark-spark-core/src/main/resources/stop-rpcdemo-server.sh
deleted file mode 100644
index d7d18346f..000000000
--- a/streampark-spark/streampark-spark-core/src/main/resources/stop-rpcdemo-server.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts the history server on the machine this script is executed on.
-#
-# Usage: start-history-server.sh
-#
-# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
-#
-
-if [ -z "${SPARK_HOME}" ]; then
- # shellcheck disable=SC2155
- export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/sbin/spark-config.sh"
-# shellcheck disable=SC1090
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.rpcDemo 1
diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 58800dec4..f1efef788 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -19,12 +19,10 @@ package org.apache.streampark.spark.core
import scala.annotation.meta.getter
import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
-
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.util.{PropertiesUtils, SystemPropertyUtils}
+import org.apache.streampark.common.util.{Logger, PropertiesUtils}
/**
* <b><code>Spark</code></b>
@@ -32,14 +30,15 @@ import org.apache.streampark.common.util.{PropertiesUtils, SystemPropertyUtils}
* Spark Basic Traits
* <p/>
*/
-trait Spark {
+trait Spark extends Logger {
- @(transient @getter)
+ @(transient@getter)
final protected lazy val sparkConf: SparkConf = new SparkConf()
- @(transient @getter)
- final protected val sparkListeners = new ArrayBuffer[String]()
- @(transient @getter)
+ @(transient@getter)
+ private[this] final val sparkListeners = new ArrayBuffer[String]()
+
+ @(transient@getter)
final protected var sparkSession: SparkSession = _
// Directory of checkpoint
@@ -52,12 +51,35 @@ trait Spark {
* Entrance
*/
def main(args: Array[String]): Unit = {
+
init(args)
+
config(sparkConf)
- sparkSession = sparkConf.get("spark.enable.hive.support", "false").toLowerCase match {
- case "true" => SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
- case "false" => SparkSession.builder().config(sparkConf).getOrCreate()
+
+ // 1) system.properties
+ val sysProps = sparkConf.getAllWithPrefix("spark.config.system.properties")
+ if (sysProps != null) {
+ sysProps.foreach(x => {
+ System.getProperties.setProperty(x._1.drop(1), x._2)
+ })
+ }
+
+ val builder = SparkSession.builder().config(sparkConf)
+ val enableHive = sparkConf.getBoolean("spark.config.enable.hive.support", defaultValue = false)
+ if (enableHive) {
+ builder.enableHiveSupport()
+ }
+
+ sparkSession = builder.getOrCreate()
+
+ // 2) hive
+ val sparkSql = sparkConf.getAllWithPrefix("spark.config.spark.sql")
+ if (sparkSql != null) {
+ sparkSql.foreach(x => {
+ sparkSession.sparkContext.getConf.set(x._1.drop(1), x._2)
+ })
}
+
ready()
handle()
start()
@@ -67,39 +89,37 @@ trait Spark {
/**
* Initialize sparkConf according to user parameters
*/
- final def init(args: Array[String]): Unit = {
+ private final def init(args: Array[String]): Unit = {
+
+ logDebug("init application config ....")
var argv = args.toList
+ var conf: String = null
+
while (argv.nonEmpty) {
argv match {
- case ("--checkpoint") :: value :: tail =>
+ case "--conf" :: value :: tail =>
+ conf = value
+ argv = tail
+ case "--checkpoint" :: value :: tail =>
checkpoint = value
argv = tail
- case ("--createOnError") :: value :: tail =>
+ case "--createOnError" :: value :: tail =>
createOnError = value.toBoolean
argv = tail
case Nil =>
case tail =>
- // scalastyle:off println
- System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+ logError(s"Unrecognized options: ${tail.mkString(" ")}")
printUsageAndExit()
}
}
- sparkConf.set(KEY_SPARK_USER_ARGS, args.mkString("|"))
-
- // The default configuration file passed in through vm -Dspark.debug.conf is used as local debugging mode
- val (isDebug, confPath) = SystemPropertyUtils.get(KEY_SPARK_CONF, "") match {
- case "" => (true, sparkConf.get(KEY_SPARK_DEBUG_CONF))
- case path => (false, path)
- case _ => throw new IllegalArgumentException("[StreamPark] Usage:properties-file error")
- }
-
- val localConf = confPath.split("\\.").last match {
- case "properties" => PropertiesUtils.fromPropertiesFile(confPath)
- case "yaml" | "yml" => PropertiesUtils.fromYamlFile(confPath)
- case _ => throw new IllegalArgumentException("[StreamPark] Usage:properties-file format error,must be properties or yml")
+ val localConf = conf.split("\\.").last match {
+ case "conf" => PropertiesUtils.fromHoconFile(conf)
+ case "properties" => PropertiesUtils.fromPropertiesFile(conf)
+ case "yaml" | "yml" => PropertiesUtils.fromYamlFile(conf)
+ case _ => throw new IllegalArgumentException("[StreamPark] Usage: config file error,must be [properties|yaml|conf]")
}
localConf.foreach(x => sparkConf.set(x._1, x._2))
@@ -107,19 +127,19 @@ trait Spark {
val (appMain, appName) = sparkConf.get(KEY_SPARK_MAIN_CLASS, null) match {
case null | "" => (null, null)
case other => sparkConf.get(KEY_SPARK_APP_NAME, null) match {
- case null | "" => (other, other)
- case name => (other, name)
- }
+ case null | "" => (other, other)
+ case name => (other, name)
+ }
}
if (appMain == null) {
- // scalastyle:off println
- System.err.println(s"[StreamPark] $KEY_SPARK_MAIN_CLASS must not be empty!")
+ logError(s"[StreamPark] $KEY_SPARK_MAIN_CLASS must not be empty!")
System.exit(1)
}
// debug mode
- if (isDebug) {
+ val localMode = sparkConf.get("spark.master", null) == "local"
+ if (localMode) {
sparkConf.setAppName(s"[LocalDebug] $appName").setMaster("local[*]")
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
}
@@ -127,23 +147,25 @@ trait Spark {
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val extraListeners = sparkListeners.mkString(",") + "," + sparkConf.get("spark.extraListeners", "")
- if (extraListeners != "") sparkConf.set("spark.extraListeners", extraListeners)
+ if (extraListeners != "") {
+ sparkConf.set("spark.extraListeners", extraListeners)
+ }
}
/**
* The purpose of the config phase is to allow the developer to set more parameters (other than the agreed
* configuration file) by means of hooks.
* Such as,
- * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- * conf.registerKryoClasses(Array(classOf[User], classOf[Order],...))
+ * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ * conf.registerKryoClasses(Array(classOf[User], classOf[Order],...))
*/
- def config(sparkConf: SparkConf): Unit
+ def config(sparkConf: SparkConf): Unit = {}
/**
* The ready phase is an entry point for the developer to do other actions after the parameters have been set,
* and is done after initialization and before the program starts.
*/
- def ready(): Unit
+ def ready(): Unit = {}
/**
* The handle phase is the entry point to the code written by the developer and is the most important phase.
@@ -153,7 +175,7 @@ trait Spark {
/**
* The start phase starts the task, which is executed automatically by the framework.
*/
- def start(): Unit
+ def start(): Unit = {}
/**
* The destroy phase, is the last phase before jvm exits after the program has finished running,
@@ -164,9 +186,8 @@ trait Spark {
/**
* printUsageAndExit
*/
- def printUsageAndExit(): Unit = {
- // scalastyle:off println
- System.err.println(
+ private[this] def printUsageAndExit(): Unit = {
+ logError(
"""
|"Usage: Streaming [options]
|
diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala
index 8a6e4ea8b..b6a90b705 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/SparkBatch.scala
@@ -19,8 +19,7 @@ package org.apache.streampark.spark.core
import scala.annotation.meta.getter
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkContext
/**
* <b><code>SparkBatch</code></b>
@@ -37,4 +36,5 @@ trait SparkBatch extends Spark {
override def destroy(): Unit = {
context.stop()
}
+
}