This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3f3a5ebb6 [Imporve][Flink-Kubernetes-V2] Improve UsingOperator test 
(#3643)
3f3a5ebb6 is described below

commit 3f3a5ebb6d4480a0dc94ae62af9843aca51df22a
Author: Cancai Cai <[email protected]>
AuthorDate: Sun Mar 31 19:05:38 2024 +0800

    [Imporve][Flink-Kubernetes-V2] Improve UsingOperator test (#3643)
---
 .../kubernetes/v2/example/UsingOperator.scala      | 23 +++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
index e61b3c397..06dca7e9a 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
@@ -44,7 +44,6 @@ import zio.{Console, ZIO}
 class UsingOperator extends AnyWordSpecLike with BeforeAndAfterAll {
 
   "Deploy a simple Flink application job to Kubernetes" in unsafeRun {
-
     val spec = FlinkDeploymentDef(
       name = "simple-appjob",
       namespace = "fdev",
@@ -116,13 +115,31 @@ class UsingOperator extends AnyWordSpecLike with 
BeforeAndAfterAll {
     } yield ()
   }
 
-  "Deploy a simple Flink application job with flinkConfiguration to Kubernetes 
and subscribe job status and endpoint" in unsafeRun {
+  "Deploy a simple Flink session mode job to Kubernetes and subscribe job 
status and endpoint" in unsafeRun {
+    val spec = FlinkSessionJobDef(
+      namespace = "fdev",
+      name = "simple-sessionjob",
+      deploymentName = "simple-session",
+      job = JobDef(
+        jarURI = s"$assetPath/StateMachineExample.jar",
+        parallelism = 1
+      )
+    )
+    for {
+      _ <- FlinkK8sOperator.deploySessionJob(114515, spec)
+      // subscribe job status
+      _ <- 
FlinkK8sObserver.evaluatedJobSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+      _ <- 
FlinkK8sObserver.restSvcEndpointSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+      _ <- ZIO.never
+    } yield ()
+  }
+
+  "Deploy a simple Flink application job to Kubernetes and subscribe job 
status and endpoint" in unsafeRun {
     val spec = FlinkDeploymentDef(
       name = "simple-appjob",
       namespace = "fdev",
       image = "flink:1.16",
       flinkVersion = FlinkVersion.V1_16,
-      flinkConfiguration = Map("taskmanager.numberOfTaskSlots" -> "2"),
       jobManager = JobManagerDef(cpu = 1, memory = "1024m", "2G"),
       taskManager = TaskManagerDef(cpu = 1, memory = "1024m", "2G"),
       job = JobDef(

Reply via email to