This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 436b9f320 Naming standardization (#3177)
436b9f320 is described below
commit 436b9f320184dbc85320d6a45b26bda84ddce855
Author: caicancai <[email protected]>
AuthorDate: Sun Sep 24 16:02:31 2023 +0800
Naming standardization (#3177)
---
.../impl/KubernetesNativeApplicationClient.scala | 4 +--
.../impl/KubernetesNativeSessionClient.scala | 4 +--
.../client/trait/KubernetesClientV2Trait.scala | 32 ++++++++++++----------
3 files changed, 21 insertions(+), 19 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index ea4905d9c..209a21045 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -99,9 +99,9 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
}
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ triggerSavepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {
flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
- super.doTriggerSavepoint(request, flinkConf)
+ super.doTriggerSavepoint(triggerSavepointRequest, flinkConf)
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index c4c9c6e41..a63e91298 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -250,9 +250,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
}
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ triggerSavepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
flinkConfig.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
- super.doTriggerSavepoint(request, flinkConfig)
+ super.doTriggerSavepoint(triggerSavepointRequest, flinkConfig)
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
index 75b979246..eebcaec2e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -99,23 +99,23 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
}
@throws[Exception]
- override def doCancel(request: CancelRequest, flinkConf: Configuration): CancelResponse = {
+ override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
val effect =
- if (!request.withSavepoint) {
+ if (!cancelRequest.withSavepoint) {
// cancel job
FlinkK8sOperator
- .cancelJob(request.id)
+ .cancelJob(cancelRequest.id)
.as(CancelResponse(null))
} else {
// stop job with savepoint
val savepointDef = JobSavepointDef(
- drain = Option(request.withDrain).getOrElse(false),
- savepointPath = Option(request.savepointPath),
- formatType = Option(request.nativeFormat)
+ drain = Option(cancelRequest.withDrain).getOrElse(false),
+ savepointPath = Option(cancelRequest.savepointPath),
+ formatType = Option(cancelRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else
JobSavepointDef.CANONICAL_FORMAT)
)
FlinkK8sOperator
- .stopJob(request.id, savepointDef)
+ .stopJob(cancelRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed)
ZIO.fail(StopJobFail(result.failureCause.get))
@@ -123,7 +123,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
}
}
- def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _
+ def richMsg: String => String = s"[flink-cancel][appId=${cancelRequest.id}] " + _
effect.runIOAsTry match {
case Success(rsp) =>
@@ -131,7 +131,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
rsp
case Failure(err) =>
logError(
- richMsg(s"Cancel flink job fail in ${request.executionMode.getName}_V2 mode!"),
+ richMsg(s"Cancel flink job fail in ${cancelRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}
@@ -139,19 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
@throws[Exception]
override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
+ triggerSavepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {
val savepointDef = JobSavepointDef(
- savepointPath = Option(request.savepointPath),
- formatType = Option(request.nativeFormat)
+ savepointPath = Option(triggerSavepointRequest.savepointPath),
+ formatType = Option(triggerSavepointRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else
JobSavepointDef.CANONICAL_FORMAT)
)
- def richMsg: String => String = s"[flink-trigger-savepoint][appId=${request.id}] " + _
+ def richMsg: String => String =
+ s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _
FlinkK8sOperator
- .triggerJobSavepoint(request.id, savepointDef)
+ .triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed)
ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
@@ -163,7 +164,8 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
rsp
case Failure(err) =>
logError(
- richMsg(s"Cancel flink job fail in ${request.executionMode.getName}_V2 mode!"),
+ richMsg(
+ s"Cancel flink job fail in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}