This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch flink-conf
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit ba12778d041b866241ce03709c5ff4c05a4748a1
Author: benjobs <[email protected]>
AuthorDate: Fri Sep 13 08:22:40 2024 +0800

    [Improve] custom-code job read conf from jar support
---
 .../apache/streampark/common/util/FileUtils.scala  |  16 +++
 .../impl/KubernetesNativeSessionClient.scala       |   2 +-
 .../flink/client/impl/YarnPerJobClient.scala       |   2 +-
 .../flink/client/trait/FlinkClientTrait.scala      |   6 +-
 .../streampark/flink/core/EnhancerImplicit.scala   |   2 +-
 .../flink/core/{conf => }/FlinkConfiguration.scala |   4 +-
 .../streampark/flink/core/FlinkSqlExecutor.scala   |   2 +-
 .../streampark/flink/core/FlinkSqlValidator.scala  |   2 +-
 .../flink/core/FlinkStreamTableTrait.scala         |   2 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 107 ++++++++++++---------
 .../flink/core/FlinkTableInitializer.scala         |  63 ++++++------
 .../streampark/flink/core/FlinkTableTrait.scala    |   2 +-
 .../streampark/flink/core/SqlCommandParser.scala   |   2 +-
 .../{core => deployment}/FlinkClientTrait.scala    |   2 +-
 .../FlinkKubernetesClientTrait.scala               |   2 +-
 .../YarnClusterDescriptorTrait.scala               |   3 +-
 .../flink/core/YarnClusterDescriptorWrapper.scala  |  22 -----
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |   2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../FlinkKubernetesClient.scala                    |   3 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |   2 +-
 .../YarnClusterDescriptorWrapper.scala             |   2 +-
 .../{core => deployment}/FlinkClusterClient.scala  |   2 +-
 .../FlinkKubernetesClient.scala                    |   2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../FlinkKubernetesClient.scala                    |   2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |   2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |   4 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 .../streampark/flink/core/FlinkClusterClient.scala |  49 ----------
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |   2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   5 +-
 .../streampark/flink/core/FlinkClusterClient.scala |  49 ----------
 .../flink/core/FlinkKubernetesClient.scala         |  31 ------
 .../streampark/flink/core/StreamTableContext.scala |   2 +
 .../streampark/flink/core/TableContext.scala       |   2 +
 .../flink/deployment}/FlinkClusterClient.scala     |   2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |   2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |   2 +-
 49 files changed, 158 insertions(+), 276 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 77618fbd4..d3eed9d2f 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -170,4 +170,20 @@ object FileUtils {
     buffer.toString()
   }
 
+  /**
+   * Reads all text from the given stream line by line and joins the lines with "\r\n".
+   * No separator is appended after the last line; a stream without any line yields "".
+   *
+   * The scanner is handed to Utils.close in a finally block, so it is released even when
+   * reading fails part-way through.
+   *
+   * NOTE(review): Scanner(in) decodes with the platform default charset - confirm that this
+   * is intended for configuration files.
+   *
+   * @param in the stream to read; must not be null
+   * @return the content of the stream with its lines joined by "\r\n"
+   */
+  @throws[IOException]
+  def readString(in: InputStream): String = {
+    require(in != null)
+    val scanner = new Scanner(in)
+    try {
+      val buffer = new mutable.StringBuilder()
+      if (scanner.hasNextLine) {
+        buffer.append(scanner.nextLine())
+      }
+      while (scanner.hasNextLine) {
+        buffer.append("\r\n")
+        buffer.append(scanner.nextLine())
+      }
+      buffer.toString()
+    } finally {
+      Utils.close(scanner)
+    }
+  }
+
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index c9f7cd2a5..4bd37db9a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
-import org.apache.streampark.flink.core.FlinkKubernetesClient
+import org.apache.streampark.flink.deployment.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 9d3b8c6f9..a685e861c 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.impl
 
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.core.YarnClusterDescriptorWrapper
+import org.apache.streampark.flink.deployment.YarnClusterDescriptorWrapper
 import org.apache.streampark.flink.util.FlinkUtils
 
 import org.apache.flink.client.program.PackagedProgram
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index df61ee9d0..675b0ea08 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,12 +18,11 @@
 package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
 import org.apache.streampark.common.util.{DeflaterUtils, Logger, 
PropertiesUtils, Utils}
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.deployment.FlinkClusterClient
 
 import com.google.common.collect.Lists
 import org.apache.commons.cli.{CommandLine, Options}
@@ -36,7 +35,6 @@ import 
org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram, 
PackagedProgramUtils}
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index 528e47907..a42c0e669 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 
 import scala.util.Try
 
