This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 41b6a551a [Improve] Move FlinkStreamingInitializer from every version to the base module (#1760)
41b6a551a is described below
commit 41b6a551a251835d0ecd3d7e8561475160c6288d
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Oct 7 23:54:08 2022 +0800
[Improve] Move FlinkStreamingInitializer from every version to the base module (#1760)
---
.../streampark/common/conf/ConfigConst.scala | 3 -
.../streampark-flink-shims-base/pom.xml | 6 +
.../flink/core/FlinkStreamingInitializer.scala | 17 +-
.../streampark-flink-shims_flink-1.12/pom.xml | 6 -
.../flink/core/FlinkStreamingInitializer.scala | 190 ---------------------
.../streampark-flink-shims_flink-1.13/pom.xml | 6 -
.../flink/core/FlinkStreamingInitializer.scala | 184 --------------------
.../streampark-flink-shims_flink-1.14/pom.xml | 6 -
.../streampark-flink-shims_flink-1.15/pom.xml | 6 -
.../flink/core/FlinkStreamingInitializer.scala | 183 --------------------
.../flink/core/FlinkTableInitializer.scala | 6 +-
11 files changed, 12 insertions(+), 601 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 89bf81cd0..3a500b258 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -116,9 +116,6 @@ object ConfigConst {
val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
- // ---watermark---
- val KEY_FLINK_WATERMARK_TIME_CHARACTERISTIC = "flink.watermark.time.characteristic"
-
// ---table---
val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
index 63ef6ce1b..83ad311db 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
@@ -97,6 +97,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
</dependencies>
</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
similarity index 88%
rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
rename to streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8991f1d77..abb5301ca 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -17,29 +17,20 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, CheckpointStorage, RestartStrategy, StateBackend => XStateBackend}
import org.apache.streampark.common.util._
+
import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, EmbeddedRocksDBStateBackend}
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
-import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, JobManagerCheckpointStorage}
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
+import org.apache.flink.configuration.CoreOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableConfig
import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
import collection.JavaConversions._
import collection.Map
-import util.{Failure, Success, Try}
+import util.Try
private[flink] object FlinkStreamingInitializer {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
index faa76f8b3..4057e5c76 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
@@ -84,12 +84,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <optional>true</optional>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
deleted file mode 100644
index 305018f68..000000000
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, RestartStrategy, StateBackend => XStateBackend}
-import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, RocksDBStateBackend}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
-import org.apache.flink.runtime.state.memory.MemoryStateBackend
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
-import org.apache.flink.table.api.TableConfig
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
-import collection.JavaConversions._
-import collection.Map
-import util.Try
-
-private[flink] object FlinkStreamingInitializer {
-
- private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
- def initialize(args: Array[String],
- config: (StreamExecutionEnvironment, ParameterTool) => Unit):
- (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
- }
-
- def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
- }
-}
-
-
-private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: ApiType) extends Logger {
-
- var streamEnvConfFunc: (StreamExecutionEnvironment, ParameterTool) => Unit = _
-
- var tableConfFunc: (TableConfig, ParameterTool) => Unit = _
-
- var javaStreamEnvConfFunc: StreamEnvConfigFunction = _
-
- var javaTableEnvConfFunc: TableEnvConfigFunction = _
-
- lazy val parameter: ParameterTool = initParameter()
-
- private[this] var localStreamEnv: StreamExecutionEnvironment = _
-
- private[this] lazy val defaultFlinkConf: Map[String, String] = {
- parameter.get(KEY_FLINK_CONF(), null) match {
- case null =>
- // start with script
- val flinkHome = System.getenv("FLINK_HOME")
- require(flinkHome != null, "FLINK_HOME not found.")
- logInfo(s"flinkHome: $flinkHome")
- val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
- PropertiesUtils.loadFlinkConfYaml(yaml)
- case flinkConf =>
- // passed in from streampark console backend
- PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
- }
- }
-
- def readFlinkConf(config: String): Map[String, String] = {
- val extension = config.split("\\.").last.toLowerCase
-
- val map = config match {
- case x if x.startsWith("yaml://") =>
- PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("prop://") =>
- PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("hdfs://") =>
- /**
- * If the config file is hdfs mode, needs copy the hdfs related configuration file to `resources` dir
- */
- val text = HdfsUtils.read(x)
- extension match {
- case "properties" => PropertiesUtils.fromPropertiesText(text)
- case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
- case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
- }
- case _ =>
- val configFile = new File(config)
- require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!")
- extension match {
- case "properties" =>
PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
- case "yml" | "yaml" =>
PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
- case _ => throw new IllegalArgumentException("[StreamPark]
Usage:flink.conf file error,must be properties or yml")
- }
- }
-
- map
- .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
- .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
- }
-
- def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val config = argsMap.get(KEY_APP_CONF(), null) match {
- // scalastyle:off throwerror
- case null | "" => throw new ExceptionInInitializerError("[StreamPark]
Usage:can't fond config,please set \"--conf $path \" in main arguments")
- // scalastyle:on throwerror
- case file => file
- }
- val configArgs = readFlinkConf(config)
- // config priority: explicitly specified priority > project profiles > system profiles
- ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
- }
-
- def streamEnvironment: StreamExecutionEnvironment = {
- if (localStreamEnv == null) {
- this.synchronized {
- if (localStreamEnv == null) {
- initEnvironment()
- }
- }
- }
- localStreamEnv
- }
-
- def initEnvironment(): Unit = {
- localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
- Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
- Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
- } match {
- case p if p > 0 => localStreamEnv.setParallelism(p)
- case _ => throw new IllegalArgumentException("[StreamPark] parallelism must be > 0. ")
- }
-
- // Compatible with 1.12 and previous versions (TimeCharacteristic deprecated in version 1.12)
- if (classOf[TimeCharacteristic].getDeclaredAnnotation(classOf[Deprecated]) == null) {
- val timeCharacteristic = Try(TimeCharacteristic.valueOf(parameter.get(KEY_FLINK_WATERMARK_TIME_CHARACTERISTIC))).getOrElse(TimeCharacteristic.ProcessingTime)
- localStreamEnv.setStreamTimeCharacteristic(timeCharacteristic)
- }
-
- val executionMode = Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
- localStreamEnv.setRuntimeMode(executionMode)
-
- apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
- case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, parameter)
- case _ =>
- }
- localStreamEnv.getConfig.setGlobalJobParameters(parameter)
- }
-
-}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
index ae7f4def8..746af6472 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
@@ -83,12 +83,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <optional>true</optional>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
deleted file mode 100644
index 14aadd781..000000000
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, CheckpointStorage, RestartStrategy, StateBackend => XStateBackend}
-import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, EmbeddedRocksDBStateBackend}
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
-import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, JobManagerCheckpointStorage}
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
-import collection.JavaConversions._
-import collection.Map
-import util.{Failure, Success, Try}
-
-private[flink] object FlinkStreamingInitializer {
-
- private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
- def initialize(args: Array[String],
- config: (StreamExecutionEnvironment, ParameterTool) => Unit):
- (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
- }
-
- def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
- }
-}
-
-
-private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: ApiType) extends Logger {
-
- var streamEnvConfFunc: (StreamExecutionEnvironment, ParameterTool) => Unit = _
-
- var tableConfFunc: (TableConfig, ParameterTool) => Unit = _
-
- var javaStreamEnvConfFunc: StreamEnvConfigFunction = _
-
- var javaTableEnvConfFunc: TableEnvConfigFunction = _
-
- lazy val parameter: ParameterTool = initParameter()
-
- private[this] var localStreamEnv: StreamExecutionEnvironment = _
-
- private[this] lazy val defaultFlinkConf: Map[String, String] = {
- parameter.get(KEY_FLINK_CONF(), null) match {
- case null =>
- // start with script
- val flinkHome = System.getenv("FLINK_HOME")
- require(flinkHome != null, "FLINK_HOME not found.")
- logInfo(s"flinkHome: $flinkHome")
- val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
- PropertiesUtils.loadFlinkConfYaml(yaml)
- case flinkConf =>
- // passed in from streampark console backend
- PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
- }
- }
-
- def readFlinkConf(config: String): Map[String, String] = {
- val extension = config.split("\\.").last.toLowerCase
-
- val map = config match {
- case x if x.startsWith("yaml://") =>
- PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("prop://") =>
- PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("hdfs://") =>
- /**
- * If the config file is hdfs mode, needs copy the hdfs related configuration file to `resources` dir
- */
- val text = HdfsUtils.read(x)
- extension match {
- case "properties" => PropertiesUtils.fromPropertiesText(text)
- case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
- case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
- }
- case _ =>
- val configFile = new File(config)
- require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!")
- extension match {
- case "properties" =>
PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
- case "yml" | "yaml" =>
PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
- case _ => throw new IllegalArgumentException("[StreamPark]
Usage:flink.conf file error,must be properties or yml")
- }
- }
-
- map
- .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
- .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
- }
-
- def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val config = argsMap.get(KEY_APP_CONF(), null) match {
- // scalastyle:off throwerror
- case null | "" => throw new ExceptionInInitializerError("[StreamPark]
Usage:can't fond config,please set \"--conf $path \" in main arguments")
- // scalastyle:on throwerror
- case file => file
- }
- val configArgs = readFlinkConf(config)
- // config priority: explicitly specified priority > project profiles > system profiles
- ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
- }
-
- def streamEnvironment: StreamExecutionEnvironment = {
- if (localStreamEnv == null) {
- this.synchronized {
- if (localStreamEnv == null) {
- initEnvironment()
- }
- }
- }
- localStreamEnv
- }
-
- def initEnvironment(): Unit = {
- localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
- Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
- Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
- } match {
- case p if p > 0 => localStreamEnv.setParallelism(p)
- case _ => throw new IllegalArgumentException("[StreamPark] parallelism must be > 0. ")
- }
-
- val executionMode = Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
- localStreamEnv.setRuntimeMode(executionMode)
-
- apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
- case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, parameter)
- case _ =>
- }
- localStreamEnv.getConfig.setGlobalJobParameters(parameter)
- }
-
-}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
index 5cb491605..a2caf7c73 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
@@ -83,12 +83,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <optional>true</optional>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
index f82087c97..263806d8a 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
@@ -99,12 +99,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <optional>true</optional>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
deleted file mode 100644
index db8bed611..000000000
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.core
-
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, CheckpointStorage, RestartStrategy, StateBackend => XStateBackend}
-import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, EmbeddedRocksDBStateBackend}
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
-import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, JobManagerCheckpointStorage}
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
-import scala.collection.JavaConversions._
-import scala.collection.Map
-import scala.util.{Failure, Success, Try}
-
-private[flink] object FlinkStreamingInitializer {
-
- private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
- def initialize(args: Array[String],
- config: (StreamExecutionEnvironment, ParameterTool) => Unit):
- (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
- }
-
- def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
- }
-}
-
-private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: ApiType) extends Logger {
-
- var streamEnvConfFunc: (StreamExecutionEnvironment, ParameterTool) => Unit = _
-
- var tableConfFunc: (TableConfig, ParameterTool) => Unit = _
-
- var javaStreamEnvConfFunc: StreamEnvConfigFunction = _
-
- var javaTableEnvConfFunc: TableEnvConfigFunction = _
-
- lazy val parameter: ParameterTool = initParameter()
-
- private[this] var localStreamEnv: StreamExecutionEnvironment = _
-
- private[this] lazy val defaultFlinkConf: Map[String, String] = {
- parameter.get(KEY_FLINK_CONF(), null) match {
- case null =>
- val flinkHome = System.getenv("FLINK_HOME")
- require(flinkHome != null, "FLINK_HOME not found.")
- logInfo(s"flinkHome: $flinkHome")
- val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
- PropertiesUtils.loadFlinkConfYaml(yaml)
- case flinkConf =>
- PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
- }
- }
-
- def readFlinkConf(config: String): Map[String, String] = {
- val extension = config.split("\\.").last.toLowerCase
-
- val map = config match {
- case x if x.startsWith("yaml://") =>
- PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("prop://") =>
- PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("hdfs://") =>
- /**
- * If the config file is hdfs mode, needs copy the hdfs related configuration file to `resources` dir
- */
- val text = HdfsUtils.read(x)
- extension match {
- case "properties" => PropertiesUtils.fromPropertiesText(text)
- case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
- case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
- }
- case _ =>
- val configFile = new File(config)
- require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!")
- extension match {
- case "properties" =>
PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
- case "yml" | "yaml" =>
PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
- case _ => throw new IllegalArgumentException("[StreamPark]
Usage:flink.conf file error,must be properties or yml")
- }
- }
-
- map
- .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
- .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
- }
-
- def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val config = argsMap.get(KEY_APP_CONF(), null) match {
- // scalastyle:off throwerror
- case null | "" => throw new ExceptionInInitializerError("[StreamPark]
Usage:can't fond config,please set \"--conf $path \" in main arguments")
- // scalastyle:on throwerror
- case file => file
- }
- val configArgs = readFlinkConf(config)
- // config priority: explicitly specified priority > project profiles > system profiles
- ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs))
- }
-
- def streamEnvironment: StreamExecutionEnvironment = {
- if (localStreamEnv == null) {
- this.synchronized {
- if (localStreamEnv == null) {
- initEnvironment()
- }
- }
- }
- localStreamEnv
- }
-
- def initEnvironment(): Unit = {
- localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
- Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
- Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
- } match {
- case p if p > 0 => localStreamEnv.setParallelism(p)
- case _ => throw new IllegalArgumentException("[StreamPark] parallelism must be > 0. ")
- }
-
- val executionMode = Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
- localStreamEnv.setRuntimeMode(executionMode)
-
- apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
- case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, parameter)
- case _ =>
- }
- localStreamEnv.getConfig.setGlobalJobParameters(parameter)
- }
-
-}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index eb78a17b7..6a8207f5d 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,15 +17,13 @@
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_APP_NAME, KEY_EXECUTION_RUNTIME_MODE, KEY_FLINK_APP_NAME, KEY_FLINK_PARALLELISM, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_APP_NAME, KEY_FLINK_APP_NAME, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
import org.apache.streampark.common.enums.{ApiType, TableMode}
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
-import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.{CoreOptions, PipelineOptions}
+import org.apache.flink.configuration.PipelineOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}