This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 078437f80 [Improve] flink job on k8s timeout improvement
078437f80 is described below

commit 078437f80c84661401ab4dece3fb48414b6b48a9
Author: benjobs <[email protected]>
AuthorDate: Sat Dec 23 21:07:23 2023 +0800

    [Improve] flink job on k8s timeout improvement
---
 .../flink/client/tool/FlinkSessionClientHelper.scala   |  8 ++++----
 .../flink/kubernetes/KubernetesRetriever.scala         | 18 +++++++++++-------
 .../kubernetes/watcher/FlinkCheckpointWatcher.scala    |  4 ++--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala     |  4 ++--
 .../flink/kubernetes/watcher/FlinkMetricsWatcher.scala |  8 ++++----
 5 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index e7d673628..63d5e8c64 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -59,8 +59,8 @@ object FlinkSessionSubmitHelper extends Logger {
     // upload flink-job jar
     val uploadResult = Request
       .post(s"$jmRestUrl/jars/upload")
-      .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-      .responseTimeout(Timeout.ofSeconds(60))
+      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
       .body(
         MultipartEntityBuilder
           .create()
@@ -90,8 +90,8 @@ object FlinkSessionSubmitHelper extends Logger {
     // refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
     val resp = Request
       .post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
-      .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-      .responseTimeout(Timeout.ofSeconds(60))
+      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
       .body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
       .execute
       .returnContent()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 17a5df525..71ee9d6de 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.hc.core5.util.Timeout
 
 import javax.annotation.Nullable
 
@@ -41,11 +42,12 @@ import scala.util.{Failure, Success, Try}
 object KubernetesRetriever extends Logger {
 
   // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
-  val FLINK_CLIENT_TIMEOUT_SEC = 30L
+  val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
+
   // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
-  val FLINK_REST_AWAIT_TIMEOUT_SEC = 10L
-  // see org.apache.flink.configuration.RestOptions.RETRY_MAX_ATTEMPTS
-  val FLINK_REST_RETRY_MAX_ATTEMPTS = 2
+  val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
 
   /** get new KubernetesClient */
   @throws(classOf[KubernetesClientException])
@@ -69,9 +71,11 @@ object KubernetesRetriever extends Logger {
     val flinkConfig = new Configuration()
     flinkConfig.setString(DeploymentOptions.TARGET, executeMode.toString)
     flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId)
-    flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(FLINK_CLIENT_TIMEOUT_SEC))
-    flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, FLINK_REST_AWAIT_TIMEOUT_SEC * 1000)
-    flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, FLINK_REST_RETRY_MAX_ATTEMPTS)
+    flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, ClientOptions.CLIENT_TIMEOUT.defaultValue())
+    flinkConfig.set(
+      RestOptions.AWAIT_LEADER_TIMEOUT,
+      RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+    flinkConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, RestOptions.RETRY_MAX_ATTEMPTS.defaultValue())
     if (Try(namespace.isEmpty).getOrElse(true)) {
       flinkConfig.setString(
         KubernetesConfigOptions.NAMESPACE,
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index d031e82d5..3d627ea1f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -116,8 +116,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
         Checkpoint.as(
           Request
             .get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
-            .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-            .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8)) match {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 55be3f9de..55f62f07f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -267,8 +267,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     val jobDetails = JobDetails.as(
       Request
         .get(s"$restUrl/jobs/overview")
-        .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-        .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+        .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+        .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
         .execute
         .returnContent()
         .asString(StandardCharsets.UTF_8))
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 748bc8862..6f5ff0095 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -128,8 +128,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
       .as(
         Request
           .get(s"$flinkJmRestUrl/overview")
-          .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-          .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+          .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+          .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
           .execute
           .returnContent
           .asString(StandardCharsets.UTF_8))
@@ -141,8 +141,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
         .as(
           Request
             .get(s"$flinkJmRestUrl/jobmanager/config")
-            .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-            .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8))

Reply via email to