This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new e5bf407fd [Improve] [Flink-Kubernetes-V2] Add test examples for watching restSvcEndpointSnaps (#3257)
e5bf407fd is described below

commit e5bf407fd281cc246b965660ac8073c6a8730faf
Author: caicancai <[email protected]>
AuthorDate: Sat Oct 21 14:20:18 2023 +0800

    [Improve] [Flink-Kubernetes-V2] Add test examples for watching restSvcEndpointSnaps (#3257)
---
 .../kubernetes/v2/example/UsingObserver.scala      | 12 +++++++++++
 .../kubernetes/v2/example/UsingOperator.scala      | 25 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
index c65f8c1fd..e93d190c7 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
@@ -78,6 +78,18 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
     } yield ()
   }
 
+  "Only subscribe Flink job enpoint changes." in unsafeRun {
+    for {
+      _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+      _ <- FlinkK8sObserver.restSvcEndpointSnaps
+             .flatSubscribe()
+             .map { case (appId, status) => (appId, status.ipRest) }
+             .diffPrev
+             .debug
+             .runDrain
+    } yield ()
+  }
+
   "Subscribe Flink cluster metrics changes." in unsafeRun {
     for {
       // track resource
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
index c00580dad..e61b3c397 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
@@ -97,7 +97,7 @@ class UsingOperator extends AnyWordSpecLike with BeforeAndAfterAll {
 
   "Deploy a simple Flink application job with flinkConfiguration to 
Kubernetes" in unsafeRun {
     val spec = FlinkDeploymentDef(
-      name = "basic-appjob",
+      name = "simple-appjob",
       namespace = "fdev",
       image = "flink:1.16",
       flinkVersion = FlinkVersion.V1_16,
@@ -116,6 +116,29 @@ class UsingOperator extends AnyWordSpecLike with BeforeAndAfterAll {
     } yield ()
   }
 
+  "Deploy a simple Flink application job with flinkConfiguration to Kubernetes 
and subscribe job status and endpoint" in unsafeRun {
+    val spec = FlinkDeploymentDef(
+      name = "simple-appjob",
+      namespace = "fdev",
+      image = "flink:1.16",
+      flinkVersion = FlinkVersion.V1_16,
+      flinkConfiguration = Map("taskmanager.numberOfTaskSlots" -> "2"),
+      jobManager = JobManagerDef(cpu = 1, memory = "1024m"),
+      taskManager = TaskManagerDef(cpu = 1, memory = "1024m"),
+      job = JobDef(
+        jarURI = "local:///opt/flink/examples/streaming/StateMachineExample.jar",
+        parallelism = 1
+      )
+    )
+    for {
+      _ <- FlinkK8sOperator.deployApplicationJob(114514, spec)
+      // subscribe job status
+      _ <- FlinkK8sObserver.evaluatedJobSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+      _ <- FlinkK8sObserver.restSvcEndpointSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+      _ <- ZIO.never
+    } yield ()
+  }
+
   "Deploy an application mode job with additional jar resources such as 
third-party dependencies pr udf" in unsafeRun {
     val spec = FlinkDeploymentDef(
       name = "appjob-with-extra-jar",

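For anyone who wants to try the new observer example outside the test suite, below is a minimal standalone sketch of the same subscription pipeline. The FlinkK8sObserver / TrackKey calls, the sample track key, and the flatSubscribe / diffPrev / debug / runDrain chain are taken verbatim from the diff above; the import paths, the object name WatchRestEndpointExample, and running it via zio.ZIOAppDefault are assumptions and may need adjusting to the actual module layout.

import zio.ZIOAppDefault

// NOTE: the import paths below are assumptions inferred from the test file
// locations in this commit; adjust them if the real packages differ.
import org.apache.streampark.flink.kubernetes.v2.observer.{FlinkK8sObserver, TrackKey}

object WatchRestEndpointExample extends ZIOAppDefault {

  def run =
    for {
      // Track the same sample application job as the new UsingObserver test.
      _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
      // Subscribe to REST service endpoint snapshots and print only the
      // (appId, rest ip) pairs, exactly as in the test's pipeline.
      _ <- FlinkK8sObserver.restSvcEndpointSnaps
             .flatSubscribe()
             .map { case (appId, status) => (appId, status.ipRest) }
             .diffPrev
             .debug
             .runDrain
    } yield ()
}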