This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch flink-conf in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit ba12778d041b866241ce03709c5ff4c05a4748a1 Author: benjobs <[email protected]> AuthorDate: Fri Sep 13 08:22:40 2024 +0800 [Improve] custom-code job read conf from jar support --- .../apache/streampark/common/util/FileUtils.scala | 16 +++ .../impl/KubernetesNativeSessionClient.scala | 2 +- .../flink/client/impl/YarnPerJobClient.scala | 2 +- .../flink/client/trait/FlinkClientTrait.scala | 6 +- .../streampark/flink/core/EnhancerImplicit.scala | 2 +- .../flink/core/{conf => }/FlinkConfiguration.scala | 4 +- .../streampark/flink/core/FlinkSqlExecutor.scala | 2 +- .../streampark/flink/core/FlinkSqlValidator.scala | 2 +- .../flink/core/FlinkStreamTableTrait.scala | 2 +- .../flink/core/FlinkStreamingInitializer.scala | 107 ++++++++++++--------- .../flink/core/FlinkTableInitializer.scala | 63 ++++++------ .../streampark/flink/core/FlinkTableTrait.scala | 2 +- .../streampark/flink/core/SqlCommandParser.scala | 2 +- .../{core => deployment}/FlinkClientTrait.scala | 2 +- .../FlinkKubernetesClientTrait.scala | 2 +- .../YarnClusterDescriptorTrait.scala | 3 +- .../flink/core/YarnClusterDescriptorWrapper.scala | 22 ----- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../flink/deployment}/FlinkKubernetesClient.scala | 2 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../FlinkKubernetesClient.scala | 3 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../flink/deployment}/FlinkKubernetesClient.scala | 2 +- .../YarnClusterDescriptorWrapper.scala | 2 +- .../{core => deployment}/FlinkClusterClient.scala | 2 +- .../FlinkKubernetesClient.scala | 2 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../FlinkKubernetesClient.scala | 2 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../flink/deployment}/FlinkKubernetesClient.scala | 2 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../flink/deployment}/FlinkKubernetesClient.scala | 4 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- .../streampark/flink/core/FlinkClusterClient.scala | 49 ---------- .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../flink/deployment}/FlinkKubernetesClient.scala | 2 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 5 +- .../streampark/flink/core/FlinkClusterClient.scala | 49 ---------- .../flink/core/FlinkKubernetesClient.scala | 31 ------ .../streampark/flink/core/StreamTableContext.scala | 2 + .../streampark/flink/core/TableContext.scala | 2 + .../flink/deployment}/FlinkClusterClient.scala | 2 +- .../flink/deployment}/FlinkKubernetesClient.scala | 2 +- .../deployment}/YarnClusterDescriptorWrapper.scala | 2 +- 49 files changed, 158 insertions(+), 276 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 77618fbd4..d3eed9d2f 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -170,4 +170,20 @@ object FileUtils { buffer.toString() } + @throws[IOException] + def readString(in: InputStream): String = { + require(in != null) + val scanner = new Scanner(in) + val buffer = new mutable.StringBuilder() + if (scanner.hasNextLine) { + buffer.append(scanner.nextLine()) + } + while (scanner.hasNextLine) { + buffer.append("\r\n") + buffer.append(scanner.nextLine()) + } + Utils.close(scanner) + buffer.toString() + } + } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index c9f7cd2a5..4bd37db9a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -22,7 +22,7 @@ import org.apache.streampark.common.util.{Logger, Utils} import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper -import org.apache.streampark.flink.core.FlinkKubernetesClient +import org.apache.streampark.flink.deployment.FlinkKubernetesClient import org.apache.streampark.flink.kubernetes.KubernetesRetriever import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode import org.apache.streampark.flink.kubernetes.model.ClusterKey diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala index 9d3b8c6f9..a685e861c 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala @@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.impl import org.apache.streampark.flink.client.`trait`.YarnClientTrait import org.apache.streampark.flink.client.bean._ -import org.apache.streampark.flink.core.YarnClusterDescriptorWrapper +import org.apache.streampark.flink.deployment.YarnClusterDescriptorWrapper import org.apache.streampark.flink.util.FlinkUtils import org.apache.flink.client.program.PackagedProgram diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index df61ee9d0..675b0ea08 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -18,12 +18,11 @@ package org.apache.streampark.flink.client.`trait` import org.apache.streampark.common.conf.ConfigConst._ -import org.apache.streampark.common.conf.Workspace -import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode} +import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode} import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils} import org.apache.streampark.flink.client.bean._ -import org.apache.streampark.flink.core.FlinkClusterClient import org.apache.streampark.flink.core.conf.FlinkRunOption +import org.apache.streampark.flink.deployment.FlinkClusterClient import com.google.common.collect.Lists import org.apache.commons.cli.{CommandLine, Options} @@ -36,7 +35,6 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration import org.apache.flink.client.program.{ClusterClient, PackagedProgram, PackagedProgramUtils} import org.apache.flink.configuration._ import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions} -import org.apache.flink.util.FlinkException import org.apache.flink.util.Preconditions.checkNotNull import java.io.File diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala index 528e47907..a42c0e669 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import scala.util.Try -object EnhancerImplicit { +private[flink] object EnhancerImplicit { implicit class EnhanceParameterTool(parameterTool: ParameterTool) { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala similarity index 91% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala index f7aa97fb4..f519cfb64 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkConfiguration.scala @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core.conf +package org.apache.streampark.flink.core import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.configuration.Configuration -case class FlinkConfiguration( +private[flink] case class FlinkConfiguration( parameter: ParameterTool, envConfig: Configuration, tableConfig: Configuration) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala index 58b0f6747..e46c6449e 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala @@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.mutable import scala.util.Try -object FlinkSqlExecutor extends Logger { +private[flink] object FlinkSqlExecutor extends Logger { private[this] val lock = new ReentrantReadWriteLock().writeLock diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala index 70101672e..5841d5cd3 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala @@ -33,7 +33,7 @@ import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories import scala.util.{Failure, Try} -object FlinkSqlValidator extends Logger { +private[flink] object FlinkSqlValidator extends Logger { private[this] val FLINK112_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.calcite.CalciteParser" diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala index a50116925..b0a746005 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala @@ -17,8 +17,8 @@ package org.apache.streampark.flink.core import org.apache.streampark.common.conf.ConfigConst._ -import org.apache.streampark.flink.core.EnhancerImplicit._ +import EnhancerImplicit._ import com.esotericsoftware.kryo.Serializer import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode} import org.apache.flink.api.common.cache.DistributedCache diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index b3ee24c33..604551d05 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -20,10 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst._ import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util._ -import org.apache.streampark.flink.core.conf.FlinkConfiguration -import collection.{mutable, Map} -import collection.JavaConversions._ import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamEnv} @@ -32,6 +29,10 @@ import org.apache.flink.table.api.TableConfig import java.io.File +import scala.collection.{mutable, Map} +import scala.collection.JavaConversions._ +import scala.util.Try + private[flink] object FlinkStreamingInitializer { def initialize(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit) @@ -79,61 +80,79 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api lazy val configuration: FlinkConfiguration = initParameter() def initParameter(): FlinkConfiguration = { - val argsMap = ParameterTool.fromArgs(args) - val config = argsMap.get(KEY_APP_CONF(), null) match { - case null | "" => - throw new ExceptionInInitializerError( - "[StreamPark] Usage:can't fond config,please set \"--conf $path \" in main arguments") - case file => file + val configMap = parseConfig() + if (configMap.isEmpty) { + throw new IllegalArgumentException( + "[StreamPark] Usage:can't fond config,please set \"--conf $path \" in main arguments") } - val configMap = parseConfig(config) - val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) + val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) - // config priority: explicitly specified priority > project profiles > system profiles val parameter = ParameterTool .fromSystemProperties() - .mergeWith(ParameterTool.fromMap(properConf)) + .mergeWith(ParameterTool.fromMap(flinkConf)) .mergeWith(ParameterTool.fromMap(appConf)) - .mergeWith(argsMap) - val envConfig = Configuration.fromMap(properConf) + val envConfig = Configuration.fromMap(flinkConf) FlinkConfiguration(parameter, envConfig, null) } - def parseConfig(config: String): Map[String, String] = { - - lazy val content = DeflaterUtils.unzipString(config.drop(7)) - - def readConfig(text: String): Map[String, String] = { - val format = config.split("\\.").last.toLowerCase - format match { - case "yml" | "yaml" => PropertiesUtils.fromYamlText(text) - case "conf" => PropertiesUtils.fromHoconText(text) - case "properties" => PropertiesUtils.fromPropertiesText(text) - case _ => - throw new IllegalArgumentException( - "[StreamPark] Usage: application config file error,must be [yaml|conf|properties]") - } + def parseConfig(): Map[String, String] = { + val argsMap = ParameterTool.fromArgs(args) + val configAsMap = argsMap.get(KEY_APP_CONF(), null) match { + case null | "" => + logWarn("[StreamPark] Usage:can't fond config, Now try to find from jar") + val propFormats = + Set("application.yml", "application.yaml", "application.conf", "application.properties") + propFormats + .find( + f => { + Try(this.getClass.getClassLoader.getResource(f)).getOrElse(false) == true + }) + .map( + f => { + val input = this.getClass.getClassLoader.getResourceAsStream(f) + val content = FileUtils.readString(input) + val format = f.split("\\.").last.toLowerCase + readConfig(format, content) + }) + .getOrElse(return Map.empty[String, String]) + + case config => + lazy val content = DeflaterUtils.unzipString(config.drop(7)) + lazy val format = config.split("\\.").last.toLowerCase + val map = config match { + case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content) + case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content) + case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content) + case x if x.startsWith("hdfs://") => + // If the configuration file with the hdfs, user will need to copy the hdfs-related configuration files under the resources dir + val text = HdfsUtils.read(x) + readConfig(format, text) + case _ => + val configFile = new File(config) + require( + configFile.exists(), + s"[StreamPark] Usage: application config file: $configFile is not found!!!") + val text = FileUtils.readString(configFile) + readConfig(format, text) + } + map } + // overview config... + configAsMap.putAll(argsMap.toMap.filter(_._1 != KEY_APP_CONF())) + configAsMap.filter(_._2.nonEmpty) + } - val map = config match { - case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content) - case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content) - case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content) - case x if x.startsWith("hdfs://") => - // If the configuration file with the hdfs, user will need to copy the hdfs-related configuration files under the resources dir - val text = HdfsUtils.read(x) - readConfig(text) + private[this] def readConfig(format: String, text: String): Map[String, String] = { + format match { + case "yml" | "yaml" => PropertiesUtils.fromYamlText(text) + case "conf" => PropertiesUtils.fromHoconText(text) + case "properties" => PropertiesUtils.fromPropertiesText(text) case _ => - val configFile = new File(config) - require( - configFile.exists(), - s"[StreamPark] Usage: application config file: $configFile is not found!!!") - val text = FileUtils.readString(configFile) - readConfig(text) + throw new IllegalArgumentException( + "[StreamPark] Usage: application config file error,must be [yaml|conf|properties]") } - map.filter(_._2.nonEmpty) } def extractConfigByPrefix(configMap: Map[String, String], prefix: String): Map[String, String] = { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index a7d52749d..208566ae3 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -21,7 +21,6 @@ import org.apache.streampark.common.enums.{ApiType, PlannerType} import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} import org.apache.streampark.flink.core.EnhancerImplicit._ -import org.apache.streampark.flink.core.conf.FlinkConfiguration import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.configuration.Configuration @@ -177,41 +176,35 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType override def initParameter(): FlinkConfiguration = { val configuration = { - val argsMap = ParameterTool.fromArgs(args) - argsMap.get(KEY_APP_CONF(), null) match { - case null | "" => - logWarn("Usage:can't fond config,you can set \"--conf $path \" in main arguments") - val parameter = ParameterTool.fromSystemProperties().mergeWith(argsMap) - FlinkConfiguration(parameter, new Configuration(), new Configuration()) - case file => - val configMap = parseConfig(file) - // set sql.. - val sqlConf = mutable.Map[String, String]() - configMap.foreach( - x => { - if (x._1.startsWith(KEY_SQL_PREFIX)) { - sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2 - } - }) - - // config priority: explicitly specified priority > project profiles > system profiles - val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) - val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) - val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX) - - val tableConfig = Configuration.fromMap(tableConf) - val envConfig = Configuration.fromMap(properConf) - - val parameter = ParameterTool - .fromSystemProperties() - .mergeWith(ParameterTool.fromMap(properConf)) - .mergeWith(ParameterTool.fromMap(tableConf)) - .mergeWith(ParameterTool.fromMap(appConf)) - .mergeWith(ParameterTool.fromMap(sqlConf)) - .mergeWith(argsMap) - - FlinkConfiguration(parameter, envConfig, tableConfig) + val configMap = parseConfig() + if (configMap.isEmpty) { + logWarn("Usage:can't fond config,you can set \"--conf $path \" in main arguments") } + // set sql.. + val sqlConf = mutable.Map[String, String]() + configMap.foreach( + x => { + if (x._1.startsWith(KEY_SQL_PREFIX)) { + sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2 + } + }) + + // config priority: explicitly specified priority > project profiles > system profiles + val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) + val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) + val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX) + + val tableConfig = Configuration.fromMap(tableConf) + val envConfig = Configuration.fromMap(flinkConf) + + val parameter = ParameterTool + .fromSystemProperties() + .mergeWith(ParameterTool.fromMap(flinkConf)) + .mergeWith(ParameterTool.fromMap(tableConf)) + .mergeWith(ParameterTool.fromMap(appConf)) + .mergeWith(ParameterTool.fromMap(sqlConf)) + + FlinkConfiguration(parameter, envConfig, tableConfig) } configuration.parameter.get(KEY_FLINK_SQL()) match { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala index 755c40103..f73844b54 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala @@ -17,8 +17,8 @@ package org.apache.streampark.flink.core import org.apache.streampark.common.conf.ConfigConst.printLogo -import org.apache.streampark.flink.core.EnhancerImplicit._ +import EnhancerImplicit._ import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.table.api._ diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala index 1f00d0fce..653c43035 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala @@ -30,7 +30,7 @@ import scala.collection.{immutable, mutable} import scala.collection.mutable.ListBuffer import scala.util.control.Breaks.{break, breakable} -object SqlCommandParser extends Logger { +private[flink] object SqlCommandParser extends Logger { def parseSQL( sql: String, diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala index 6d8393ee8..0d5095b4a 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkClientTrait.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala index 2103a6353..611d3f09d 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClientTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClientTrait.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala similarity index 94% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala index cf97dd98b..c11a51f75 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorTrait.scala @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor import java.io.File import java.util -import java.util.List class YarnClusterDescriptorTrait(yarnClusterDescriptor: YarnClusterDescriptor) { def addShipFiles(shipFiles: util.List[File]) = diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala deleted file mode 100644 index a8b0ba5c6..000000000 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.streampark.flink.core - -import org.apache.flink.yarn.YarnClusterDescriptor - -class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor) - extends YarnClusterDescriptorTrait(yarnClusterDescriptor) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 6beb92a8a..6fd62c1fb 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index 83e9b4d89..6fc4d1614 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index a8b0ba5c6..105a4d9a4 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 6beb92a8a..6fd62c1fb 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index 83e9b4d89..5919bf84d 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core + +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% copy from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala copy to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index 9d3393ec7..ad6e0afd9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 6beb92a8a..6fd62c1fb 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index 83e9b4d89..6fc4d1614 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index a8b0ba5c6..105a4d9a4 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 97% copy from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala copy to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 4f6336f5a..fd1e58803 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index 89184c756..3ed7e7bbc 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index 9d3393ec7..ad6e0afd9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 97% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 4f6336f5a..fd1e58803 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 96% copy from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala copy to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index f388c8e9f..59b9a517c 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index 9d3393ec7..ad6e0afd9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 97% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 4f6336f5a..fd1e58803 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index f388c8e9f..59b9a517c 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index 9d3393ec7..ad6e0afd9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 97% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 4f6336f5a..fd1e58803 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 90% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index f388c8e9f..f2a4cc51e 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment + +import org.apache.streampark.flink.deployment.FlinkKubernetesClientTrait import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 95% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index 9d3393ec7..ad6e0afd9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.yarn.YarnClusterDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala deleted file mode 100644 index 4f6336f5a..000000000 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.streampark.flink.core - -import org.apache.flink.api.common.JobID -import org.apache.flink.client.program.ClusterClient -import org.apache.flink.core.execution.SavepointFormatType - -import java.util.concurrent.CompletableFuture - -class FlinkClusterClient[T](clusterClient: ClusterClient[T]) - extends FlinkClientTrait[T](clusterClient) { - - override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = { - clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT) - } - - override def cancelWithSavepoint( - jobID: JobID, - savepointDirectory: String): CompletableFuture[String] = { - clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT) - } - - override def stopWithSavepoint( - jobID: JobID, - advanceToEndOfEventTime: Boolean, - savepointDirectory: String): CompletableFuture[String] = { - clusterClient.stopWithSavepoint( - jobID, - advanceToEndOfEventTime, - savepointDirectory, - SavepointFormatType.DEFAULT) - } - -} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 97% copy from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala copy to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 4f6336f5a..fd1e58803 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index f388c8e9f..59b9a517c 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 93% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index a09b86844..2fef79071 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,15 +15,16 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment -import collection.JavaConversions._ import org.apache.flink.yarn.YarnClusterDescriptor import org.apache.hadoop.fs.Path import java.io.File import java.util +import scala.collection.JavaConversions._ + class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor) extends YarnClusterDescriptorTrait(yarnClusterDescriptor) { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala deleted file mode 100644 index 4f6336f5a..000000000 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.streampark.flink.core - -import org.apache.flink.api.common.JobID -import org.apache.flink.client.program.ClusterClient -import org.apache.flink.core.execution.SavepointFormatType - -import java.util.concurrent.CompletableFuture - -class FlinkClusterClient[T](clusterClient: ClusterClient[T]) - extends FlinkClientTrait[T](clusterClient) { - - override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = { - clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT) - } - - override def cancelWithSavepoint( - jobID: JobID, - savepointDirectory: String): CompletableFuture[String] = { - clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT) - } - - override def stopWithSavepoint( - jobID: JobID, - advanceToEndOfEventTime: Boolean, - savepointDirectory: String): CompletableFuture[String] = { - clusterClient.stopWithSavepoint( - jobID, - advanceToEndOfEventTime, - savepointDirectory, - SavepointFormatType.DEFAULT) - } - -} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala deleted file mode 100644 index f388c8e9f..000000000 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.streampark.flink.core - -import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient -import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService - -import java.util.Optional - -class FlinkKubernetesClient(kubeClient: FlinkKubeClient) - extends FlinkKubernetesClientTrait(kubeClient) { - - override def getService(serviceName: String): Optional[KubernetesService] = { - kubeClient.getService(serviceName) - } - -} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala index 65f715c75..81a855c7a 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala @@ -17,6 +17,8 @@ package org.apache.streampark.flink.core +import org.apache.streampark.flink.core.FlinkTableInitializer + import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index e8f704f39..7aff52560 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -17,6 +17,8 @@ package org.apache.streampark.flink.core +import org.apache.streampark.flink.core.FlinkTableInitializer + import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult} import org.apache.flink.table.catalog.CatalogDescriptor diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala similarity index 97% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala index 4f6336f5a..fd1e58803 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkClusterClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.api.common.JobID import org.apache.flink.client.program.ClusterClient diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala index f388c8e9f..59b9a517c 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/FlinkKubernetesClient.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala similarity index 96% rename from streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala rename to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala index a09b86844..72cd605c8 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/YarnClusterDescriptorWrapper.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/deployment/YarnClusterDescriptorWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.flink.core +package org.apache.streampark.flink.deployment import collection.JavaConversions._ import org.apache.flink.yarn.YarnClusterDescriptor
