This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 28c9fe806 [Improve] custom-code job read conf from jar support (#4055)
28c9fe806 is described below

commit 28c9fe806e7f022f66a7d2418bbe477133dea793
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 14 10:23:27 2024 +0800

    [Improve] custom-code job read conf from jar support (#4055)
    
    * [Improve] custom-code job read conf from jar support
    
    * [Improve] read conf bug fixed.
    
    * [Improve] execution-runtime-mode bug fixed.
---
 .../apache/streampark/common/util/FileUtils.scala  | 16 ++++
 .../console/core/service/SavepointService.java     |  2 +
 .../core/service/impl/ApplicationServiceImpl.java  |  2 +-
 .../core/service/impl/SavepointServiceImpl.java    | 24 +++---
 .../console/core/task/CheckpointProcessor.java     |  2 +-
 .../src/design/index.less                          |  5 +-
 .../src/design/swal2.less                          |  9 +-
 .../src/design/var/index.less                      |  2 +-
 .../src/layouts/default/header/MultipleHeader.vue  |  4 +-
 .../src/views/flink/app/hooks/useApp.tsx           | 12 +--
 .../impl/KubernetesNativeSessionClient.scala       |  2 +-
 .../flink/client/impl/YarnPerJobClient.scala       |  2 +-
 .../flink/client/trait/FlinkClientTrait.scala      |  6 +-
 .../streampark/flink/core/scala/FlinkTable.scala   |  1 -
 .../streampark/flink/core/EnhancerImplicit.scala   |  2 +-
 .../flink/core/{conf => }/FlinkConfiguration.scala |  4 +-
 .../streampark/flink/core/FlinkSqlExecutor.scala   |  2 +-
 .../streampark/flink/core/FlinkSqlValidator.scala  |  2 +-
 .../flink/core/FlinkStreamTableTrait.scala         |  2 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 98 ++++++++++++----------
 .../flink/core/FlinkTableInitializer.scala         | 98 +++++++++-------------
 .../streampark/flink/core/FlinkTableTrait.scala    |  2 +-
 .../streampark/flink/core/SqlCommandParser.scala   |  2 +-
 .../{core => deployment}/FlinkClientTrait.scala    |  2 +-
 .../FlinkKubernetesClientTrait.scala               |  2 +-
 .../YarnClusterDescriptorTrait.scala               |  3 +-
 .../flink/deployment}/FlinkClusterClient.scala     |  2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../{core => deployment}/FlinkClusterClient.scala  |  2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |  3 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../flink/core/YarnClusterDescriptorWrapper.scala  | 22 -----
 .../flink/deployment}/FlinkClusterClient.scala     |  2 +-
 .../FlinkKubernetesClient.scala                    |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../{core => deployment}/FlinkClusterClient.scala  |  2 +-
 .../FlinkKubernetesClient.scala                    |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../{core => deployment}/FlinkClusterClient.scala  |  2 +-
 .../FlinkKubernetesClient.scala                    |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../{core => deployment}/FlinkClusterClient.scala  |  2 +-
 .../FlinkKubernetesClient.scala                    |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../{core => deployment}/FlinkClusterClient.scala  |  2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |  4 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../streampark/flink/core/FlinkClusterClient.scala | 49 -----------
 .../flink/deployment}/FlinkClusterClient.scala     |  2 +-
 .../FlinkKubernetesClient.scala                    |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  5 +-
 .../streampark/flink/core/FlinkClusterClient.scala | 49 -----------
 .../flink/core/FlinkKubernetesClient.scala         | 31 -------
 .../streampark/flink/core/StreamTableContext.scala |  2 +
 .../streampark/flink/core/TableContext.scala       |  2 +
 .../flink/deployment}/FlinkClusterClient.scala     |  2 +-
 .../flink/deployment}/FlinkKubernetesClient.scala  |  2 +-
 .../deployment}/YarnClusterDescriptorWrapper.scala |  2 +-
 .../apache/streampark/flink/cli/SqlClient.scala    |  4 +-
 60 files changed, 201 insertions(+), 326 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 77618fbd4..d3eed9d2f 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -170,4 +170,20 @@ object FileUtils {
     buffer.toString()
   }
 
