This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41b6a551a [Improve] Move FlinkStreamingInitializer from every version 
to the base module (#1760)
41b6a551a is described below

commit 41b6a551a251835d0ecd3d7e8561475160c6288d
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Oct 7 23:54:08 2022 +0800

    [Improve] Move FlinkStreamingInitializer from every version to the base 
module (#1760)
---
 .../streampark/common/conf/ConfigConst.scala       |   3 -
 .../streampark-flink-shims-base/pom.xml            |   6 +
 .../flink/core/FlinkStreamingInitializer.scala     |  17 +-
 .../streampark-flink-shims_flink-1.12/pom.xml      |   6 -
 .../flink/core/FlinkStreamingInitializer.scala     | 190 ---------------------
 .../streampark-flink-shims_flink-1.13/pom.xml      |   6 -
 .../flink/core/FlinkStreamingInitializer.scala     | 184 --------------------
 .../streampark-flink-shims_flink-1.14/pom.xml      |   6 -
 .../streampark-flink-shims_flink-1.15/pom.xml      |   6 -
 .../flink/core/FlinkStreamingInitializer.scala     | 183 --------------------
 .../flink/core/FlinkTableInitializer.scala         |   6 +-
 11 files changed, 12 insertions(+), 601 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 89bf81cd0..3a500b258 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -116,9 +116,6 @@ object ConfigConst {
 
   val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
 
-  // ---watermark---
-  val KEY_FLINK_WATERMARK_TIME_CHARACTERISTIC = 
"flink.watermark.time.characteristic"
-
   // ---table---
   val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
index 63ef6ce1b..83ad311db 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/pom.xml
@@ -97,6 +97,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
similarity index 88%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8991f1d77..abb5301ca 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -17,29 +17,20 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, CheckpointStorage, 
RestartStrategy, StateBackend => XStateBackend}
 import org.apache.streampark.common.util._
+
 import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import 
org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, 
EmbeddedRocksDBStateBackend}
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
-import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, 
JobManagerCheckpointStorage}
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
+import org.apache.flink.configuration.CoreOptions
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableConfig
 
 import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
 import collection.JavaConversions._
 import collection.Map
