This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b7eacf8ee [Improve] Improve streampark-flink-client-core module base on Naming Style and add log (#3231)
b7eacf8ee is described below
commit b7eacf8ee9332e18d9558599d411aeb3c34130e0
Author: caicancai <[email protected]>
AuthorDate: Tue Oct 10 20:05:58 2023 +0800
[Improve] Improve streampark-flink-client-core module base on Naming Style and add log (#3231)
---
.../flink/client/FlinkClientEndpoint.scala | 56 ++++++++++++----------
.../client/impl/KubernetesSessionClientV2.scala | 16 ++++++-
.../flink/client/trait/FlinkClientTrait.scala | 2 +-
.../client/trait/KubernetesClientV2Trait.scala | 12 ++---
.../client/trait/KubernetesNativeClientTrait.scala | 9 ++--
5 files changed, 58 insertions(+), 37 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
index 752bdf045..609d5cf66 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
@@ -42,59 +42,65 @@ object FlinkClientEndpoint {
}
)
- def submit(request: SubmitRequest): SubmitResponse = {
- clients.get(request.executionMode) match {
- case Some(client) => client.submit(request)
+ def submit(submitRequest: SubmitRequest): SubmitResponse = {
+ clients.get(submitRequest.executionMode) match {
+ case Some(client) => client.submit(submitRequest)
case _ =>
- throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} submit ")
+ throw new UnsupportedOperationException(
+ s"Unsupported ${submitRequest.executionMode} submit ")
}
}
- def cancel(request: CancelRequest): CancelResponse = {
- clients.get(request.executionMode) match {
- case Some(client) => client.cancel(request)
+ def cancel(cancelRequest: CancelRequest): CancelResponse = {
+ clients.get(cancelRequest.executionMode) match {
+ case Some(client) => client.cancel(cancelRequest)
case _ =>
- throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} cancel ")
+ throw new UnsupportedOperationException(
+ s"Unsupported ${cancelRequest.executionMode} cancel ")
}
}
- def triggerSavepoint(request: TriggerSavepointRequest): SavepointResponse = {
- clients.get(request.executionMode) match {
- case Some(client) => client.triggerSavepoint(request)
+ def triggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse = {
+ clients.get(savepointRequest.executionMode) match {
+ case Some(client) => client.triggerSavepoint(savepointRequest)
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${request.executionMode} triggerSavepoint ")
+ s"Unsupported ${savepointRequest.executionMode} triggerSavepoint ")
}
}
- def deploy(request: DeployRequest): DeployResponse = {
- request.executionMode match {
- case YARN_SESSION => YarnSessionClient.deploy(request)
- case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.deploy(request)
+ def deploy(deployRequest: DeployRequest): DeployResponse = {
+ deployRequest.executionMode match {
+ case YARN_SESSION => YarnSessionClient.deploy(deployRequest)
+ case KUBERNETES_NATIVE_SESSION =>
+ K8sFlinkConfig.isV2Enabled match {
+ case true => KubernetesSessionClientV2.deploy(deployRequest)
+ case _ => KubernetesNativeSessionClient.deploy(deployRequest)
+ }
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${request.executionMode} deploy cluster ")
+ s"Unsupported ${deployRequest.executionMode} deploy cluster ")
}
}
- def shutdown(request: ShutDownRequest): ShutDownResponse = {
- request.executionMode match {
- case YARN_SESSION => YarnSessionClient.shutdown(request)
+ def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+ shutDownRequest.executionMode match {
+ case YARN_SESSION => YarnSessionClient.shutdown(shutDownRequest)
case KUBERNETES_NATIVE_SESSION =>
K8sFlinkConfig.isV2Enabled match {
- case true => KubernetesSessionClientV2.shutdown(request)
- case _ => KubernetesNativeSessionClient.shutdown(request)
+ case true => KubernetesSessionClientV2.shutdown(shutDownRequest)
+ case _ => KubernetesNativeSessionClient.shutdown(shutDownRequest)
}
case KUBERNETES_NATIVE_APPLICATION =>
K8sFlinkConfig.isV2Enabled match {
- case true => KubernetesApplicationClientV2.shutdown(request)
+ case true => KubernetesApplicationClientV2.shutdown(shutDownRequest)
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${request.executionMode} shutdown application ")
+ s"Unsupported ${shutDownRequest.executionMode} shutdown application ")
}
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${request.executionMode} shutdown cluster ")
+ s"Unsupported ${shutDownRequest.executionMode} shutdown cluster ")
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index c40fba899..de4dbe6cf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
import zio.ZIO
+import scala.collection.convert.ImplicitConversions.`map AsScala`
import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
import scala.util.{Failure, Success, Try}
@@ -125,7 +126,20 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
@throws[Throwable]
def deploy(deployRequest: DeployRequest): DeployResponse = {
-
+ logInfo(
+ s"""
+ |--------------------------------------- kubernetes session start ---------------------------------------
+ | userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
+ | flinkVersion : ${deployRequest.flinkVersion.version}
+ | execMode : ${deployRequest.executionMode.name()}
+ | clusterId : ${deployRequest.clusterId}
+ | namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace}
+ | exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
+ | serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
+ | flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
+ | properties : ${deployRequest.properties.mkString(" ")}
+ |-------------------------------------------------------------------------------------------
+ |""".stripMargin)
val richMsg: String => String =
s"[flink-submit][appId=${deployRequest.id}] " + _
val flinkConfig =
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 092488c47..7fb6aa148 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -209,7 +209,7 @@ trait FlinkClientTrait extends Logger {
@throws[Exception]
def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse
@throws[Exception]
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
index 17949cb37..e853acdd1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -139,20 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
@throws[Exception]
override def doTriggerSavepoint(
- triggerSavepointRequest: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {
val savepointDef = JobSavepointDef(
- savepointPath = Option(triggerSavepointRequest.savepointPath),
- formatType = Option(triggerSavepointRequest.nativeFormat)
+ savepointPath = Option(savepointRequest.savepointPath),
+ formatType = Option(savepointRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
)
def richMsg: String => String =
- s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _
+ s"[flink-trigger-savepoint][appId=${savepointRequest.id}] " + _
FlinkK8sOperator
- .triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
+ .triggerJobSavepoint(savepointRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed)
ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
@@ -165,7 +165,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
case Failure(err) =>
logError(
richMsg(
- s"Trigger flink job savepoint failed in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
+ s"Trigger flink job savepoint failed in ${savepointRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 712fd4bc8..49958a20c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -129,15 +129,16 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
@throws[Exception]
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
executeClientAction(
- request,
+ savepointRequest,
flinkConfig,
(jobId, clusterClient) => {
- val actionResult = super.triggerSavepoint(request, jobId, clusterClient)
+ val actionResult = super.triggerSavepoint(savepointRequest, jobId, clusterClient)
SavepointResponse(actionResult)
- })
+ }
+ )
}
// noinspection DuplicatedCode