This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new fa3de49b8 [Improve][Flink-Kubnernetes-V2] Improve UsingObserver test (#3654)
fa3de49b8 is described below

commit fa3de49b821f287ca2ec2319324e0faa3dfffe83
Author: Cancai Cai <[email protected]>
AuthorDate: Sat Apr 6 16:02:40 2024 +0800

    [Improve][Flink-Kubnernetes-V2] Improve UsingObserver test (#3654)
---
 .../kubernetes/v2/example/UsingObserver.scala      | 41 +++++++++++++++++++---
 1 file changed, 37 insertions(+), 4 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
index e93d190c7..40080e6d1 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
@@ -29,11 +29,11 @@ import zio.{durationInt, Console, ZIO}
 
 class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
 
-  "Track and get flink job snapshot." in unsafeRun {
+  "Track and get flink application job snapshot." in unsafeRun {
     for {
       // track resource
       _       <- ZIO.unit
-      trackId  = TrackKey.appJob(233, "fdev", "simple-appjob")
+      trackId  = TrackKey.appJob(114514, "fdev", "simple-appjob")
       _       <- FlinkK8sObserver.track(trackId)
       // get job snapshot
       _       <- ZIO.sleep(3.seconds)
@@ -42,6 +42,35 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
     } yield ()
   }
 
+  "Track and get flink application endpoint snapshot." in unsafeRun {
+    for {
+      // track resource
+      _       <- ZIO.unit
+      trackId  = TrackKey.appJob(233, "fdev", "simple-appjob")
+      _       <- FlinkK8sObserver.track(trackId)
+      // get job rest endpoint
+      _       <- ZIO.sleep(3.seconds)
+      jobSnap <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
+      _       <- Console.printLine(jobSnap.prettyStr)
+    } yield ()
+  }
+
+  "Track and get flink seesion job snapshot and endpoint." in unsafeRun {
+    for {
+      // track resource
+      _        <- ZIO.unit
+      trackId   = TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session")
+      _        <- FlinkK8sObserver.track(trackId)
+      // get job rest endpoint
+      _        <- ZIO.sleep(3.seconds)
+      endpoint <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
+      _        <- Console.printLine(endpoint.prettyStr)
+      // get job snapshot
+      jobSnap  <- FlinkK8sObserver.evaluatedJobSnaps.getValue(trackId.id)
+      _        <- Console.printLine(jobSnap.prettyStr)
+    } yield ()
+  }
+
   "Track and get flink cluster metrics" in unsafeRun {
     for {
       // track resource
@@ -66,9 +95,11 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
     } yield ()
   }
 
-  "Only subscribe Flink job state changes." in unsafeRun {
+  "Only subscribe Flink application job state changes." in unsafeRun {
     for {
+      // track resource
       _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+      // subscribe job status changes
       _ <- FlinkK8sObserver.evaluatedJobSnaps
              .flatSubscribe()
              .map { case (appId, status) => (appId, status.evalState) }
@@ -78,9 +109,11 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
     } yield ()
   }
 
-  "Only subscribe Flink job enpoint changes." in unsafeRun {
+  "Only subscribe Flink application job enpoint changes." in unsafeRun {
     for {
+      // track resource
       _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+      // subscribe job status changes
       _ <- FlinkK8sObserver.restSvcEndpointSnaps
              .flatSubscribe()
              .map { case (appId, status) => (appId, status.ipRest) }

Reply via email to