This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 28c9fe806 [Improve] custom-code job read conf from jar support (#4055)
28c9fe806 is described below
commit 28c9fe806e7f022f66a7d2418bbe477133dea793
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 14 10:23:27 2024 +0800
[Improve] Support reading conf from the jar for custom-code jobs (#4055)
* [Improve] Support reading conf from the jar for custom-code jobs
* [Improve] Fixed a bug in reading the conf.
* [Improve] Fixed an execution-runtime-mode bug.
---
.../apache/streampark/common/util/FileUtils.scala | 16 ++++
.../console/core/service/SavepointService.java | 2 +
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../core/service/impl/SavepointServiceImpl.java | 24 +++---
.../console/core/task/CheckpointProcessor.java | 2 +-
.../src/design/index.less | 5 +-
.../src/design/swal2.less | 9 +-
.../src/design/var/index.less | 2 +-
.../src/layouts/default/header/MultipleHeader.vue | 4 +-
.../src/views/flink/app/hooks/useApp.tsx | 12 +--
.../impl/KubernetesNativeSessionClient.scala | 2 +-
.../flink/client/impl/YarnPerJobClient.scala | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 6 +-
.../streampark/flink/core/scala/FlinkTable.scala | 1 -
.../streampark/flink/core/EnhancerImplicit.scala | 2 +-
.../flink/core/{conf => }/FlinkConfiguration.scala | 4 +-
.../streampark/flink/core/FlinkSqlExecutor.scala | 2 +-
.../streampark/flink/core/FlinkSqlValidator.scala | 2 +-
.../flink/core/FlinkStreamTableTrait.scala | 2 +-
.../flink/core/FlinkStreamingInitializer.scala | 98 ++++++++++++----------
.../flink/core/FlinkTableInitializer.scala | 98 +++++++++-------------
.../streampark/flink/core/FlinkTableTrait.scala | 2 +-
.../streampark/flink/core/SqlCommandParser.scala | 2 +-
.../{core => deployment}/FlinkClientTrait.scala | 2 +-
.../FlinkKubernetesClientTrait.scala | 2 +-
.../YarnClusterDescriptorTrait.scala | 3 +-
.../flink/deployment}/FlinkClusterClient.scala | 2 +-
.../flink/deployment}/FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../{core => deployment}/FlinkClusterClient.scala | 2 +-
.../flink/deployment}/FlinkKubernetesClient.scala | 3 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../flink/core/YarnClusterDescriptorWrapper.scala | 22 -----
.../flink/deployment}/FlinkClusterClient.scala | 2 +-
.../FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../{core => deployment}/FlinkClusterClient.scala | 2 +-
.../FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../{core => deployment}/FlinkClusterClient.scala | 2 +-
.../FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../{core => deployment}/FlinkClusterClient.scala | 2 +-
.../FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../{core => deployment}/FlinkClusterClient.scala | 2 +-
.../flink/deployment}/FlinkKubernetesClient.scala | 4 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../streampark/flink/core/FlinkClusterClient.scala | 49 -----------
.../flink/deployment}/FlinkClusterClient.scala | 2 +-
.../FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 5 +-
.../streampark/flink/core/FlinkClusterClient.scala | 49 -----------
.../flink/core/FlinkKubernetesClient.scala | 31 -------
.../streampark/flink/core/StreamTableContext.scala | 2 +
.../streampark/flink/core/TableContext.scala | 2 +
.../flink/deployment}/FlinkClusterClient.scala | 2 +-
.../flink/deployment}/FlinkKubernetesClient.scala | 2 +-
.../deployment}/YarnClusterDescriptorWrapper.scala | 2 +-
.../apache/streampark/flink/cli/SqlClient.scala | 4 +-
60 files changed, 201 insertions(+), 326 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 77618fbd4..d3eed9d2f 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -170,4 +170,20 @@ object FileUtils {
buffer.toString()
}
+ @throws[IOException]
+ def readString(in: InputStream): String = {
+ require(in != null)
+ val scanner = new Scanner(in)
+ val buffer = new mutable.StringBuilder()
+ if (scanner.hasNextLine) {
+ buffer.append(scanner.nextLine())
+ }
+ while (scanner.hasNextLine) {
+ buffer.append("\r\n")
+ buffer.append(scanner.nextLine())
+ }
+ Utils.close(scanner)
+ buffer.toString()
+ }
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index f40e8e7a6..d00728563 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -44,4 +44,6 @@ public interface SavepointService extends IService<Savepoint>
{
String getSavePointPath(Application app) throws Exception;
String processPath(String path, String jobName, Long jobId);
+
+ void saveSavePoint(Savepoint savepoint);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 0d2f3decd..2062baee3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1444,7 +1444,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
savepoint.setType(CheckPointType.SAVEPOINT.get());
savepoint.setCreateTime(new Date());
savepoint.setTriggerTime(triggerTime);
- savepointService.save(savepoint);
+ savepointService.saveSavePoint(savepoint);
}
if (application.isKubernetesModeJob()) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 977afbb51..a1455b281 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -69,6 +69,7 @@ import javax.annotation.Nullable;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -111,15 +112,12 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));
+
@Autowired private SavepointMapper savepointMapper;
@Override
public void expire(Long appId) {
- Savepoint savepoint = new Savepoint();
- savepoint.setLatest(false);
- LambdaQueryWrapper<Savepoint> queryWrapper =
- new LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
- this.update(savepoint, queryWrapper);
+ savepointMapper.cleanLatest(appId);
}
private void expire(Savepoint entity) {
@@ -226,12 +224,13 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
LambdaQueryWrapper<Savepoint> queryWrapper =
new LambdaQueryWrapper<Savepoint>()
.eq(Savepoint::getAppId, id)
- .eq(Savepoint::getLatest, true);
- Savepoint savepoint = this.baseMapper.selectOne(queryWrapper);
- if (savepoint == null) {
- savepoint = this.baseMapper.findLatestByTime(id);
+ .eq(Savepoint::getLatest, true)
+ .orderByDesc(Savepoint::getCreateTime);
+ List<Savepoint> savepointList = this.baseMapper.selectList(queryWrapper);
+ if (!savepointList.isEmpty()) {
+ return savepointList.get(0);
}
- return savepoint;
+ return this.baseMapper.findLatestByTime(id);
}
@Override
@@ -315,11 +314,10 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
}
@Override
- public boolean save(Savepoint savepoint) {
+ public void saveSavePoint(Savepoint savepoint) {
this.expire(savepoint);
- this.expire(savepoint.getAppId());
this.cleanLatest(savepoint.getAppId());
- return super.save(savepoint);
+ super.save(savepoint);
}
private void cleanLatest(Long appId) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 8a4c5b09c..ce3694473 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -181,7 +181,7 @@ public class CheckpointProcessor {
savepoint.setPath(checkPoint.getExternalPath());
savepoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
savepoint.setCreateTime(new Date());
- savepointService.save(savepoint);
+ savepointService.saveSavePoint(savepoint);
}
public static class Counter {
diff --git a/streampark-console/streampark-console-webapp/src/design/index.less
b/streampark-console/streampark-console-webapp/src/design/index.less
index ac3e23a64..d787ae632 100644
--- a/streampark-console/streampark-console-webapp/src/design/index.less
+++ b/streampark-console/streampark-console-webapp/src/design/index.less
@@ -102,6 +102,7 @@ span {
.ant-modal-content,
.ant-tree-checkbox-inner,
.ant-table,
+.ant-card,
.ant-alert,
.bold-tag,
.bold-tag > .ant-tag,
@@ -109,6 +110,8 @@ span {
.ant-btn-group,
textarea.ant-input,
.ant-select-selector,
+.streampark-page-wrapper,
+.streampark-page-wrapper-content,
.ant-upload.ant-upload-drag {
- border-radius: 0 !important;
+ border-radius: 1px !important;
}
diff --git a/streampark-console/streampark-console-webapp/src/design/swal2.less
b/streampark-console/streampark-console-webapp/src/design/swal2.less
index fdc00ec4b..9129e8ca9 100644
--- a/streampark-console/streampark-console-webapp/src/design/swal2.less
+++ b/streampark-console/streampark-console-webapp/src/design/swal2.less
@@ -8,10 +8,17 @@
padding: 16px;
overflow: auto;
line-height: 1.45;
- border-radius: 6px;
+ border-radius: 2px;
}
}
+.swal2-container,
+.swal2-popup,
+.swal2-confirm,
+.swal2-styled {
+ border-radius: 1px !important;
+}
+
[data-theme="dark"] {
.swal2-popup.swal2-toast {
diff --git
a/streampark-console/streampark-console-webapp/src/design/var/index.less
b/streampark-console/streampark-console-webapp/src/design/var/index.less
index 4dbb4af32..3c21dca9b 100644
--- a/streampark-console/streampark-console-webapp/src/design/var/index.less
+++ b/streampark-console/streampark-console-webapp/src/design/var/index.less
@@ -8,7 +8,7 @@
@multiple-height: 30px;
// headers
-@header-height: 50px;
+@header-height: 64px;
// logo width
@logo-width: 48px;
diff --git
a/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
b/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
index c852b7ee7..e493a2713 100644
---
a/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
+++
b/streampark-console/streampark-console-webapp/src/layouts/default/header/MultipleHeader.vue
@@ -19,9 +19,9 @@
import { useDesign } from '/@/hooks/web/useDesign';
import { useLayoutHeight } from '../content/useContentViewHeight';
- const HEADER_HEIGHT = 48;
-
+ const HEADER_HEIGHT = 64;
const TABS_HEIGHT = 32;
+
export default defineComponent({
name: 'LayoutMultipleHeader',
components: { LayoutHeader, MultipleTabs },
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 0e5ccb2ac..b0b627545 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -226,11 +226,13 @@ export const useFlinkApplication = (openStartModal: Fn)
=> {
],
content: () => {
return (
- <Form class="!pt-40px">
+ <Form
+ class="!pt-40px"
+ layout='vertical'
+ baseColProps = {{ span: 20, offset: 2 }}
+ >
<Form.Item
label="Job Name"
- labelCol={{ lg: { span: 5 }, sm: { span: 5 } }}
- wrapperCol={{ lg: { span: 18 }, sm: { span: 18 } }}
validateStatus={unref(validateStatus)}
help={help}
rules={[{ required: true }]}
@@ -319,8 +321,8 @@ export const useFlinkApplication = (openStartModal: Fn) => {
class="!pt-40px"
ref={mappingRef}
name="mappingForm"
- labelCol={{ lg: { span: 5 }, sm: { span: 5 } }}
- wrapperCol={{ lg: { span: 18 }, sm: { span: 18 } }}
+ baseColProps = {{ span: 20, offset: 2 }}
+ layout='vertical'
v-model:model={formValue}
>
<Form.Item label="Job Name">
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index c9f7cd2a5..4bd37db9a 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
-import org.apache.streampark.flink.core.FlinkKubernetesClient
+import org.apache.streampark.flink.deployment.FlinkKubernetesClient
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.model.ClusterKey
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 9d3b8c6f9..a685e861c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.impl
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.core.YarnClusterDescriptorWrapper
+import org.apache.streampark.flink.deployment.YarnClusterDescriptorWrapper
import org.apache.streampark.flink.util.FlinkUtils
import org.apache.flink.client.program.PackagedProgram
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index df61ee9d0..675b0ea08 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,12 +18,11 @@
package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
import org.apache.streampark.common.util.{DeflaterUtils, Logger,
PropertiesUtils, Utils}
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.deployment.FlinkClusterClient
import com.google.common.collect.Lists
import org.apache.commons.cli.{CommandLine, Options}
@@ -36,7 +35,6 @@ import
org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.{ClusterClient, PackagedProgram,
PackagedProgramUtils}
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
diff --git
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 2df1c6443..ef2de3be0 100644
---
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext}
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.TableConfig
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index 528e47907..a42c0e669 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -27,7 +27,7 @@ import
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import scala.util.Try
-object EnhancerImplicit {
+private[flink] object EnhancerImplicit {
implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
similarity index 91%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
index f7aa97fb4..f519cfb64 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core.conf
+package org.apache.streampark.flink.core
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
-case class FlinkConfiguration(
+private[flink] case class FlinkConfiguration(
parameter: ParameterTool,
envConfig: Configuration,
tableConfig: Configuration)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 58b0f6747..e46c6449e 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable
import scala.util.Try
-object FlinkSqlExecutor extends Logger {
+private[flink] object FlinkSqlExecutor extends Logger {
private[this] val lock = new ReentrantReadWriteLock().writeLock
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 70101672e..5841d5cd3 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -33,7 +33,7 @@ import
org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
import scala.util.{Failure, Try}
-object FlinkSqlValidator extends Logger {
+private[flink] object FlinkSqlValidator extends Logger {
private[this] val FLINK112_CALCITE_PARSER_CLASS =
"org.apache.flink.table.planner.calcite.CalciteParser"
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index a50116925..b0a746005 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -17,8 +17,8 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.flink.core.EnhancerImplicit._
+import EnhancerImplicit._
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode}
import org.apache.flink.api.common.cache.DistributedCache
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b3ee24c33..47a5bce6b 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -20,10 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util._
-import org.apache.streampark.flink.core.conf.FlinkConfiguration
-import collection.{mutable, Map}
-import collection.JavaConversions._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment
=> JavaStreamEnv}
@@ -32,6 +29,10 @@ import org.apache.flink.table.api.TableConfig
import java.io.File
+import scala.collection.{mutable, Map}
+import scala.collection.JavaConversions._
+import scala.util.Try
+
private[flink] object FlinkStreamingInitializer {
def initialize(args: Array[String], config: (StreamExecutionEnvironment,
ParameterTool) => Unit)
@@ -80,60 +81,73 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
def initParameter(): FlinkConfiguration = {
val argsMap = ParameterTool.fromArgs(args)
- val config = argsMap.get(KEY_APP_CONF(), null) match {
- case null | "" =>
- throw new ExceptionInInitializerError(
- "[StreamPark] Usage:can't fond config,please set \"--conf $path \"
in main arguments")
- case file => file
+ val configMap = parseConfig(argsMap)
+ if (!argsMap.has(KEY_APP_CONF()) && configMap.isEmpty) {
+ throw new IllegalArgumentException(
+ "[StreamPark] Usage:can't fond config, please set \"--conf $path \" in
main arguments")
}
- val configMap = parseConfig(config)
- val properConf = extractConfigByPrefix(configMap,
KEY_FLINK_PROPERTY_PREFIX)
+ val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
-
// config priority: explicitly specified priority > project profiles >
system profiles
val parameter = ParameterTool
.fromSystemProperties()
- .mergeWith(ParameterTool.fromMap(properConf))
+ .mergeWith(ParameterTool.fromMap(flinkConf))
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(argsMap)
- val envConfig = Configuration.fromMap(properConf)
+ val envConfig = Configuration.fromMap(flinkConf)
FlinkConfiguration(parameter, envConfig, null)
}
- def parseConfig(config: String): Map[String, String] = {
-
- lazy val content = DeflaterUtils.unzipString(config.drop(7))
-
- def readConfig(text: String): Map[String, String] = {
- val format = config.split("\\.").last.toLowerCase
- format match {
- case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
- case "conf" => PropertiesUtils.fromHoconText(text)
- case "properties" => PropertiesUtils.fromPropertiesText(text)
- case _ =>
- throw new IllegalArgumentException(
- "[StreamPark] Usage: application config file error,must be
[yaml|conf|properties]")
- }
+ def parseConfig(parameterTool: ParameterTool): Map[String, String] = {
+ parameterTool.get(KEY_APP_CONF(), null) match {
+ case null | "" =>
+ logWarn("Usage:can't fond config, now trying to find from jar")
+ val configs =
+ Set("application.yml", "application.yaml", "application.conf",
"application.properties")
+ configs
+ .find(
+ f => Try(this.getClass.getClassLoader.getResource(f).getPath !=
null).getOrElse(false))
+ .map(
+ f => {
+ val input = this.getClass.getClassLoader.getResourceAsStream(f)
+ val content = FileUtils.readString(input)
+ val format = f.split("\\.").last.toLowerCase
+ readConfig(format, content)
+ })
+ .getOrElse(return Map.empty[String, String])
+ case config =>
+ lazy val content = DeflaterUtils.unzipString(config.drop(7))
+ lazy val format = config.split("\\.").last.toLowerCase
+ val configAsMap = config match {
+ case x if x.startsWith("yaml://") =>
PropertiesUtils.fromYamlText(content)
+ case x if x.startsWith("conf://") =>
PropertiesUtils.fromHoconText(content)
+ case x if x.startsWith("prop://") =>
PropertiesUtils.fromPropertiesText(content)
+ case x if x.startsWith("hdfs://") =>
+ // If the configuration file is stored on HDFS, the user needs to copy
the HDFS-related configuration files into the resources dir
+ val text = HdfsUtils.read(x)
+ readConfig(format, text)
+ case _ =>
+ val configFile = new File(config)
+ require(
+ configFile.exists(),
+ s"[StreamPark] Usage: application config file: $configFile is
not found!!!")
+ val text = FileUtils.readString(configFile)
+ readConfig(format, text)
+ }
+ configAsMap.filter(_._2.nonEmpty)
}
+ }
- val map = config match {
- case x if x.startsWith("yaml://") =>
PropertiesUtils.fromYamlText(content)
- case x if x.startsWith("conf://") =>
PropertiesUtils.fromHoconText(content)
- case x if x.startsWith("prop://") =>
PropertiesUtils.fromPropertiesText(content)
- case x if x.startsWith("hdfs://") =>
- // If the configuration file with the hdfs, user will need to copy the
hdfs-related configuration files under the resources dir
- val text = HdfsUtils.read(x)
- readConfig(text)
+ private[this] def readConfig(format: String, text: String): Map[String,
String] = {
+ format match {
+ case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+ case "conf" => PropertiesUtils.fromHoconText(text)
+ case "properties" => PropertiesUtils.fromPropertiesText(text)
case _ =>
- val configFile = new File(config)
- require(
- configFile.exists(),
- s"[StreamPark] Usage: application config file: $configFile is not
found!!!")
- val text = FileUtils.readString(configFile)
- readConfig(text)
+ throw new IllegalArgumentException(
+ "[StreamPark] Usage: application config file error,must be
[yaml|conf|properties]")
}
- map.filter(_._2.nonEmpty)
}
def extractConfigByPrefix(configMap: Map[String, String], prefix: String):
Map[String, String] = {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index a7d52749d..56267cbda 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -19,9 +19,8 @@ package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApiType, PlannerType}
import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
+import org.apache.streampark.common.util.DeflaterUtils
import org.apache.streampark.flink.core.EnhancerImplicit._
-import org.apache.streampark.flink.core.conf.FlinkConfiguration
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
@@ -29,10 +28,8 @@ import
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
-import java.io.File
-
-import scala.collection.{mutable, Map}
import scala.collection.JavaConversions._
+import scala.collection.Map
import scala.util.{Failure, Success, Try}
private[flink] object FlinkTableInitializer {
@@ -176,63 +173,46 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
/** In case of table SQL, the parameter conf is not required, it depends on
the developer. */
override def initParameter(): FlinkConfiguration = {
- val configuration = {
- val argsMap = ParameterTool.fromArgs(args)
- argsMap.get(KEY_APP_CONF(), null) match {
- case null | "" =>
- logWarn("Usage:can't fond config,you can set \"--conf $path \" in
main arguments")
- val parameter =
ParameterTool.fromSystemProperties().mergeWith(argsMap)
- FlinkConfiguration(parameter, new Configuration(), new
Configuration())
- case file =>
- val configMap = parseConfig(file)
- // set sql..
- val sqlConf = mutable.Map[String, String]()
- configMap.foreach(
- x => {
- if (x._1.startsWith(KEY_SQL_PREFIX)) {
- sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2
- }
- })
-
- // config priority: explicitly specified priority > project profiles
> system profiles
- val properConf = extractConfigByPrefix(configMap,
KEY_FLINK_PROPERTY_PREFIX)
- val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
- val tableConf = extractConfigByPrefix(configMap,
KEY_FLINK_TABLE_PREFIX)
-
- val tableConfig = Configuration.fromMap(tableConf)
- val envConfig = Configuration.fromMap(properConf)
-
- val parameter = ParameterTool
- .fromSystemProperties()
- .mergeWith(ParameterTool.fromMap(properConf))
- .mergeWith(ParameterTool.fromMap(tableConf))
- .mergeWith(ParameterTool.fromMap(appConf))
- .mergeWith(ParameterTool.fromMap(sqlConf))
- .mergeWith(argsMap)
-
- FlinkConfiguration(parameter, envConfig, tableConfig)
+ val cliParameterTool = ParameterTool.fromArgs(args)
+ val configMap = super.parseConfig(cliParameterTool)
+ val flinkConfiguration = {
+ if (!cliParameterTool.has(KEY_APP_CONF()) && configMap.isEmpty) {
+ logWarn(
+ "Usage: can't fond config, this config is not required, you can set
\"--conf $path \" in main arguments")
+ FlinkConfiguration(cliParameterTool, new Configuration(), new
Configuration())
+ } else {
+ // config priority: explicitly specified priority > project profiles >
system profiles
+ val flinkConf = extractConfigByPrefix(configMap,
KEY_FLINK_PROPERTY_PREFIX)
+ val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+ val tableConf = extractConfigByPrefix(configMap,
KEY_FLINK_TABLE_PREFIX)
+ val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)
+
+ val envConfig = Configuration.fromMap(flinkConf)
+ val tableConfig = Configuration.fromMap(tableConf)
+
+ val parameterTool = ParameterTool
+ .fromSystemProperties()
+ .mergeWith(ParameterTool.fromMap(flinkConf))
+ .mergeWith(ParameterTool.fromMap(appConf))
+ .mergeWith(ParameterTool.fromMap(tableConf))
+ .mergeWith(ParameterTool.fromMap(sqlConf))
+
+ FlinkConfiguration(parameterTool, envConfig, tableConfig)
}
}
- configuration.parameter.get(KEY_FLINK_SQL()) match {
- case null => configuration
- case param =>
- // for streampark-console
- Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
- configuration.copy(parameter = configuration.parameter.mergeWith(
- ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value))))
- case Failure(_) =>
- val sqlFile = new File(param)
- Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
- configuration.copy(parameter =
-
configuration.parameter.mergeWith(ParameterTool.fromMap(value)))
- case Failure(e) =>
- new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- configuration
- }
- }
+ if (cliParameterTool.has(KEY_FLINK_SQL())) {
+ // for streampark-console
+ Try(DeflaterUtils.unzipString(cliParameterTool.get(KEY_FLINK_SQL())))
match {
+ case Success(value) =>
+ flinkConfiguration.copy(parameter =
flinkConfiguration.parameter.mergeWith(
+ ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value))))
+ case Failure(e) =>
+ new IllegalArgumentException(s"[StreamPark] --sql parsing failed.
$e")
+ flinkConfiguration
+ }
+ } else {
+ flinkConfiguration
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 755c40103..f73844b54 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -17,8 +17,8 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst.printLogo
-import org.apache.streampark.flink.core.EnhancerImplicit._
+import EnhancerImplicit._
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api._
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 1f00d0fce..653c43035 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -30,7 +30,7 @@ import scala.collection.{immutable, mutable}
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
-object SqlCommandParser extends Logger {
+private[flink] object SqlCommandParser extends Logger {
def parseSQL(
sql: String,
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
index 6d8393ee8..0d5095b4a 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
index 2103a6353..611d3f09d 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
similarity index 94%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
index cf97dd98b..c11a51f75 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala
@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
import java.io.File
import java.util
-import java.util.List
class YarnClusterDescriptorTrait(yarnClusterDescriptor: YarnClusterDescriptor)
{
def addShipFiles(shipFiles: util.List[File]) =
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..6fc4d1614 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a8b0ba5c6..105a4d9a4 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..5919bf84d 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
copy from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
copy to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
deleted file mode 100644
index a8b0ba5c6..000000000
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.yarn.YarnClusterDescriptor
-
-class YarnClusterDescriptorWrapper(yarnClusterDescriptor:
YarnClusterDescriptor)
- extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 6beb92a8a..6fd62c1fb 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 83e9b4d89..6fc4d1614 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a8b0ba5c6..105a4d9a4 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
copy from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
copy to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index 89184c756..3ed7e7bbc 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 90%
copy from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
copy to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..f2a4cc51e 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
+
+import org.apache.streampark.flink.deployment.FlinkKubernetesClientTrait
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 95%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index 9d3393ec7..ad6e0afd9 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
deleted file mode 100644
index 4f6336f5a..000000000
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
-
-import java.util.concurrent.CompletableFuture
-
-class FlinkClusterClient[T](clusterClient: ClusterClient[T])
- extends FlinkClientTrait[T](clusterClient) {
-
- override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
- clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
- }
-
- override def cancelWithSavepoint(
- jobID: JobID,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
- }
-
- override def stopWithSavepoint(
- jobID: JobID,
- advanceToEndOfEventTime: Boolean,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.stopWithSavepoint(
- jobID,
- advanceToEndOfEventTime,
- savepointDirectory,
- SavepointFormatType.DEFAULT)
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
copy from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
copy to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 93%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a09b86844..2fef79071 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
-import collection.JavaConversions._
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.hadoop.fs.Path
import java.io.File
import java.util
+import scala.collection.JavaConversions._
+
class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
deleted file mode 100644
index 4f6336f5a..000000000
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
-
-import java.util.concurrent.CompletableFuture
-
-class FlinkClusterClient[T](clusterClient: ClusterClient[T])
- extends FlinkClientTrait[T](clusterClient) {
-
- override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
- clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
- }
-
- override def cancelWithSavepoint(
- jobID: JobID,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
- }
-
- override def stopWithSavepoint(
- jobID: JobID,
- advanceToEndOfEventTime: Boolean,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.stopWithSavepoint(
- jobID,
- advanceToEndOfEventTime,
- savepointDirectory,
- SavepointFormatType.DEFAULT)
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
deleted file mode 100644
index f388c8e9f..000000000
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.core
-
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
-import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
-
-import java.util.Optional
-
-class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
- extends FlinkKubernetesClientTrait(kubeClient) {
-
- override def getService(serviceName: String): Optional[KubernetesService] = {
- kubeClient.getService(serviceName)
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 65f715c75..81a855c7a 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -17,6 +17,8 @@
package org.apache.streampark.flink.core
+import org.apache.streampark.flink.core.FlinkTableInitializer
+
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e8f704f39..7aff52560 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,6 +17,8 @@
package org.apache.streampark.flink.core
+import org.apache.streampark.flink.core.FlinkTableInitializer
+
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.CatalogDescriptor
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
similarity index 97%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
index 4f6336f5a..fd1e58803 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
index f388c8e9f..59b9a517c 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
rename to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
index a09b86844..72cd605c8 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.core
+package org.apache.streampark.flink.deployment
import collection.JavaConversions._
import org.apache.flink.yarn.YarnClusterDescriptor
diff --git
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index c4be0ab58..c80d520f9 100644
---
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -55,7 +55,7 @@ object SqlClient extends App {
case Some(e) =>
// 1) flink sql execution.runtime-mode has highest priority
val m = e.operands(1).toUpperCase()
- arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+ arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
m
case None =>
// 2) dynamic properties execution.runtime-mode
@@ -68,7 +68,7 @@ object SqlClient extends App {
// 3) application conf execution.runtime-mode
parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode).toUpperCase()
}
- arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+ arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
m
case m => m
}