This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new e5bf407fd [Improve] [Flink-Kubernetes-V2] Add test examples for
watching restSvcEndpointSnaps (#3257)
e5bf407fd is described below
commit e5bf407fd281cc246b965660ac8073c6a8730faf
Author: caicancai <[email protected]>
AuthorDate: Sat Oct 21 14:20:18 2023 +0800
[Improve] [Flink-Kubernetes-V2] Add test examples for watching
restSvcEndpointSnaps (#3257)
---
.../kubernetes/v2/example/UsingObserver.scala | 12 +++++++++++
.../kubernetes/v2/example/UsingOperator.scala | 25 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
index c65f8c1fd..e93d190c7 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
@@ -78,6 +78,18 @@ class UsingObserver extends AnyWordSpecLike with
BeforeAndAfterAll {
} yield ()
}
+ "Only subscribe Flink job enpoint changes." in unsafeRun {
+ for {
+ _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev",
"simple-appjob"))
+ _ <- FlinkK8sObserver.restSvcEndpointSnaps
+ .flatSubscribe()
+ .map { case (appId, status) => (appId, status.ipRest) }
+ .diffPrev
+ .debug
+ .runDrain
+ } yield ()
+ }
+
"Subscribe Flink cluster metrics changes." in unsafeRun {
for {
// track resource
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
index c00580dad..e61b3c397 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
@@ -97,7 +97,7 @@ class UsingOperator extends AnyWordSpecLike with
BeforeAndAfterAll {
"Deploy a simple Flink application job with flinkConfiguration to
Kubernetes" in unsafeRun {
val spec = FlinkDeploymentDef(
- name = "basic-appjob",
+ name = "simple-appjob",
namespace = "fdev",
image = "flink:1.16",
flinkVersion = FlinkVersion.V1_16,
@@ -116,6 +116,29 @@ class UsingOperator extends AnyWordSpecLike with
BeforeAndAfterAll {
} yield ()
}
+ "Deploy a simple Flink application job with flinkConfiguration to Kubernetes
and subscribe job status and endpoint" in unsafeRun {
+ val spec = FlinkDeploymentDef(
+ name = "simple-appjob",
+ namespace = "fdev",
+ image = "flink:1.16",
+ flinkVersion = FlinkVersion.V1_16,
+ flinkConfiguration = Map("taskmanager.numberOfTaskSlots" -> "2"),
+ jobManager = JobManagerDef(cpu = 1, memory = "1024m", "2G"),
+ taskManager = TaskManagerDef(cpu = 1, memory = "1024m", "2G"),
+ job = JobDef(
+ jarURI =
"local:///opt/flink/examples/streaming/StateMachineExample.jar",
+ parallelism = 1
+ )
+ )
+ for {
+ _ <- FlinkK8sOperator.deployApplicationJob(114514, spec)
+ // subscribe job status
+ _ <-
FlinkK8sObserver.evaluatedJobSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+ _ <-
FlinkK8sObserver.restSvcEndpointSnaps.flatSubscribeValues().debugPretty.runDrain.fork
+ _ <- ZIO.never
+ } yield ()
+ }
+
"Deploy an application mode job with additional jar resources such as
third-party dependencies pr udf" in unsafeRun {
val spec = FlinkDeploymentDef(
name = "appjob-with-extra-jar",