This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new e3dea563c [Improve] SubmitRequest bean improvement
e3dea563c is described below
commit e3dea563c00f66ebffd2744f614865a7d4a65aa2
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 13 18:08:07 2024 +0800
[Improve] SubmitRequest bean improvement
---
.../core/service/impl/ApplicationServiceImpl.java | 23 +++++++++---------
.../flink/client/bean/SubmitRequest.scala | 27 +++++++---------------
.../flink/client/trait/FlinkClientTrait.scala | 6 ++---
3 files changed, 22 insertions(+), 34 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 869e79afd..c27e5772e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -82,7 +82,6 @@ import
org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
import org.apache.streampark.flink.client.bean.CancelResponse;
-import org.apache.streampark.flink.client.bean.KubernetesSubmitParam;
import org.apache.streampark.flink.client.bean.SubmitRequest;
import org.apache.streampark.flink.client.bean.SubmitResponse;
import org.apache.streampark.flink.core.conf.ParameterCli;
@@ -1557,14 +1556,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}
- TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
-
- KubernetesSubmitParam kubernetesSubmitParam =
- new KubernetesSubmitParam(
- application.getClusterId(),
- application.getK8sNamespace(),
- application.getK8sRestExposedTypeEnum());
-
AppBuildPipeline buildPipeline =
appBuildPipeService.getById(application.getId());
Utils.notNull(buildPipeline);
@@ -1579,7 +1570,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
variableService.replaceVariable(application.getTeamId(),
application.getArgs());
SubmitRequest submitRequest =
- new SubmitRequest(
+ SubmitRequest.apply(
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
getProperties(application),
@@ -1593,9 +1584,12 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
getSavePointed(appParam),
applicationArgs,
buildResult,
- kubernetesSubmitParam,
- extraParameter);
+ extraParameter,
+ application.getClusterId(),
+ application.getK8sNamespace(),
+ application.getK8sRestExposedTypeEnum());
+ TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest),
executorService);
@@ -1608,6 +1602,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// 2) exception
if (throwable != null) {
+ log.info(" start exception : " + throwable);
String exception = Utils.stringifyException(throwable);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
@@ -1655,6 +1650,10 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// if start completed, will be added task to tracking queue
if (isKubernetesApp(application)) {
+ log.info(
+ "start job {} on {} success, doWatching...",
+ application.getJobName(),
+ application.getExecutionModeEnum().getName());
application.setRelease(ReleaseState.DONE.get());
flinkK8sWatcher.doWatching(trackId);
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 96afd4b08..dd24a6c14 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -36,19 +36,6 @@ import java.util.{Map => JavaMap}
import scala.collection.JavaConversions._
import scala.util.Try
-/**
- * @param clusterId
- * flink cluster id in k8s cluster.
- * @param kubernetesNamespace
- * k8s namespace.
- * @param flinkRestExposedType
- * flink rest-service exposed type on k8s cluster.
- */
-case class KubernetesSubmitParam(
- clusterId: String,
- kubernetesNamespace: String,
- @Nullable flinkRestExposedType: FlinkK8sRestExposedType)
-
case class SubmitRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@@ -63,8 +50,10 @@ case class SubmitRequest(
savePoint: String,
args: String,
@Nullable buildResult: BuildResult,
- @Nullable k8sSubmitParam: KubernetesSubmitParam,
- @Nullable extraParameter: JavaMap[String, Any]) {
+ @Nullable extraParameter: JavaMap[String, Any],
+ @Nullable clusterId: String,
+ @Nullable kubernetesNamespace: String,
+ @Nullable flinkRestExposedType: FlinkK8sRestExposedType) {
private lazy val appProperties: Map[String, String] =
getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
@@ -178,14 +167,14 @@ case class SubmitRequest(
if (buildResult == null) {
throw new Exception(
s"[flink-submit] current job: ${this.effectiveAppName} was not yet
built, buildResult is empty" +
- s",clusterId=${k8sSubmitParam.clusterId}," +
- s",namespace=${k8sSubmitParam.kubernetesNamespace}")
+ s",clusterId=$clusterId," +
+ s",namespace=$kubernetesNamespace")
}
if (!buildResult.pass) {
throw new Exception(
s"[flink-submit] current job ${this.effectiveAppName} build
failed, clusterId" +
- s",clusterId=${k8sSubmitParam.clusterId}," +
- s",namespace=${k8sSubmitParam.kubernetesNamespace}")
+ s",clusterId=$clusterId," +
+ s",namespace=$kubernetesNamespace")
}
case _ =>
if (this.buildResult == null) {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5f536a3e9..f1b171310 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -66,9 +66,9 @@ trait FlinkClientTrait extends Logger {
| appName : ${submitRequest.appName}
| devMode : ${submitRequest.developmentMode.name()}
| execMode : ${submitRequest.executionMode.name()}
- | k8sNamespace :
${submitRequest.k8sSubmitParam.kubernetesNamespace}
- | flinkExposedType :
${submitRequest.k8sSubmitParam.flinkRestExposedType}
- | clusterId : ${submitRequest.k8sSubmitParam.clusterId}
+ | k8sNamespace : ${submitRequest.kubernetesNamespace}
+ | flinkExposedType : ${submitRequest.flinkRestExposedType}
+ | clusterId : ${submitRequest.clusterId}
| applicationType : ${submitRequest.applicationType.getName}
| savePoint : ${submitRequest.savePoint}
| properties : ${submitRequest.properties.mkString(" ")}