Al-assad commented on code in PR #2994:
URL:
https://github.com/apache/incubator-streampark/pull/2994#discussion_r1315539896
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
super.doTriggerSavepoint(request, flinkConf)
}
+
+ private[this] def convertFlinkDeploymentDef(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration): FlinkDeploymentDef = {
+ val spec = FlinkDeploymentDef(
+ name = submitRequest.appName,
+ namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+ image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
Review Comment:
The flink base image should come from buildResult.flinkImageTag
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -59,17 +64,19 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
buildResult.flinkImageTag)
Review Comment:
Do not directly set the image tag into the Flink configuration in the
operator API.
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -59,17 +64,19 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
buildResult.flinkImageTag)
// retrieve k8s cluster and submit flink job on application mode
- var clusterDescriptor: KubernetesClusterDescriptor = null
var clusterClient: ClusterClient[String] = null
Review Comment:
ClusterClient is unnecessary in flink-k8s v2.
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -84,7 +91,7 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
logError(s"submit flink job fail in ${submitRequest.executionMode}
mode")
throw e
} finally {
- Utils.close(clusterDescriptor, clusterClient)
+ Utils.close(clusterClient)
Review Comment:
It will cause a NullPointerException.
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
Review Comment:
Do not directly set the TARGET into the Flink configuration in the operator
API.
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
super.doTriggerSavepoint(request, flinkConf)
}
+
+ private[this] def convertFlinkDeploymentDef(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration): FlinkDeploymentDef = {
+ val spec = FlinkDeploymentDef(
+ name = submitRequest.appName,
+ namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+ image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+ flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
+ .map(_.replace(".", "_"))
+ .map("V" + _)
+ .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
+ case Some(version) => version
+ case None => throw new IllegalArgumentException("Flink version not
found")
+ },
+ jobManager = JobManagerDef(
+ cpu = 1,
+ memory =
flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+ taskManager = TaskManagerDef(
+ cpu = 1,
+ memory =
flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+ job = Some(
+ JobDef(
+ jarURI =
+
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
+ parallelism = 1
+ )),
+ extJarPaths = submitRequest.userJarFile match {
+ case null => Array.empty[String]
+ case file => Array(file.getAbsolutePath)
+ }
+ )
+ spec
Review Comment:
FlinkDeploymentDef.flinkConfiguration is never set; these additional
Flink user parameters may come from SubmitRequest.extraParameter.
##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
super.doTriggerSavepoint(request, flinkConf)
}
+
+ private[this] def convertFlinkDeploymentDef(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration): FlinkDeploymentDef = {
+ val spec = FlinkDeploymentDef(
+ name = submitRequest.appName,
+ namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+ image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+ flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
+ .map(_.replace(".", "_"))
+ .map("V" + _)
+ .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
+ case Some(version) => version
+ case None => throw new IllegalArgumentException("Flink version not
found")
+ },
+ jobManager = JobManagerDef(
+ cpu = 1,
+ memory =
flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+ taskManager = TaskManagerDef(
+ cpu = 1,
+ memory =
flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+ job = Some(
+ JobDef(
+ jarURI =
+
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
Review Comment:
Settings for entryclass and args are missing from JobDef, which matters
especially in streampark sql mode. What's more, don't ignore the savepoint
and restoreMode parameters in SubmitRequest.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]