This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ba269cbbf [ISSUE-3299][Improve] Improve streampark-flink-client module
base on [3.1 Naming Style] (#3300)
ba269cbbf is described below
commit ba269cbbf0eb5655efced43dc00fd6a2aac65327
Author: caicancai <[email protected]>
AuthorDate: Mon Oct 30 16:31:50 2023 +0800
[ISSUE-3299][Improve] Improve streampark-flink-client module base on [3.1
Naming Style] (#3300)
---
.../streampark/flink/client/impl/LocalClient.scala | 4 ++--
.../streampark/flink/client/impl/RemoteClient.scala | 14 ++++++++------
.../flink/client/impl/YarnSessionClient.scala | 21 +++++++++++----------
.../flink/client/trait/YarnClientTrait.scala | 14 +++++++-------
4 files changed, 28 insertions(+), 25 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
index 6baf9d375..5fb663685 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
@@ -67,9 +67,9 @@ object LocalClient extends FlinkClientTrait {
}
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
- RemoteClient.doTriggerSavepoint(request, flinkConfig)
+ RemoteClient.doTriggerSavepoint(savepointRequest, flinkConfig)
}
override def doCancel(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 8e5c6a5c0..fe4fe454a 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -48,12 +48,14 @@ object RemoteClient extends FlinkClientTrait {
}
- override def doCancel(request: CancelRequest, flinkConfig: Configuration):
CancelResponse = {
+ override def doCancel(
+ cancelRequest: CancelRequest,
+ flinkConfig: Configuration): CancelResponse = {
executeClientAction(
- request,
+ cancelRequest,
flinkConfig,
(jobID, clusterClient) => {
- CancelResponse(super.cancelJob(request, jobID, clusterClient))
+ CancelResponse(super.cancelJob(cancelRequest, jobID, clusterClient))
})
}
@@ -90,13 +92,13 @@ object RemoteClient extends FlinkClientTrait {
}
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
executeClientAction(
- request,
+ savepointRequest,
flinkConfig,
(jobID, clusterClient) => {
- SavepointResponse(super.triggerSavepoint(request, jobID,
clusterClient))
+ SavepointResponse(super.triggerSavepoint(savepointRequest, jobID,
clusterClient))
})
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 89260632c..c91fc45f0 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -134,11 +134,11 @@ object YarnSessionClient extends YarnClientTrait {
}
private[this] def executeClientAction[O, R <: SavepointRequestTrait](
- request: R,
+ savepointRequestTrait: R,
flinkConfig: Configuration,
actFunc: (JobID, ClusterClient[_]) => O): O = {
flinkConfig
- .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+ .safeSet(YarnConfigOptions.APPLICATION_ID,
savepointRequestTrait.clusterId)
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
logInfo(s"""
|------------------------------------------------------------------
@@ -152,10 +152,10 @@ object YarnSessionClient extends YarnClientTrait {
val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
client =
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
- actFunc(JobID.fromHexString(request.jobId), client)
+ actFunc(JobID.fromHexString(savepointRequestTrait.jobId), client)
} catch {
case e: Exception =>
- logError(s"${request.getClass.getSimpleName} for flink yarn session
job fail")
+ logError(s"${savepointRequestTrait.getClass.getSimpleName} for flink
yarn session job fail")
e.printStackTrace()
throw e
} finally {
@@ -176,15 +176,16 @@ object YarnSessionClient extends YarnClientTrait {
}
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
executeClientAction(
- request,
+ savepointRequest,
flinkConfig,
(jobID, clusterClient) => {
- val actionResult = super.triggerSavepoint(request, jobID,
clusterClient)
+ val actionResult = super.triggerSavepoint(savepointRequest, jobID,
clusterClient)
SavepointResponse(actionResult)
- })
+ }
+ )
}
def deploy(deployRequest: DeployRequest): DeployResponse = {
@@ -195,8 +196,8 @@ object YarnSessionClient extends YarnClientTrait {
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
- | properties : ${deployRequest.properties.mkString(" ")}
-
|-------------------------------------------------------------------------------------------
+ | properties : ${deployRequest.properties.mkString(",")}
+
|-------------------------------------------------------------------------------------------------------
|""".stripMargin)
var clusterDescriptor: YarnClusterDescriptor = null
var client: ClusterClient[ApplicationId] = null
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 456b260c2..4e58a50f9 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -40,11 +40,11 @@ import scala.util.Try
trait YarnClientTrait extends FlinkClientTrait {
private[this] def executeClientAction[R <: SavepointRequestTrait, O](
- request: R,
+ savepointRequestTrait: R,
flinkConf: Configuration,
actionFunc: (JobID, ClusterClient[_]) => O): O = {
- flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+ flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID,
savepointRequestTrait.clusterId)
val clusterClientFactory = new YarnClusterClientFactory
val applicationId = clusterClientFactory.getClusterId(flinkConf)
if (applicationId == null) {
@@ -57,22 +57,22 @@ trait YarnClientTrait extends FlinkClientTrait {
.getClusterClient
.autoClose(
client =>
- Try(actionFunc(getJobID(request.jobId), client)).recover {
+ Try(actionFunc(getJobID(savepointRequestTrait.jobId),
client)).recover {
case e =>
throw new FlinkException(
- s"[StreamPark] Do ${request.getClass.getSimpleName} for the
job ${request.jobId} failed. " +
+ s"[StreamPark] Do
${savepointRequestTrait.getClass.getSimpleName} for the job
${savepointRequestTrait.jobId} failed. " +
s"detail: ${ExceptionUtils.stringifyException(e)}");
}.get)
}
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ savepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {
executeClientAction(
- request,
+ savepointRequest,
flinkConf,
(jid, client) => {
- SavepointResponse(super.triggerSavepoint(request, jid, client))
+ SavepointResponse(super.triggerSavepoint(savepointRequest, jid,
client))
})
}