This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new beb5c6dd6 add comment (#3305)
beb5c6dd6 is described below
commit beb5c6dd68eb7d657bac8eeb4bb0dd63faf2ba4d
Author: caicancai <[email protected]>
AuthorDate: Wed Nov 1 23:38:07 2023 +0800
add comment (#3305)
---
.../streampark/flink/client/impl/KubernetesApplicationClientV2.scala | 3 ++-
.../streampark/flink/client/impl/KubernetesSessionClientV2.scala | 2 ++
.../scala/org/apache/streampark/flink/client/impl/RemoteClient.scala | 4 ----
.../streampark/flink/client/trait/KubernetesClientV2Trait.scala | 2 ++
4 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index 4b51af2d0..a2fbaa1cb 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -81,7 +81,7 @@ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger
)
}
- // Generate FlinkDeployment CR definition, it is a pure effect function.
+ /** Generate FlinkDeployment CR definition, it is a pure effect function. */
private def genFlinkDeployDef(
submitReq: SubmitRequest,
originFlinkConfig: Configuration,
@@ -246,6 +246,7 @@ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger
))
}
+ /** Shutdown Flink Application deployment. */
@throws[Throwable]
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
val name = shutDownRequest.clusterId
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 8b2c3969d..68a89cac0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -125,6 +125,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
))
}
+ /** Deploy Flink cluster. */
@throws[Throwable]
def deploy(deployRequest: DeployRequest): DeployResponse = {
logInfo(
@@ -190,6 +191,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
}
}
+ /** Generate FlinkDeployment CR definition, it is a pure effect function. */
private def genFlinkDeployDef(
deployReq: DeployRequest,
originFlinkConfig: Configuration): Either[FailureMessage, FlinkDeploymentDef] = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 6002306f1..27bae061c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -34,10 +34,6 @@ import scala.util.{Failure, Success, Try}
/** Submit Job to Remote Cluster */
object RemoteClient extends FlinkClientTrait {
- /**
- * @param submitRequest
- * @param flinkConfig
- */
override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {}
override def doSubmit(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
index e853acdd1..f64e99ced 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -34,6 +34,7 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters.asScalaBufferConverter
import scala.util.{Failure, Success, Try}
+/** Flink K8s session/application mode cancel and Savepoint */
trait KubernetesClientV2Trait extends FlinkClientTrait {
protected type FailureMessage = String
@@ -60,6 +61,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
Try(yamlMapper.readValue(yaml, classOf[Pod]))
}
+ /** Generate JobDef */
protected def genJobDef(
flinkConfObj: Configuration,
jarUriHint: Option[String]): Either[FailureMessage, JobDef] = {