-import util.{Failure, Success, Try}
+import util.Try
 
 private[flink] object FlinkStreamingInitializer {
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
index faa76f8b3..4057e5c76 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/pom.xml
@@ -84,12 +84,6 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <optional>true</optional>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
deleted file mode 100644
index 305018f68..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, RestartStrategy, 
StateBackend => XStateBackend}
-import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import 
org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, 
RocksDBStateBackend}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
-import org.apache.flink.runtime.state.memory.MemoryStateBackend
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
-import org.apache.flink.table.api.TableConfig
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
-import collection.JavaConversions._
-import collection.Map
-import util.Try
-
-private[flink] object FlinkStreamingInitializer {
-
-  private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
-  def initialize(args: Array[String],
-                 config: (StreamExecutionEnvironment, ParameterTool) => Unit):
-  (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
-  }
-
-  def initialize(args: StreamEnvConfig): (ParameterTool, 
StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args.args, 
ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
-  }
-}
-
-
-private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: 
ApiType) extends Logger {
-
-  var streamEnvConfFunc: (StreamExecutionEnvironment, ParameterTool) => Unit = 
_
-
-  var tableConfFunc: (TableConfig, ParameterTool) => Unit = _
-
-  var javaStreamEnvConfFunc: StreamEnvConfigFunction = _
-
-  var javaTableEnvConfFunc: TableEnvConfigFunction = _
-
-  lazy val parameter: ParameterTool = initParameter()
-
-  private[this] var localStreamEnv: StreamExecutionEnvironment = _
-
-  private[this] lazy val defaultFlinkConf: Map[String, String] = {
-    parameter.get(KEY_FLINK_CONF(), null) match {
-      case null =>
-        // start with script
-        val flinkHome = System.getenv("FLINK_HOME")
-        require(flinkHome != null, "FLINK_HOME not found.")
-        logInfo(s"flinkHome: $flinkHome")
-        val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
-        PropertiesUtils.loadFlinkConfYaml(yaml)
-      case flinkConf =>
-        // passed in from streampark console backend
-        PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
-    }
-  }
-
-  def readFlinkConf(config: String): Map[String, String] = {
-    val extension = config.split("\\.").last.toLowerCase
-
-    val map = config match {
-      case x if x.startsWith("yaml://") =>
-        PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("prop://") =>
-        
PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("hdfs://") =>
-        /**
-         * If the config file is hdfs mode, needs copy the hdfs related 
configuration file to `resources` dir
-         */
-        val text = HdfsUtils.read(x)
-        extension match {
-          case "properties" => PropertiesUtils.fromPropertiesText(text)
-          case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
-          case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
-        }
-      case _ =>
-        val configFile = new File(config)
-        require(configFile.exists(), s"[StreamPark] Usage:flink.conf file 
$configFile is not found!!!")
-        extension match {
-          case "properties" => 
PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
-          case "yml" | "yaml" => 
PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
-          case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
-        }
-    }
-
-    map
-      .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
-      .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
-  }
-
-  def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val config = argsMap.get(KEY_APP_CONF(), null) match {
-      // scalastyle:off throwerror
-      case null | "" => throw new ExceptionInInitializerError("[StreamPark] 
Usage:can't fond config,please set \"--conf $path \" in main arguments")
-      // scalastyle:on throwerror
-      case file => file
-    }
-    val configArgs = readFlinkConf(config)
-    // config priority: explicitly specified priority > project profiles > 
system profiles
-    
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
-  }
-
-  def streamEnvironment: StreamExecutionEnvironment = {
-    if (localStreamEnv == null) {
-      this.synchronized {
-        if (localStreamEnv == null) {
-          initEnvironment()
-        }
-      }
-    }
-    localStreamEnv
-  }
-
-  def initEnvironment(): Unit = {
-    localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
-    Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
-      
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
-    } match {
-      case p if p > 0 => localStreamEnv.setParallelism(p)
-      case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
-    }
-
-    // Compatible with 1.12 and previous versions (TimeCharacteristic 
deprecated in version 1.12)
-    if (classOf[TimeCharacteristic].getDeclaredAnnotation(classOf[Deprecated]) 
== null) {
-      val timeCharacteristic = 
Try(TimeCharacteristic.valueOf(parameter.get(KEY_FLINK_WATERMARK_TIME_CHARACTERISTIC))).getOrElse(TimeCharacteristic.ProcessingTime)
-      localStreamEnv.setStreamTimeCharacteristic(timeCharacteristic)
-    }
-
-    val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
-    localStreamEnv.setRuntimeMode(executionMode)
-
-    apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
-      case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
-      case _ =>
-    }
-    localStreamEnv.getConfig.setGlobalJobParameters(parameter)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
index ae7f4def8..746af6472 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/pom.xml
@@ -83,12 +83,6 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <optional>true</optional>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
deleted file mode 100644
index 14aadd781..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, CheckpointStorage, 
RestartStrategy, StateBackend => XStateBackend}
-import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import 
org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, 
EmbeddedRocksDBStateBackend}
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
-import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, 
JobManagerCheckpointStorage}
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
-import collection.JavaConversions._
-import collection.Map
-import util.{Failure, Success, Try}
-
-private[flink] object FlinkStreamingInitializer {
-
-  private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
-  def initialize(args: Array[String],
-                 config: (StreamExecutionEnvironment, ParameterTool) => Unit):
-  (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
-  }
-
-  def initialize(args: StreamEnvConfig): (ParameterTool, 
StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args.args, 
ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
-  }
-}
-
-
-private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: 
ApiType) extends Logger {
-
-  var streamEnvConfFunc: (StreamExecutionEnvironment, ParameterTool) => Unit = 
_
-
-  var tableConfFunc: (TableConfig, ParameterTool) => Unit = _
-
-  var javaStreamEnvConfFunc: StreamEnvConfigFunction = _
-
-  var javaTableEnvConfFunc: TableEnvConfigFunction = _
-
-  lazy val parameter: ParameterTool = initParameter()
-
-  private[this] var localStreamEnv: StreamExecutionEnvironment = _
-
-  private[this] lazy val defaultFlinkConf: Map[String, String] = {
-    parameter.get(KEY_FLINK_CONF(), null) match {
-      case null =>
-        // start with script
-        val flinkHome = System.getenv("FLINK_HOME")
-        require(flinkHome != null, "FLINK_HOME not found.")
-        logInfo(s"flinkHome: $flinkHome")
-        val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
-        PropertiesUtils.loadFlinkConfYaml(yaml)
-      case flinkConf =>
-        // passed in from streampark console backend
-        PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
-    }
-  }
-
-  def readFlinkConf(config: String): Map[String, String] = {
-    val extension = config.split("\\.").last.toLowerCase
-
-    val map = config match {
-      case x if x.startsWith("yaml://") =>
-        PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("prop://") =>
-        
PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("hdfs://") =>
-        /**
-         * If the config file is hdfs mode, needs copy the hdfs related 
configuration file to `resources` dir
-         */
-        val text = HdfsUtils.read(x)
-        extension match {
-          case "properties" => PropertiesUtils.fromPropertiesText(text)
-          case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
-          case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
-        }
-      case _ =>
-        val configFile = new File(config)
-        require(configFile.exists(), s"[StreamPark] Usage:flink.conf file 
$configFile is not found!!!")
-        extension match {
-          case "properties" => 
PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
-          case "yml" | "yaml" => 
PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
-          case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
-        }
-    }
-
-    map
-      .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
-      .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
-  }
-
-  def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val config = argsMap.get(KEY_APP_CONF(), null) match {
-      // scalastyle:off throwerror
-      case null | "" => throw new ExceptionInInitializerError("[StreamPark] 
Usage:can't fond config,please set \"--conf $path \" in main arguments")
-      // scalastyle:on throwerror
-      case file => file
-    }
-    val configArgs = readFlinkConf(config)
-    // config priority: explicitly specified priority > project profiles > 
system profiles
-    
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
-  }
-
-  def streamEnvironment: StreamExecutionEnvironment = {
-    if (localStreamEnv == null) {
-      this.synchronized {
-        if (localStreamEnv == null) {
-          initEnvironment()
-        }
-      }
-    }
-    localStreamEnv
-  }
-
-  def initEnvironment(): Unit = {
-    localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
-    Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
-      
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
-    } match {
-      case p if p > 0 => localStreamEnv.setParallelism(p)
-      case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
-    }
-
-    val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
-    localStreamEnv.setRuntimeMode(executionMode)
-
-    apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
-      case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
-      case _ =>
-    }
-    localStreamEnv.getConfig.setGlobalJobParameters(parameter)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
index 5cb491605..a2caf7c73 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/pom.xml
@@ -83,12 +83,6 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <optional>true</optional>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
index f82087c97..263806d8a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
@@ -99,12 +99,6 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <optional>true</optional>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes</artifactId>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
deleted file mode 100644
index db8bed611..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.core
-
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.{ApiType, CheckpointStorage, 
RestartStrategy, StateBackend => XStateBackend}
-import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.{Configuration, CoreOptions}
-import 
org.apache.flink.contrib.streaming.state.{DefaultConfigurableOptionsFactory, 
EmbeddedRocksDBStateBackend}
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
-import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, 
JobManagerCheckpointStorage}
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.streaming.api.environment.CheckpointConfig
-import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JavaHashMap}
-import scala.collection.JavaConversions._
-import scala.collection.Map
-import scala.util.{Failure, Success, Try}
-
-private[flink] object FlinkStreamingInitializer {
-
-  private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
-  def initialize(args: Array[String],
-                 config: (StreamExecutionEnvironment, ParameterTool) => Unit):
-  (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
-  }
-
-  def initialize(args: StreamEnvConfig): (ParameterTool, 
StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args.args, 
ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
-  }
-}
-
-private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: 
ApiType) extends Logger {
-
-  var streamEnvConfFunc: (StreamExecutionEnvironment, ParameterTool) => Unit = 
_
-
-  var tableConfFunc: (TableConfig, ParameterTool) => Unit = _
-
-  var javaStreamEnvConfFunc: StreamEnvConfigFunction = _
-
-  var javaTableEnvConfFunc: TableEnvConfigFunction = _
-
-  lazy val parameter: ParameterTool = initParameter()
-
-  private[this] var localStreamEnv: StreamExecutionEnvironment = _
-
-  private[this] lazy val defaultFlinkConf: Map[String, String] = {
-    parameter.get(KEY_FLINK_CONF(), null) match {
-      case null =>
-        val flinkHome = System.getenv("FLINK_HOME")
-        require(flinkHome != null, "FLINK_HOME not found.")
-        logInfo(s"flinkHome: $flinkHome")
-        val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
-        PropertiesUtils.loadFlinkConfYaml(yaml)
-      case flinkConf =>
-        PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
-    }
-  }
-
-  def readFlinkConf(config: String): Map[String, String] = {
-    val extension = config.split("\\.").last.toLowerCase
-
-    val map = config match {
-      case x if x.startsWith("yaml://") =>
-        PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("prop://") =>
-        
PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("hdfs://") =>
-        /**
-         * If the config file is hdfs mode, needs copy the hdfs related 
configuration file to `resources` dir
-         */
-        val text = HdfsUtils.read(x)
-        extension match {
-          case "properties" => PropertiesUtils.fromPropertiesText(text)
-          case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
-          case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
-        }
-      case _ =>
-        val configFile = new File(config)
-        require(configFile.exists(), s"[StreamPark] Usage:flink.conf file 
$configFile is not found!!!")
-        extension match {
-          case "properties" => 
PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
-          case "yml" | "yaml" => 
PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
-          case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
-        }
-    }
-
-    map
-      .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
-      .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
-  }
-
-  def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val config = argsMap.get(KEY_APP_CONF(), null) match {
-      // scalastyle:off throwerror
-      case null | "" => throw new ExceptionInInitializerError("[StreamPark] 
Usage:can't fond config,please set \"--conf $path \" in main arguments")
-      // scalastyle:on throwerror
-      case file => file
-    }
-    val configArgs = readFlinkConf(config)
-    // config priority: explicitly specified priority > project profiles > 
system profiles
-    
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs))
-  }
-
-  def streamEnvironment: StreamExecutionEnvironment = {
-    if (localStreamEnv == null) {
-      this.synchronized {
-        if (localStreamEnv == null) {
-          initEnvironment()
-        }
-      }
-    }
-    localStreamEnv
-  }
-
-  def initEnvironment(): Unit = {
-    localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
-    Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
-      
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
-    } match {
-      case p if p > 0 => localStreamEnv.setParallelism(p)
-      case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
-    }
-
-    val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
-    localStreamEnv.setRuntimeMode(executionMode)
-
-    apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
-      case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
-      case _ =>
-    }
-    localStreamEnv.getConfig.setGlobalJobParameters(parameter)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index eb78a17b7..6a8207f5d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,15 +17,13 @@
 
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, 
KEY_APP_NAME, KEY_EXECUTION_RUNTIME_MODE, KEY_FLINK_APP_NAME, 
KEY_FLINK_PARALLELISM, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, 
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, 
KEY_APP_NAME, KEY_FLINK_APP_NAME, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, 
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
 import org.apache.streampark.common.enums.{ApiType, TableMode}
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
-import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.{CoreOptions, PipelineOptions}
+import org.apache.flink.configuration.PipelineOptions
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, 
TableEnvironment}


Reply via email to