Al-assad commented on code in PR #2994:
URL: https://github.com/apache/incubator-streampark/pull/2994#discussion_r1315539896


##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
     super.doTriggerSavepoint(request, flinkConf)
   }
+
+  private[this] def convertFlinkDeploymentDef(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): FlinkDeploymentDef = {
+    val spec = FlinkDeploymentDef(
+      name = submitRequest.appName,
+      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+      image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),

Review Comment:
   The Flink base image should come from buildResult.flinkImageTag.
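
   A minimal sketch of that change, reusing the DockerImageBuildResponse cast the same method already applies for jarURI; only the image expression changes, the other FlinkDeploymentDef fields stay as-is:

       // take the image produced by the build pipeline instead of the container-image default
       val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
       val image: String = buildResult.flinkImageTag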



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -59,17 +64,19 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, buildResult.flinkImageTag)

Review Comment:
   Do not set the image tag directly in the Flink configuration when using the operator API.



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -59,17 +64,19 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, buildResult.flinkImageTag)
 
     // retrieve k8s cluster and submit flink job on application mode
-    var clusterDescriptor: KubernetesClusterDescriptor = null
     var clusterClient: ClusterClient[String] = null

Review Comment:
   ClusterClient is unnecessary in flink-k8s v2.
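
   A hypothetical sketch of that flow: build the FlinkDeploymentDef and hand it to the operator layer, so neither a KubernetesClusterDescriptor nor a ClusterClient is opened here. FlinkK8sOperator.deploy is an assumed entry point of the flink-k8s v2 module, not a confirmed API:

       // let the operator reconcile the deployment instead of talking to the cluster directly
       val spec = convertFlinkDeploymentDef(submitRequest, flinkConfig)
       FlinkK8sOperator.deploy(spec) // assumed entry point, adjust to the actual v2 API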



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -84,7 +91,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
        logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
         throw e
     } finally {
-      Utils.close(clusterDescriptor, clusterClient)
+      Utils.close(clusterClient)

Review Comment:
   It will cause a NullPointerException.
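
   A minimal null-safe variant of the cleanup, assuming clusterClient can still be null when submission fails before the client is created:

       } finally {
         // guard against a failed submission leaving clusterClient unassigned
         if (clusterClient != null) {
           Utils.close(clusterClient)
         }
       }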



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)

Review Comment:
   Do not set the TARGET option directly in the Flink configuration when using the operator API.



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
     super.doTriggerSavepoint(request, flinkConf)
   }
+
+  private[this] def convertFlinkDeploymentDef(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): FlinkDeploymentDef = {
+    val spec = FlinkDeploymentDef(
+      name = submitRequest.appName,
+      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+      image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+      flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
+        .map(_.replace(".", "_"))
+        .map("V" + _)
+        .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
+        case Some(version) => version
+        case None => throw new IllegalArgumentException("Flink version not found")
+      },
+      jobManager = JobManagerDef(
+        cpu = 1,
+        memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+      taskManager = TaskManagerDef(
+        cpu = 1,
+        memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+      job = Some(
+        JobDef(
+          jarURI =
+            submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
+          parallelism = 1
+        )),
+      extJarPaths = submitRequest.userJarFile match {
+        case null => Array.empty[String]
+        case file => Array(file.getAbsolutePath)
+      }
+    )
+    spec

Review Comment:
   FlinkDeploymentDef.flinkConfiguration is not set; these additional Flink user parameters may come from SubmitRequest.extraParameter.
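
   A hedged sketch of wiring those parameters through, assuming extraParameter is a nullable java.util.Map and that flinkConfiguration accepts a Map[String, String] (both are assumptions):

       import scala.collection.JavaConverters._

       // convert SubmitRequest.extraParameter into the flinkConfiguration of the CR spec
       val extraFlinkConf: Map[String, String] =
         Option(submitRequest.extraParameter)
           .map(_.asScala.map { case (k, v) => k -> String.valueOf(v) }.toMap)
           .getOrElse(Map.empty[String, String])

       // then pass it along: FlinkDeploymentDef(..., flinkConfiguration = extraFlinkConf, ...)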



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala:
##########
@@ -103,4 +110,39 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
     super.doTriggerSavepoint(request, flinkConf)
   }
+
+  private[this] def convertFlinkDeploymentDef(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): FlinkDeploymentDef = {
+    val spec = FlinkDeploymentDef(
+      name = submitRequest.appName,
+      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+      image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+      flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
+        .map(_.replace(".", "_"))
+        .map("V" + _)
+        .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
+        case Some(version) => version
+        case None => throw new IllegalArgumentException("Flink version not found")
+      },
+      jobManager = JobManagerDef(
+        cpu = 1,
+        memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+      taskManager = TaskManagerDef(
+        cpu = 1,
+        memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+      job = Some(
+        JobDef(
+          jarURI =
+            submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,

Review Comment:
   Missing settings for entryClass and args in JobDef, especially in StreamPark SQL mode. Also, do not ignore the savepoint and restoreMode parameters in SubmitRequest.
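
   A hedged sketch of a fuller JobDef along those lines; entryClass and args come from this comment, while initialSavepointPath and the SubmitRequest accessors used below are assumed names, not confirmed fields:

       val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
       val jobDef = JobDef(
         jarURI = buildResult.dockerInnerMainJarPath,
         parallelism = 1,
         // main class and program arguments, needed in particular for StreamPark SQL jobs
         entryClass = Option(submitRequest.appMain),                          // assumed accessor
         args = Option(submitRequest.appArgs).getOrElse(Array.empty[String]), // assumed accessor
         // restore from the savepoint carried by SubmitRequest instead of dropping it;
         // the restoreMode setting could go into flinkConfiguration if JobDef has no field for it
         initialSavepointPath = Option(submitRequest.savePoint)               // assumed accessor
       )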