-object EnhancerImplicit {
+private[flink] object EnhancerImplicit {
 
   implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
similarity index 91%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
index f7aa97fb4..f519cfb64 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core.conf
+package org.apache.streampark.flink.core
 
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
 
-case class FlinkConfiguration(
+private[flink] case class FlinkConfiguration(
     parameter: ParameterTool,
     envConfig: Configuration,
     tableConfig: Configuration)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 58b0f6747..e46c6449e 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.collection.mutable
 import scala.util.Try
 
-object FlinkSqlExecutor extends Logger {
+private[flink] object FlinkSqlExecutor extends Logger {
 
   private[this] val lock = new ReentrantReadWriteLock().writeLock
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 70101672e..5841d5cd3 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -33,7 +33,7 @@ import 
org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
 
 import scala.util.{Failure, Try}
 
-object FlinkSqlValidator extends Logger {
+private[flink] object FlinkSqlValidator extends Logger {
 
   private[this] val FLINK112_CALCITE_PARSER_CLASS =
     "org.apache.flink.table.planner.calcite.CalciteParser"
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index a50116925..b0a746005 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -17,8 +17,8 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.flink.core.EnhancerImplicit._
 
+import EnhancerImplicit._
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode}
 import org.apache.flink.api.common.cache.DistributedCache
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b3ee24c33..604551d05 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -20,10 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util._
-import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
-import collection.{mutable, Map}
-import collection.JavaConversions._
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaStreamEnv}
@@ -32,6 +29,10 @@ import org.apache.flink.table.api.TableConfig
 
 import java.io.File
 
+import scala.collection.{mutable, Map}
+import scala.collection.JavaConversions._
+import scala.util.Try
+
 private[flink] object FlinkStreamingInitializer {
 
   def initialize(args: Array[String], config: (StreamExecutionEnvironment, 
ParameterTool) => Unit)
@@ -79,61 +80,79 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
   lazy val configuration: FlinkConfiguration = initParameter()
 
   def initParameter(): FlinkConfiguration = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val config = argsMap.get(KEY_APP_CONF(), null) match {
-      case null | "" =>
-        throw new ExceptionInInitializerError(
-          "[StreamPark] Usage:can't fond config,please set \"--conf $path \" 
in main arguments")
-      case file => file
+    val configMap = parseConfig()
+    if (configMap.isEmpty) {
+      throw new IllegalArgumentException(
+        "[StreamPark] Usage:can't fond config,please set \"--conf $path \" in 
main arguments")
     }
-    val configMap = parseConfig(config)
-    val properConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
+    val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
     val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
-
     // config priority: explicitly specified priority > project profiles > 
system profiles
     val parameter = ParameterTool
       .fromSystemProperties()
-      .mergeWith(ParameterTool.fromMap(properConf))
+      .mergeWith(ParameterTool.fromMap(flinkConf))
       .mergeWith(ParameterTool.fromMap(appConf))
-      .mergeWith(argsMap)
 
-    val envConfig = Configuration.fromMap(properConf)
+    val envConfig = Configuration.fromMap(flinkConf)
     FlinkConfiguration(parameter, envConfig, null)
   }
 
-  def parseConfig(config: String): Map[String, String] = {
-
-    lazy val content = DeflaterUtils.unzipString(config.drop(7))
-
-    def readConfig(text: String): Map[String, String] = {
-      val format = config.split("\\.").last.toLowerCase
-      format match {
-        case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
-        case "conf" => PropertiesUtils.fromHoconText(text)
-        case "properties" => PropertiesUtils.fromPropertiesText(text)
-        case _ =>
-          throw new IllegalArgumentException(
-            "[StreamPark] Usage: application config file error,must be 
[yaml|conf|properties]")
-      }
+  /**
+   * Resolves the application configuration as a flat key/value map.
+   *
+   * Lookup order:
+   *  1. the value of the KEY_APP_CONF() main argument: an inline "yaml://", "conf://" or
+   *     "prop://" payload (decoded with DeflaterUtils.unzipString), an "hdfs://" path, or a
+   *     local file path;
+   *  2. otherwise the first of application.yml / application.yaml / application.conf /
+   *     application.properties that is found on the classpath (e.g. inside the job jar).
+   *
+   * All other main arguments are then laid over the loaded entries and entries with an
+   * empty value are dropped. When no configuration can be located at all, an empty map is
+   * returned and the main arguments are not merged.
+   *
+   * @return the merged configuration, or an empty map when no configuration was found
+   */
+  def parseConfig(): Map[String, String] = {
+    val argsMap = ParameterTool.fromArgs(args)
+    val loaded: Option[Map[String, String]] = argsMap.get(KEY_APP_CONF(), null) match {
+      case null | "" =>
+        logWarn("[StreamPark] Usage:can't fond config, Now try to find from jar")
+        val classLoader = this.getClass.getClassLoader
+        // a Seq keeps the lookup priority explicit
+        val candidates =
+          Seq("application.yml", "application.yaml", "application.conf", "application.properties")
+        candidates
+          // getResource yields null when the resource is absent, so test for non-null
+          .find(f => Try(classLoader.getResource(f) != null).getOrElse(false))
+          .map(
+            f => {
+              val input = classLoader.getResourceAsStream(f)
+              val content = FileUtils.readString(input)
+              val format = f.split("\\.").last.toLowerCase
+              readConfig(format, content)
+            })
+
+      case config =>
+        // inline payloads carry a 7-character scheme prefix ("yaml://", "conf://", "prop://")
+        lazy val content = DeflaterUtils.unzipString(config.drop(7))
+        lazy val format = config.split("\\.").last.toLowerCase
+        val map = config match {
+          case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+          case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+          case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
+          case x if x.startsWith("hdfs://") =>
+            // Reading from HDFS requires the hdfs-related configuration files to be
+            // available under the resources dir.
+            val text = HdfsUtils.read(x)
+            readConfig(format, text)
+          case _ =>
+            val configFile = new File(config)
+            require(
+              configFile.exists(),
+              s"[StreamPark] Usage: application config file: $configFile is not found!!!")
+            val text = FileUtils.readString(configFile)
+            readConfig(format, text)
+        }
+        Some(map)
     }
+    loaded match {
+      case Some(fileConf) =>
+        // main arguments override the file configuration; build a new map instead of
+        // mutating fileConf, which may be a read-only collection
+        val overrides = argsMap.toMap.filter(_._1 != KEY_APP_CONF())
+        (fileConf ++ overrides).filter(_._2.nonEmpty)
+      case None => Map.empty[String, String]
+    }
+  }
 
-    val map = config match {
-      case x if x.startsWith("yaml://") => 
PropertiesUtils.fromYamlText(content)
-      case x if x.startsWith("conf://") => 
PropertiesUtils.fromHoconText(content)
-      case x if x.startsWith("prop://") => 
PropertiesUtils.fromPropertiesText(content)
-      case x if x.startsWith("hdfs://") =>
-        // If the configuration file with the hdfs, user will need to copy the 
hdfs-related configuration files under the resources dir
-        val text = HdfsUtils.read(x)
-        readConfig(text)
+  private[this] def readConfig(format: String, text: String): Map[String, 
String] = {
+    format match {
+      case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+      case "conf" => PropertiesUtils.fromHoconText(text)
+      case "properties" => PropertiesUtils.fromPropertiesText(text)
       case _ =>
-        val configFile = new File(config)
-        require(
-          configFile.exists(),
-          s"[StreamPark] Usage: application config file: $configFile is not 
found!!!")
-        val text = FileUtils.readString(configFile)
-        readConfig(text)
+        throw new IllegalArgumentException(
+          "[StreamPark] Usage: application config file error,must be 
[yaml|conf|properties]")
     }
-    map.filter(_._2.nonEmpty)
   }
 
   def extractConfigByPrefix(configMap: Map[String, String], prefix: String): 
Map[String, String] = {
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index a7d52749d..208566ae3 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.enums.{ApiType, 
PlannerType}
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.EnhancerImplicit._
-import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
@@ -177,41 +176,35 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
 
   override def initParameter(): FlinkConfiguration = {
     val configuration = {
-      val argsMap = ParameterTool.fromArgs(args)
-      argsMap.get(KEY_APP_CONF(), null) match {
-        case null | "" =>
-          logWarn("Usage:can't fond config,you can set \"--conf $path \" in 
main arguments")
-          val parameter = 
ParameterTool.fromSystemProperties().mergeWith(argsMap)
-          FlinkConfiguration(parameter, new Configuration(), new 
Configuration())
-        case file =>
-          val configMap = parseConfig(file)
-          // set sql..
-          val sqlConf = mutable.Map[String, String]()
-          configMap.foreach(
-            x => {
-              if (x._1.startsWith(KEY_SQL_PREFIX)) {
-                sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2
-              }
-            })
-
-          // config priority: explicitly specified priority > project profiles 
> system profiles
-          val properConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
-          val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
-          val tableConf = extractConfigByPrefix(configMap, 
KEY_FLINK_TABLE_PREFIX)
-
-          val tableConfig = Configuration.fromMap(tableConf)
-          val envConfig = Configuration.fromMap(properConf)
-
-          val parameter = ParameterTool
-            .fromSystemProperties()
-            .mergeWith(ParameterTool.fromMap(properConf))
-            .mergeWith(ParameterTool.fromMap(tableConf))
-            .mergeWith(ParameterTool.fromMap(appConf))
-            .mergeWith(ParameterTool.fromMap(sqlConf))
-            .mergeWith(argsMap)
-
-          FlinkConfiguration(parameter, envConfig, tableConfig)
+      val configMap = parseConfig()
+      if (configMap.isEmpty) {
+        logWarn("Usage:can't fond config,you can set \"--conf $path \" in main 
arguments")
       }
+      // set sql..
+      val sqlConf = mutable.Map[String, String]()
+      configMap.foreach(
+        x => {
+          if (x._1.startsWith(KEY_SQL_PREFIX)) {
+            sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2
+          }
+        })
+
+      // config priority: explicitly specified priority > project profiles > 
system profiles
+      val flinkConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
+      val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+      val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
+
+      val tableConfig = Configuration.fromMap(tableConf)
+      val envConfig = Configuration.fromMap(flinkConf)
+
+      val parameter = ParameterTool
+        .fromSystemProperties()
+        .mergeWith(ParameterTool.fromMap(flinkConf))
+        .mergeWith(ParameterTool.fromMap(tableConf))
+        .mergeWith(ParameterTool.fromMap(appConf))
+        .mergeWith(ParameterTool.fromMap(sqlConf))
+
+      FlinkConfiguration(parameter, envConfig, tableConfig)
     }
 
     configuration.parameter.get(KEY_FLINK_SQL()) match {
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 755c40103..f73844b54 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -17,8 +17,8 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst.printLogo
-import org.apache.streampark.flink.core.EnhancerImplicit._
 
+import EnhancerImplicit._
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api._
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 1f00d0fce..653c43035 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -30,7 +30,7 @@ import scala.collection.{immutable, mutable}
 import scala.collection.mutable.ListBuffer
 import scala.util.control.Breaks.{break, breakable}
 
-object SqlCommandParser extends Logger {
+private[flink] object SqlCommandParser extends Logger {
 
   def parseSQL(
       sql: String,
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
index 6d8393ee8..0d5095b4a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
index 2103a6353..611d3f09d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
similarity index 94%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
index cf97dd98b..c11a51f75 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
 import java.io.File
 import java.util
-import java.util.List
 
 class YarnClusterDescriptorTrait(yarnClusterDescriptor: YarnClusterDescriptor) 
{
   def addShipFiles(shipFiles: util.List[File]) =
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
deleted file mode 100644
index a8b0ba5c6..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.yarn.YarnClusterDescriptor
-
-class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
-  extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.client.program.ClusterClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..6fc4d1614 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a8b0ba5c6..105a4d9a4 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.client.program.ClusterClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..5919bf84d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.client.program.ClusterClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..6fc4d1614 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a8b0ba5c6..105a4d9a4 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 89184c756..3ed7e7bbc 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 90%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..f2a4cc51e 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
+
+import org.apache.streampark.flink.deployment.FlinkKubernetesClientTrait
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
deleted file mode 100644
index 4f6336f5a..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
-
-import java.util.concurrent.CompletableFuture
-
-class FlinkClusterClient[T](clusterClient: ClusterClient[T])
-  extends FlinkClientTrait[T](clusterClient) {
-
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
-  }
-
-  override def cancelWithSavepoint(
-      jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
-  }
-
-  override def stopWithSavepoint(
-      jobID: JobID,
-      advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.stopWithSavepoint(
-      jobID,
-      advanceToEndOfEventTime,
-      savepointDirectory,
-      SavepointFormatType.DEFAULT)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 93%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a09b86844..2fef79071 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
-import collection.JavaConversions._
 import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.hadoop.fs.Path
 
 import java.io.File
 import java.util
 
+import scala.collection.JavaConversions._
+
 class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
   extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
deleted file mode 100644
index 4f6336f5a..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
-
-import java.util.concurrent.CompletableFuture
-
-class FlinkClusterClient[T](clusterClient: ClusterClient[T])
-  extends FlinkClientTrait[T](clusterClient) {
-
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
-  }
-
-  override def cancelWithSavepoint(
-      jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
-  }
-
-  override def stopWithSavepoint(
-      jobID: JobID,
-      advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.stopWithSavepoint(
-      jobID,
-      advanceToEndOfEventTime,
-      savepointDirectory,
-      SavepointFormatType.DEFAULT)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
deleted file mode 100644
index f388c8e9f..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
-import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
-
-import java.util.Optional
-
-class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
-  extends FlinkKubernetesClientTrait(kubeClient) {
-
-  override def getService(serviceName: String): Optional[KubernetesService] = {
-    kubeClient.getService(serviceName)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 65f715c75..81a855c7a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.core
 
+import org.apache.streampark.flink.core.FlinkTableInitializer
+
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e8f704f39..7aff52560 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.core
 
+import org.apache.streampark.flink.core.FlinkTableInitializer
+
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.CatalogDescriptor
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a09b86844..72cd605c8 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import collection.JavaConversions._
 import org.apache.flink.yarn.YarnClusterDescriptor

Reply via email to