This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new 9f8bfe13f [Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility
improvement
9f8bfe13f is described below
commit 9f8bfe13f7a00f3eb64d1673e5f63f3fbb6eef96
Author: benjobs <[email protected]>
AuthorDate: Fri Jan 24 22:07:13 2025 +0800
[Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility improvement
---
.../client/tool/FlinkSessionClientHelper.scala | 21 ++++++++++++--------
.../flink/kubernetes/KubernetesRetriever.scala | 21 +++++---------------
.../watcher/FlinkCheckpointWatcher.scala | 17 +++++++---------
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 4 ++--
.../kubernetes/watcher/FlinkMetricsWatcher.scala | 23 ++++++++++------------
.../flink/kubernetes/watcher/FlinkWatcher.scala | 10 +++++++++-
6 files changed, 46 insertions(+), 50 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index fcfcd3fc1..1fc86c789 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -17,9 +17,6 @@
package org.apache.streampark.flink.client.tool
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.KubernetesRetriever
-
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration.{Configuration, CoreOptions}
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
@@ -27,18 +24,26 @@ import
org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
+import org.apache.hc.core5.util.Timeout
+import org.apache.streampark.common.util.Logger
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import java.io.File
import java.nio.charset.StandardCharsets
-
+import java.time.Duration
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
object FlinkSessionSubmitHelper extends Logger {
+ // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT
+ private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+
+ // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
+ private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(30000L)
+
@transient
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@@ -59,8 +64,8 @@ object FlinkSessionSubmitHelper extends Logger {
// upload flink-job jar
val uploadResult = Request
.post(s"$jmRestUrl/jars/upload")
- .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
- .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+ .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.body(
MultipartEntityBuilder
.create()
@@ -90,8 +95,8 @@ object FlinkSessionSubmitHelper extends Logger {
// refer to
https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
val resp = Request
.post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
- .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
- .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+ .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.body(new StringEntity(Serialization.write(new
JarRunRequest(flinkConfig))))
.execute
.returnContent()
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index da6870f17..8b0cd6908 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,12 +17,6 @@
package org.apache.streampark.flink.kubernetes
-import org.apache.streampark.common.util.{DateUtils, Logger, Utils}
-import org.apache.streampark.common.util.Utils.using
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.ingress.IngressController
-import org.apache.streampark.flink.kubernetes.model.ClusterKey
-
import io.fabric8.kubernetes.client.{DefaultKubernetesClient,
KubernetesClient, KubernetesClientException}
import org.apache.flink.client.cli.ClientOptions
import org.apache.flink.client.deployment.{ClusterClientFactory,
DefaultClusterClientServiceLoader}
@@ -30,23 +24,18 @@ import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions,
RestOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
-import org.apache.hc.core5.util.Timeout
+import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
+import org.apache.streampark.flink.kubernetes.model.ClusterKey
import javax.annotation.Nullable
-
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
object KubernetesRetriever extends Logger {
- // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
- val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
-
Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
-
- // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
- val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
- Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
-
private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
/** get new KubernetesClient */
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 4abd10df8..bd5279441 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -17,23 +17,20 @@
package org.apache.streampark.flink.kubernetes.watcher
+import org.apache.hc.client5.http.fluent.Request
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
import
org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent
import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey,
TrackId}
-
-import org.apache.hc.client5.http.fluent.Request
-import org.json4s.{DefaultFormats, JNull}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, MetricWatcherConfig}
import org.json4s.JsonAST.JNothing
import org.json4s.jackson.JsonMethods.parse
-
-import javax.annotation.concurrent.ThreadSafe
+import org.json4s.{DefaultFormats, JNull}
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}
-
-import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
+import javax.annotation.concurrent.ThreadSafe
import scala.concurrent.duration.DurationLong
+import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
@@ -119,8 +116,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.def
Checkpoint.as(
Request
.get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
- .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
- .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+ .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8)) match {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 2d574e40d..05835252f 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -291,8 +291,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
JobDetails.as(
Request
.get(s"$restUrl/jobs/overview")
- .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
- .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+ .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent()
.asString(StandardCharsets.UTF_8)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 269ca6bc3..889714a2d 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -17,23 +17,20 @@
package org.apache.streampark.flink.kubernetes.watcher
+import org.apache.flink.configuration.{JobManagerOptions, MemorySize,
TaskManagerOptions}
+import org.apache.hc.client5.http.fluent.Request
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
import
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey,
FlinkMetricCV, TrackId}
-
-import org.apache.flink.configuration.{JobManagerOptions, MemorySize,
TaskManagerOptions}
-import org.apache.hc.client5.http.fluent.Request
-import org.json4s.{DefaultFormats, JArray}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, MetricWatcherConfig}
import org.json4s.jackson.JsonMethods.parse
-
-import javax.annotation.concurrent.ThreadSafe
+import org.json4s.{DefaultFormats, JArray}
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}
-
-import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
+import javax.annotation.concurrent.ThreadSafe
import scala.concurrent.duration.DurationLong
+import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
@@ -131,8 +128,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/overview")
- .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
- .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+ .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
@@ -144,8 +141,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/jobmanager/config")
- .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
- .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
+ .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index fbedc5287..6ecb0bbe1 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -17,13 +17,21 @@
package org.apache.streampark.flink.kubernetes.watcher
+import org.apache.hc.core5.util.Timeout
+
+import java.time.Duration
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
-
import scala.language.implicitConversions
trait FlinkWatcher extends AutoCloseable {
+ // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT
+ lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+
+ // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
+ lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(30000L)
+
private[this] val started: AtomicBoolean = new AtomicBoolean(false)
private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)