+  @throws[IOException]
+  def readString(in: InputStream): String = {
+    require(in != null)
+    val scanner = new Scanner(in)
+    val buffer = new mutable.StringBuilder()
+    if (scanner.hasNextLine) {
+      buffer.append(scanner.nextLine())
+    }
+    while (scanner.hasNextLine) {
+      buffer.append("\r\n")
+      buffer.append(scanner.nextLine())
+    }
+    Utils.close(scanner)
+    buffer.toString()
+  }
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index f40e8e7a6..d00728563 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -44,4 +44,6 @@ public interface SavepointService extends IService<Savepoint> 
{
   String getSavePointPath(Application app) throws Exception;
 
   String processPath(String path, String jobName, Long jobId);
+
+  void saveSavePoint(Savepoint savepoint);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 0d2f3decd..2062baee3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1444,7 +1444,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             savepoint.setType(CheckPointType.SAVEPOINT.get());
             savepoint.setCreateTime(new Date());
             savepoint.setTriggerTime(triggerTime);
-            savepointService.save(savepoint);
+            savepointService.saveSavePoint(savepoint);
           }
 
           if (application.isKubernetesModeJob()) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 977afbb51..a1455b281 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -69,6 +69,7 @@ import javax.annotation.Nullable;
 import java.net.URI;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -111,15 +112,12 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
           TimeUnit.SECONDS,
           new LinkedBlockingQueue<>(),
           ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));
+
   @Autowired private SavepointMapper savepointMapper;
 
   @Override
   public void expire(Long appId) {
-    Savepoint savepoint = new Savepoint();
-    savepoint.setLatest(false);
-    LambdaQueryWrapper<Savepoint> queryWrapper =
-        new LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
-    this.update(savepoint, queryWrapper);
+    savepointMapper.cleanLatest(appId);
   }
 
   private void expire(Savepoint entity) {
@@ -226,12 +224,13 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
     LambdaQueryWrapper<Savepoint> queryWrapper =
         new LambdaQueryWrapper<Savepoint>()
             .eq(Savepoint::getAppId, id)
-            .eq(Savepoint::getLatest, true);
-    Savepoint savepoint = this.baseMapper.selectOne(queryWrapper);
-    if (savepoint == null) {
-      savepoint = this.baseMapper.findLatestByTime(id);
+            .eq(Savepoint::getLatest, true)
+            .orderByDesc(Savepoint::getCreateTime);
+    List<Savepoint> savepointList = this.baseMapper.selectList(queryWrapper);
+    if (!savepointList.isEmpty()) {
+      return savepointList.get(0);
     }
-    return savepoint;
+    return this.baseMapper.findLatestByTime(id);
   }
 
   @Override
@@ -315,11 +314,10 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
   }
 
   @Override
