This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new e3dea563c [Improve] SubmitRequest bean improvement
e3dea563c is described below

commit e3dea563c00f66ebffd2744f614865a7d4a65aa2
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 13 18:08:07 2024 +0800

    [Improve] SubmitRequest bean improvement
---
 .../core/service/impl/ApplicationServiceImpl.java  | 23 +++++++++---------
 .../flink/client/bean/SubmitRequest.scala          | 27 +++++++---------------
 .../flink/client/trait/FlinkClientTrait.scala      |  6 ++---
 3 files changed, 22 insertions(+), 34 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 869e79afd..c27e5772e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -82,7 +82,6 @@ import 
org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
 import org.apache.streampark.flink.client.bean.CancelResponse;
-import org.apache.streampark.flink.client.bean.KubernetesSubmitParam;
 import org.apache.streampark.flink.client.bean.SubmitRequest;
 import org.apache.streampark.flink.client.bean.SubmitResponse;
 import org.apache.streampark.flink.core.conf.ParameterCli;
@@ -1557,14 +1556,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
-    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
-
-    KubernetesSubmitParam kubernetesSubmitParam =
-        new KubernetesSubmitParam(
-            application.getClusterId(),
-            application.getK8sNamespace(),
-            application.getK8sRestExposedTypeEnum());
-
     AppBuildPipeline buildPipeline = 
appBuildPipeService.getById(application.getId());
 
     Utils.notNull(buildPipeline);
@@ -1579,7 +1570,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         variableService.replaceVariable(application.getTeamId(), 
application.getArgs());
 
     SubmitRequest submitRequest =
-        new SubmitRequest(
+        SubmitRequest.apply(
             flinkEnv.getFlinkVersion(),
             ExecutionMode.of(application.getExecutionMode()),
             getProperties(application),
@@ -1593,9 +1584,12 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             getSavePointed(appParam),
             applicationArgs,
             buildResult,
-            kubernetesSubmitParam,
-            extraParameter);
+            extraParameter,
+            application.getClusterId(),
+            application.getK8sNamespace(),
+            application.getK8sRestExposedTypeEnum());
 
+    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
     CompletableFuture<SubmitResponse> future =
         CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), 
executorService);
 
@@ -1608,6 +1602,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
           // 2) exception
           if (throwable != null) {
+            log.info(" start exception : " + throwable);
             String exception = Utils.stringifyException(throwable);
             applicationLog.setException(exception);
             applicationLog.setSuccess(false);
@@ -1655,6 +1650,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
           // if start completed, will be added task to tracking queue
           if (isKubernetesApp(application)) {
+            log.info(
+                "start job {} on {} success, doWatching...",
+                application.getJobName(),
+                application.getExecutionModeEnum().getName());
             application.setRelease(ReleaseState.DONE.get());
             flinkK8sWatcher.doWatching(trackId);
             if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 96afd4b08..dd24a6c14 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -36,19 +36,6 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConversions._
 import scala.util.Try
 
-/**
- * @param clusterId
- *   flink cluster id in k8s cluster.
- * @param kubernetesNamespace
- *   k8s namespace.
- * @param flinkRestExposedType
- *   flink rest-service exposed type on k8s cluster.
- */
-case class KubernetesSubmitParam(
-    clusterId: String,
-    kubernetesNamespace: String,
-    @Nullable flinkRestExposedType: FlinkK8sRestExposedType)
-
 case class SubmitRequest(
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
@@ -63,8 +50,10 @@ case class SubmitRequest(
     savePoint: String,
     args: String,
     @Nullable buildResult: BuildResult,
-    @Nullable k8sSubmitParam: KubernetesSubmitParam,
-    @Nullable extraParameter: JavaMap[String, Any]) {
+    @Nullable extraParameter: JavaMap[String, Any],
+    @Nullable clusterId: String,
+    @Nullable kubernetesNamespace: String,
+    @Nullable flinkRestExposedType: FlinkK8sRestExposedType) {
 
   private lazy val appProperties: Map[String, String] = 
getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
 
@@ -178,14 +167,14 @@ case class SubmitRequest(
         if (buildResult == null) {
           throw new Exception(
             s"[flink-submit] current job: ${this.effectiveAppName} was not yet 
built, buildResult is empty" +
-              s",clusterId=${k8sSubmitParam.clusterId}," +
-              s",namespace=${k8sSubmitParam.kubernetesNamespace}")
+              s",clusterId=$clusterId," +
+              s",namespace=$kubernetesNamespace")
         }
         if (!buildResult.pass) {
           throw new Exception(
             s"[flink-submit] current job ${this.effectiveAppName} build 
failed, clusterId" +
-              s",clusterId=${k8sSubmitParam.clusterId}," +
-              s",namespace=${k8sSubmitParam.kubernetesNamespace}")
+              s",clusterId=$clusterId," +
+              s",namespace=$kubernetesNamespace")
         }
       case _ =>
         if (this.buildResult == null) {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5f536a3e9..f1b171310 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -66,9 +66,9 @@ trait FlinkClientTrait extends Logger {
          |    appName          : ${submitRequest.appName}
          |    devMode          : ${submitRequest.developmentMode.name()}
          |    execMode         : ${submitRequest.executionMode.name()}
-         |    k8sNamespace     : 
${submitRequest.k8sSubmitParam.kubernetesNamespace}
-         |    flinkExposedType : 
${submitRequest.k8sSubmitParam.flinkRestExposedType}
-         |    clusterId        : ${submitRequest.k8sSubmitParam.clusterId}
+         |    k8sNamespace     : ${submitRequest.kubernetesNamespace}
+         |    flinkExposedType : ${submitRequest.flinkRestExposedType}
+         |    clusterId        : ${submitRequest.clusterId}
          |    applicationType  : ${submitRequest.applicationType.getName}
          |    savePoint        : ${submitRequest.savePoint}
          |    properties       : ${submitRequest.properties.mkString(" ")}

Reply via email to