This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new 9f8bfe13f [Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility improvement
9f8bfe13f is described below

commit 9f8bfe13f7a00f3eb64d1673e5f63f3fbb6eef96
Author: benjobs <[email protected]>
AuthorDate: Fri Jan 24 22:07:13 2025 +0800

    [Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility improvement
---
 .../client/tool/FlinkSessionClientHelper.scala     | 21 ++++++++++++--------
 .../flink/kubernetes/KubernetesRetriever.scala     | 21 +++++---------------
 .../watcher/FlinkCheckpointWatcher.scala           | 17 +++++++---------
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala |  4 ++--
 .../kubernetes/watcher/FlinkMetricsWatcher.scala   | 23 ++++++++++------------
 .../flink/kubernetes/watcher/FlinkWatcher.scala    | 10 +++++++++-
 6 files changed, 46 insertions(+), 50 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index fcfcd3fc1..1fc86c789 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -17,9 +17,6 @@
 
 package org.apache.streampark.flink.client.tool
 
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.KubernetesRetriever
-
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration.{Configuration, CoreOptions}
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
@@ -27,18 +24,26 @@ import 
org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder
 import org.apache.hc.client5.http.fluent.Request
 import org.apache.hc.core5.http.ContentType
 import org.apache.hc.core5.http.io.entity.StringEntity
+import org.apache.hc.core5.util.Timeout
+import org.apache.streampark.common.util.Logger
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods._
 import org.json4s.jackson.Serialization
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-
+import java.time.Duration
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
 
 object FlinkSessionSubmitHelper extends Logger {
 
+  // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT
+  private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+
+  // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
+  private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(30000L)
+
   @transient
   implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
 
@@ -59,8 +64,8 @@ object FlinkSessionSubmitHelper extends Logger {
     // upload flink-job jar
     val uploadResult = Request
       .post(s"$jmRestUrl/jars/upload")
-      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
-      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+      .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
       .body(
         MultipartEntityBuilder
           .create()
@@ -90,8 +95,8 @@ object FlinkSessionSubmitHelper extends Logger {
     // refer to 
https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
     val resp = Request
       .post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
-      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
-      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+      .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
       .body(new StringEntity(Serialization.write(new 
JarRunRequest(flinkConfig))))
       .execute
       .returnContent()
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index da6870f17..8b0cd6908 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,12 +17,6 @@
 
 package org.apache.streampark.flink.kubernetes
 
-import org.apache.streampark.common.util.{DateUtils, Logger, Utils}
-import org.apache.streampark.common.util.Utils.using
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.ingress.IngressController
-import org.apache.streampark.flink.kubernetes.model.ClusterKey
-
 import io.fabric8.kubernetes.client.{DefaultKubernetesClient, 
KubernetesClient, KubernetesClientException}
 import org.apache.flink.client.cli.ClientOptions
 import org.apache.flink.client.deployment.{ClusterClientFactory, 
DefaultClusterClientServiceLoader}
@@ -30,23 +24,18 @@ import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, 
RestOptions}
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
-import org.apache.hc.core5.util.Timeout
+import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
+import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import javax.annotation.Nullable
-
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 object KubernetesRetriever extends Logger {
 
-  // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
-  val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
-    
Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
-
-  // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
-  val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
-    Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
-
   private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
 
   /** get new KubernetesClient */
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 4abd10df8..bd5279441 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -17,23 +17,20 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import org.apache.hc.client5.http.fluent.Request
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
 import 
org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent
 import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey, 
TrackId}
-
-import org.apache.hc.client5.http.fluent.Request
-import org.json4s.{DefaultFormats, JNull}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, MetricWatcherConfig}
 import org.json4s.JsonAST.JNothing
 import org.json4s.jackson.JsonMethods.parse
-
-import javax.annotation.concurrent.ThreadSafe
+import org.json4s.{DefaultFormats, JNull}
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
-
-import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
+import javax.annotation.concurrent.ThreadSafe
 import scala.concurrent.duration.DurationLong
+import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
@@ -119,8 +116,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = 
MetricWatcherConfig.def
         Checkpoint.as(
           Request
             .get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
-            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
-            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+            .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8)) match {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 2d574e40d..05835252f 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -291,8 +291,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
     JobDetails.as(
       Request
         .get(s"$restUrl/jobs/overview")
-        .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
-        .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+        .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+        .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
         .execute
         .returnContent()
         .asString(StandardCharsets.UTF_8)
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 269ca6bc3..889714a2d 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -17,23 +17,20 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import org.apache.flink.configuration.{JobManagerOptions, MemorySize, 
TaskManagerOptions}
+import org.apache.hc.client5.http.fluent.Request
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
 import 
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
 import org.apache.streampark.flink.kubernetes.model.{ClusterKey, 
FlinkMetricCV, TrackId}
-
-import org.apache.flink.configuration.{JobManagerOptions, MemorySize, 
TaskManagerOptions}
-import org.apache.hc.client5.http.fluent.Request
-import org.json4s.{DefaultFormats, JArray}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, MetricWatcherConfig}
 import org.json4s.jackson.JsonMethods.parse
-
-import javax.annotation.concurrent.ThreadSafe
+import org.json4s.{DefaultFormats, JArray}
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
-
-import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
+import javax.annotation.concurrent.ThreadSafe
 import scala.concurrent.duration.DurationLong
+import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
@@ -131,8 +128,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = 
MetricWatcherConfig.default
       .as(
         Request
           .get(s"$flinkJmRestUrl/overview")
-          .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
-          .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+          .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+          .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
           .execute
           .returnContent
           .asString(StandardCharsets.UTF_8))
@@ -144,8 +141,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = 
MetricWatcherConfig.default
         .as(
           Request
             .get(s"$flinkJmRestUrl/jobmanager/config")
-            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
-            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+            .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8))
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index fbedc5287..6ecb0bbe1 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -17,13 +17,21 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import org.apache.hc.core5.util.Timeout
+
+import java.time.Duration
 import java.util.concurrent.ScheduledThreadPoolExecutor
 import java.util.concurrent.atomic.AtomicBoolean
-
 import scala.language.implicitConversions
 
 trait FlinkWatcher extends AutoCloseable {
 
+  // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT
+  lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+
+  // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
+  lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(30000L)
+
   private[this] val started: AtomicBoolean = new AtomicBoolean(false)
 
   private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)

Reply via email to