-  public boolean save(Savepoint savepoint) {
+  public void saveSavePoint(Savepoint savepoint) {
     this.expire(savepoint);
-    this.expire(savepoint.getAppId());
     this.cleanLatest(savepoint.getAppId());
-    return super.save(savepoint);
+    super.save(savepoint);
   }
 
   private void cleanLatest(Long appId) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 8a4c5b09c..ce3694473 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -181,7 +181,7 @@ public class CheckpointProcessor {
     savepoint.setPath(checkPoint.getExternalPath());
     savepoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
     savepoint.setCreateTime(new Date());
-    savepointService.save(savepoint);
+    savepointService.saveSavePoint(savepoint);
   }
 
   public static class Counter {
diff --git a/streampark-console/streampark-console-webapp/src/design/index.less 
b/streampark-console/streampark-console-webapp/src/design/index.less
index ac3e23a64..d787ae632 100644
--- a/streampark-console/streampark-console-webapp/src/design/index.less
+++ b/streampark-console/streampark-console-webapp/src/design/index.less
@@ -102,6 +102,7 @@ span {
 .ant-modal-content,
 .ant-tree-checkbox-inner,
 .ant-table,
+.ant-card,
 .ant-alert,
 .bold-tag,
 .bold-tag > .ant-tag,
@@ -109,6 +110,8 @@ span {
 .ant-btn-group,
 textarea.ant-input,
 .ant-select-selector,
+.streampark-page-wrapper,
+.streampark-page-wrapper-content,
 .ant-upload.ant-upload-drag {
-  border-radius: 0 !important;
+  border-radius: 1px !important;
 }
diff --git a/streampark-console/streampark-console-webapp/src/design/swal2.less 
b/streampark-console/streampark-console-webapp/src/design/swal2.less
index fdc00ec4b..9129e8ca9 100644
--- a/streampark-console/streampark-console-webapp/src/design/swal2.less
+++ b/streampark-console/streampark-console-webapp/src/design/swal2.less
@@ -8,10 +8,17 @@
     padding: 16px;
     overflow: auto;
     line-height: 1.45;
-    border-radius: 6px;
+    border-radius: 2px;
   }
 }
 
+.swal2-container,
+.swal2-popup,
+.swal2-confirm,
+.swal2-styled {
+  border-radius: 1px !important;
+}
+
 [data-theme="dark"] {
 
   .swal2-popup.swal2-toast {
diff --git 
a/streampark-console/streampark-console-webapp/src/design/var/index.less 
b/streampark-console/streampark-console-webapp/src/design/var/index.less
index 4dbb4af32..3c21dca9b 100644
--- a/streampark-console/streampark-console-webapp/src/design/var/index.less
+++ b/streampark-console/streampark-console-webapp/src/design/var/index.less
@@ -8,7 +8,7 @@
 @multiple-height: 30px;
 
 // headers
-@header-height: 50px;
+@header-height: 64px;
 
 // logo width
 @logo-width: 48px;
diff --git 
a/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
 
b/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
index c852b7ee7..e493a2713 100644
--- 
a/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
+++ 
b/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
@@ -19,9 +19,9 @@
   import { useDesign } from '/@/hooks/web/useDesign';
   import { useLayoutHeight } from '../content/useContentViewHeight';
 
-  const HEADER_HEIGHT = 48;
-
+  const HEADER_HEIGHT = 64;
   const TABS_HEIGHT = 32;
+
   export default defineComponent({
     name: 'LayoutMultipleHeader',
     components: { LayoutHeader, MultipleTabs },
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 0e5ccb2ac..b0b627545 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -226,11 +226,13 @@ export const useFlinkApplication = (openStartModal: Fn) 
=> {
       ],
       content: () => {
         return (
-          <Form class="!pt-40px">
+          <Form
+            class="!pt-40px"
+            layout='vertical'
+            baseColProps = {{ span: 20, offset: 2 }}
+          >
             <Form.Item
               label="Job Name"
-              labelCol={{ lg: { span: 5 }, sm: { span: 5 } }}
-              wrapperCol={{ lg: { span: 18 }, sm: { span: 18 } }}
               validateStatus={unref(validateStatus)}
               help={help}
               rules={[{ required: true }]}
@@ -319,8 +321,8 @@ export const useFlinkApplication = (openStartModal: Fn) => {
             class="!pt-40px"
             ref={mappingRef}
             name="mappingForm"
-            labelCol={{ lg: { span: 5 }, sm: { span: 5 } }}
-            wrapperCol={{ lg: { span: 18 }, sm: { span: 18 } }}
+            baseColProps = {{ span: 20, offset: 2 }}
+            layout='vertical'
             v-model:model={formValue}
           >
             <Form.Item label="Job Name">
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index c9f7cd2a5..4bd37db9a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
-import org.apache.streampark.flink.core.FlinkKubernetesClient
+import org.apache.streampark.flink.deployment.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 9d3b8c6f9..a685e861c 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.impl
 
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.core.YarnClusterDescriptorWrapper
+import org.apache.streampark.flink.deployment.YarnClusterDescriptorWrapper
 import org.apache.streampark.flink.util.FlinkUtils
 
 import org.apache.flink.client.program.PackagedProgram
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index df61ee9d0..675b0ea08 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,12 +18,11 @@
 package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
 import org.apache.streampark.common.util.{DeflaterUtils, Logger, 
PropertiesUtils, Utils}
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.deployment.FlinkClusterClient
 
 import com.google.common.collect.Lists
 import org.apache.commons.cli.{CommandLine, Options}
@@ -36,7 +35,6 @@ import 
org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram, 
PackagedProgramUtils}
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
diff --git 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 2df1c6443..ef2de3be0 100644
--- 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++ 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext}
 
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.TableConfig
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index 528e47907..a42c0e669 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 
 import scala.util.Try
 
-object EnhancerImplicit {
+private[flink] object EnhancerImplicit {
 
   implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
similarity index 91%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
index f7aa97fb4..f519cfb64 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core.conf
+package org.apache.streampark.flink.core
 
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
 
-case class FlinkConfiguration(
+private[flink] case class FlinkConfiguration(
     parameter: ParameterTool,
     envConfig: Configuration,
     tableConfig: Configuration)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 58b0f6747..e46c6449e 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.collection.mutable
 import scala.util.Try
 
-object FlinkSqlExecutor extends Logger {
+private[flink] object FlinkSqlExecutor extends Logger {
 
   private[this] val lock = new ReentrantReadWriteLock().writeLock
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 70101672e..5841d5cd3 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -33,7 +33,7 @@ import 
org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
 
 import scala.util.{Failure, Try}
 
-object FlinkSqlValidator extends Logger {
+private[flink] object FlinkSqlValidator extends Logger {
 
   private[this] val FLINK112_CALCITE_PARSER_CLASS =
     "org.apache.flink.table.planner.calcite.CalciteParser"
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index a50116925..b0a746005 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -17,8 +17,8 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.flink.core.EnhancerImplicit._
 
+import EnhancerImplicit._
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode}
 import org.apache.flink.api.common.cache.DistributedCache
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b3ee24c33..47a5bce6b 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -20,10 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util._
-import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
-import collection.{mutable, Map}
-import collection.JavaConversions._
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaStreamEnv}
@@ -32,6 +29,10 @@ import org.apache.flink.table.api.TableConfig
 
 import java.io.File
 
+import scala.collection.{mutable, Map}
+import scala.collection.JavaConversions._
+import scala.util.Try
+
 private[flink] object FlinkStreamingInitializer {
 
   def initialize(args: Array[String], config: (StreamExecutionEnvironment, 
ParameterTool) => Unit)
@@ -80,60 +81,73 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
 
   def initParameter(): FlinkConfiguration = {
     val argsMap = ParameterTool.fromArgs(args)
-    val config = argsMap.get(KEY_APP_CONF(), null) match {
-      case null | "" =>
-        throw new ExceptionInInitializerError(
-          "[StreamPark] Usage:can't fond config,please set \"--conf $path \" 
in main arguments")
-      case file => file
+    val configMap = parseConfig(argsMap)
+    if (!argsMap.has(KEY_APP_CONF()) && configMap.isEmpty) {
+      throw new IllegalArgumentException(
+        "[StreamPark] Usage:can't fond config, please set \"--conf $path \" in 
main arguments")
     }
-    val configMap = parseConfig(config)
-    val properConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
+    val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
     val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
-
     // config priority: explicitly specified priority > project profiles > 
system profiles
     val parameter = ParameterTool
       .fromSystemProperties()
-      .mergeWith(ParameterTool.fromMap(properConf))
+      .mergeWith(ParameterTool.fromMap(flinkConf))
       .mergeWith(ParameterTool.fromMap(appConf))
       .mergeWith(argsMap)
 
-    val envConfig = Configuration.fromMap(properConf)
+    val envConfig = Configuration.fromMap(flinkConf)
     FlinkConfiguration(parameter, envConfig, null)
   }
 
-  def parseConfig(config: String): Map[String, String] = {
-
-    lazy val content = DeflaterUtils.unzipString(config.drop(7))
-
-    def readConfig(text: String): Map[String, String] = {
-      val format = config.split("\\.").last.toLowerCase
-      format match {
-        case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
-        case "conf" => PropertiesUtils.fromHoconText(text)
-        case "properties" => PropertiesUtils.fromPropertiesText(text)
-        case _ =>
-          throw new IllegalArgumentException(
-            "[StreamPark] Usage: application config file error,must be 
[yaml|conf|properties]")
-      }
+  def parseConfig(parameterTool: ParameterTool): Map[String, String] = {
+    parameterTool.get(KEY_APP_CONF(), null) match {
+      case null | "" =>
+        logWarn("Usage:can't fond config, now trying to find from jar")
+        val configs =
+          Set("application.yml", "application.yaml", "application.conf", 
"application.properties")
+        configs
+          .find(
+            f => Try(this.getClass.getClassLoader.getResource(f).getPath != 
null).getOrElse(false))
+          .map(
+            f => {
+              val input = this.getClass.getClassLoader.getResourceAsStream(f)
+              val content = FileUtils.readString(input)
+              val format = f.split("\\.").last.toLowerCase
+              readConfig(format, content)
+            })
+          .getOrElse(return Map.empty[String, String])
+      case config =>
+        lazy val content = DeflaterUtils.unzipString(config.drop(7))
+        lazy val format = config.split("\\.").last.toLowerCase
+        val configAsMap = config match {
+          case x if x.startsWith("yaml://") => 
PropertiesUtils.fromYamlText(content)
+          case x if x.startsWith("conf://") => 
PropertiesUtils.fromHoconText(content)
+          case x if x.startsWith("prop://") => 
PropertiesUtils.fromPropertiesText(content)
+          case x if x.startsWith("hdfs://") =>
+            // If the configuration file with the hdfs, user will need to copy 
the hdfs-related configuration files under the resources dir
+            val text = HdfsUtils.read(x)
+            readConfig(format, text)
+          case _ =>
+            val configFile = new File(config)
+            require(
+              configFile.exists(),
+              s"[StreamPark] Usage: application config file: $configFile is 
not found!!!")
+            val text = FileUtils.readString(configFile)
+            readConfig(format, text)
+        }
+        configAsMap.filter(_._2.nonEmpty)
     }
+  }
 
-    val map = config match {
-      case x if x.startsWith("yaml://") => 
PropertiesUtils.fromYamlText(content)
-      case x if x.startsWith("conf://") => 
PropertiesUtils.fromHoconText(content)
-      case x if x.startsWith("prop://") => 
PropertiesUtils.fromPropertiesText(content)
-      case x if x.startsWith("hdfs://") =>
-        // If the configuration file with the hdfs, user will need to copy the 
hdfs-related configuration files under the resources dir
-        val text = HdfsUtils.read(x)
-        readConfig(text)
+  private[this] def readConfig(format: String, text: String): Map[String, 
String] = {
+    format match {
+      case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+      case "conf" => PropertiesUtils.fromHoconText(text)
+      case "properties" => PropertiesUtils.fromPropertiesText(text)
       case _ =>
-        val configFile = new File(config)
-        require(
-          configFile.exists(),
-          s"[StreamPark] Usage: application config file: $configFile is not 
found!!!")
-        val text = FileUtils.readString(configFile)
-        readConfig(text)
+        throw new IllegalArgumentException(
+          "[StreamPark] Usage: application config file error,must be 
[yaml|conf|properties]")
     }
-    map.filter(_._2.nonEmpty)
   }
 
   def extractConfigByPrefix(configMap: Map[String, String], prefix: String): 
Map[String, String] = {
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index a7d52749d..56267cbda 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -19,9 +19,8 @@ package org.apache.streampark.flink.core
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.{ApiType, PlannerType}
 import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
+import org.apache.streampark.common.util.DeflaterUtils
 import org.apache.streampark.flink.core.EnhancerImplicit._
-import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
@@ -29,10 +28,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 
-import java.io.File
-
-import scala.collection.{mutable, Map}
 import scala.collection.JavaConversions._
+import scala.collection.Map
 import scala.util.{Failure, Success, Try}
 
 private[flink] object FlinkTableInitializer {
@@ -176,63 +173,46 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
   /** In case of table SQL, the parameter conf is not required, it depends on the developer. */
 
   override def initParameter(): FlinkConfiguration = {
-    val configuration = {
-      val argsMap = ParameterTool.fromArgs(args)
-      argsMap.get(KEY_APP_CONF(), null) match {
-        case null | "" =>
-          logWarn("Usage:can't fond config,you can set \"--conf $path \" in main arguments")
-          val parameter = ParameterTool.fromSystemProperties().mergeWith(argsMap)
-          FlinkConfiguration(parameter, new Configuration(), new Configuration())
-        case file =>
-          val configMap = parseConfig(file)
-          // set sql..
-          val sqlConf = mutable.Map[String, String]()
-          configMap.foreach(
-            x => {
-              if (x._1.startsWith(KEY_SQL_PREFIX)) {
-                sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2
-              }
-            })
-
-          // config priority: explicitly specified priority > project profiles > system profiles
-          val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
-          val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
-          val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
-
-          val tableConfig = Configuration.fromMap(tableConf)
-          val envConfig = Configuration.fromMap(properConf)
-
-          val parameter = ParameterTool
-            .fromSystemProperties()
-            .mergeWith(ParameterTool.fromMap(properConf))
-            .mergeWith(ParameterTool.fromMap(tableConf))
-            .mergeWith(ParameterTool.fromMap(appConf))
-            .mergeWith(ParameterTool.fromMap(sqlConf))
-            .mergeWith(argsMap)
-
-          FlinkConfiguration(parameter, envConfig, tableConfig)
+    val cliParameterTool = ParameterTool.fromArgs(args)
+    val configMap = super.parseConfig(cliParameterTool)
+    val flinkConfiguration = {
+      if (!cliParameterTool.has(KEY_APP_CONF()) && configMap.isEmpty) {
+        logWarn(
+          "Usage: can't fond config, this config is not required, you can set \"--conf $path \" in main arguments")
+        FlinkConfiguration(cliParameterTool, new Configuration(), new Configuration())
+      } else {
+        // config priority: explicitly specified priority > project profiles > system profiles
+        val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+        val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+        val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
+        val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)
+
+        val envConfig = Configuration.fromMap(flinkConf)
+        val tableConfig = Configuration.fromMap(tableConf)
+
+        val parameterTool = ParameterTool
+          .fromSystemProperties()
+          .mergeWith(ParameterTool.fromMap(flinkConf))
+          .mergeWith(ParameterTool.fromMap(appConf))
+          .mergeWith(ParameterTool.fromMap(tableConf))
+          .mergeWith(ParameterTool.fromMap(sqlConf))
+
+        FlinkConfiguration(parameterTool, envConfig, tableConfig)
       }
     }
 
-    configuration.parameter.get(KEY_FLINK_SQL()) match {
-      case null => configuration
-      case param =>
-        // for streampark-console
-        Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) =>
-            configuration.copy(parameter = configuration.parameter.mergeWith(
-              ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value))))
-          case Failure(_) =>
-            val sqlFile = new File(param)
-            Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) =>
-                configuration.copy(parameter =
-                  configuration.parameter.mergeWith(ParameterTool.fromMap(value)))
-              case Failure(e) =>
-                new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                configuration
-            }
-        }
+    if (cliParameterTool.has(KEY_FLINK_SQL())) {
+      // for streampark-console
+      Try(DeflaterUtils.unzipString(cliParameterTool.get(KEY_FLINK_SQL()))) match {
+        case Success(value) =>
+          flinkConfiguration.copy(parameter = flinkConfiguration.parameter.mergeWith(
+            ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value))))
+        case Failure(e) =>
+          new IllegalArgumentException(s"[StreamPark] --sql parsing failed. $e")
+          flinkConfiguration
+      }
+    } else {
+      flinkConfiguration
     }
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 755c40103..f73844b54 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -17,8 +17,8 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst.printLogo
-import org.apache.streampark.flink.core.EnhancerImplicit._
 
+import EnhancerImplicit._
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api._
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 1f00d0fce..653c43035 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -30,7 +30,7 @@ import scala.collection.{immutable, mutable}
 import scala.collection.mutable.ListBuffer
 import scala.util.control.Breaks.{break, breakable}
 
-object SqlCommandParser extends Logger {
+private[flink] object SqlCommandParser extends Logger {
 
   def parseSQL(
       sql: String,
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
index 6d8393ee8..0d5095b4a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
index 2103a6353..611d3f09d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
similarity index 94%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
index cf97dd98b..c11a51f75 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
 import java.io.File
 import java.util
-import java.util.List
 
 class YarnClusterDescriptorTrait(yarnClusterDescriptor: YarnClusterDescriptor) {
   def addShipFiles(shipFiles: util.List[File]) =
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.client.program.ClusterClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..6fc4d1614 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a8b0ba5c6..105a4d9a4 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.client.program.ClusterClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..5919bf84d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
deleted file mode 100644
index a8b0ba5c6..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.yarn.YarnClusterDescriptor
-
-class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
-  extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.client.program.ClusterClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..6fc4d1614 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a8b0ba5c6..105a4d9a4 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 89184c756..3ed7e7bbc 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 90%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..f2a4cc51e 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
+
+import org.apache.streampark.flink.deployment.FlinkKubernetesClientTrait
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.yarn.YarnClusterDescriptor
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
deleted file mode 100644
index 4f6336f5a..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
-
-import java.util.concurrent.CompletableFuture
-
-class FlinkClusterClient[T](clusterClient: ClusterClient[T])
-  extends FlinkClientTrait[T](clusterClient) {
-
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
-  }
-
-  override def cancelWithSavepoint(
-      jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
-  }
-
-  override def stopWithSavepoint(
-      jobID: JobID,
-      advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.stopWithSavepoint(
-      jobID,
-      advanceToEndOfEventTime,
-      savepointDirectory,
-      SavepointFormatType.DEFAULT)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
copy to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 93%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a09b86844..2fef79071 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
-import collection.JavaConversions._
 import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.hadoop.fs.Path
 
 import java.io.File
 import java.util
 
+import scala.collection.JavaConversions._
+
 class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
   extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {
 
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
deleted file mode 100644
index 4f6336f5a..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
-
-import java.util.concurrent.CompletableFuture
-
-class FlinkClusterClient[T](clusterClient: ClusterClient[T])
-  extends FlinkClientTrait[T](clusterClient) {
-
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
-  }
-
-  override def cancelWithSavepoint(
-      jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
-  }
-
-  override def stopWithSavepoint(
-      jobID: JobID,
-      advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.stopWithSavepoint(
-      jobID,
-      advanceToEndOfEventTime,
-      savepointDirectory,
-      SavepointFormatType.DEFAULT)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
deleted file mode 100644
index f388c8e9f..000000000
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
-import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
-
-import java.util.Optional
-
-class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
-  extends FlinkKubernetesClientTrait(kubeClient) {
-
-  override def getService(serviceName: String): Optional[KubernetesService] = {
-    kubeClient.getService(serviceName)
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 65f715c75..81a855c7a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.core
 
+import org.apache.streampark.flink.core.FlinkTableInitializer
+
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e8f704f39..7aff52560 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.core
 
+import org.apache.streampark.flink.core.FlinkTableInitializer
+
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.CatalogDescriptor
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.program.ClusterClient
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a09b86844..72cd605c8 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
 
 import collection.JavaConversions._
 import org.apache.flink.yarn.YarnClusterDescriptor
diff --git 
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
 
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index c4be0ab58..c80d520f9 100644
--- 
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ 
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -55,7 +55,7 @@ object SqlClient extends App {
     case Some(e) =>
       // 1) flink sql execution.runtime-mode has highest priority
       val m = e.operands(1).toUpperCase()
-      arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+      arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
       m
     case None =>
       // 2) dynamic properties execution.runtime-mode
@@ -68,7 +68,7 @@ object SqlClient extends App {
               // 3) application conf execution.runtime-mode
               parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode).toUpperCase()
           }
-          arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+          arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
           m
         case m => m
       }


Reply via email to