This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 3f3a5ebb6 [Improve][Flink-Kubernetes-V2] Improve UsingOperator test (#3643)
3f3a5ebb6 is described below
commit 3f3a5ebb6d4480a0dc94ae62af9843aca51df22a
Author: Cancai Cai <[email protected]>
AuthorDate: Sun Mar 31 19:05:38 2024 +0800
[Improve][Flink-Kubernetes-V2] Improve UsingOperator test (#3643)
---
.../kubernetes/v2/example/UsingOperator.scala | 23 +++++++++++++++++++---
1 file changed, 20 insertions(+), 3 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
index e61b3c397..06dca7e9a 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
@@ -44,7 +44,6 @@ import zio.{Console, ZIO}
class UsingOperator extends AnyWordSpecLike with BeforeAndAfterAll {
"Deploy a simple Flink application job to Kubernetes" in unsafeRun {
-
val spec = FlinkDeploymentDef(
name = "simple-appjob",
namespace = "fdev",
@@ -116,13 +115,31 @@ class UsingOperator extends AnyWordSpecLike with BeforeAndAfterAll {
} yield ()
}
- "Deploy a simple Flink application job with flinkConfiguration to Kubernetes
and subscribe job status and endpoint" in unsafeRun {
+ "Deploy a simple Flink session mode job to Kubernetes and subscribe job
status and endpoint" in unsafeRun {
+ val spec = FlinkSessionJobDef(
+ namespace = "fdev",
+ name = "simple-sessionjob",
+ deploymentName = "simple-session",
+ job = JobDef(
+ jarURI = s"$assetPath/StateMachineExample.jar",
+ parallelism = 1
+ )
+ )
+ for {
+ _ <- FlinkK8sOperator.deploySessionJob(114515, spec)
+ // subscribe job status
+ _ <-
FlinkK8sObserver.evaluatedJobSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+ _ <-
FlinkK8sObserver.restSvcEndpointSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+ _ <- ZIO.never
+ } yield ()
+ }
+
+ "Deploy a simple Flink application job to Kubernetes and subscribe job
status and endpoint" in unsafeRun {
val spec = FlinkDeploymentDef(
name = "simple-appjob",
namespace = "fdev",
image = "flink:1.16",
flinkVersion = FlinkVersion.V1_16,
- flinkConfiguration = Map("taskmanager.numberOfTaskSlots" -> "2"),
jobManager = JobManagerDef(cpu = 1, memory = "1024m", "2G"),
taskManager = TaskManagerDef(cpu = 1, memory = "1024m", "2G"),
job = JobDef(