This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new 31754c6a8 [Improve] flink remote mode submit job improvement
31754c6a8 is described below

commit 31754c6a8104c347246a8039b43d322c6ecbdcbe
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 22:20:47 2023 +0800

    [Improve] flink remote mode submit job improvement
---
 .../impl/KubernetesNativeSessionClient.scala       |  5 ++--
 .../flink/client/impl/RemoteClient.scala           | 11 +++----
 .../flink/client/trait/FlinkClientTrait.scala      | 35 +++++++++++-----------
 3 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 346a8f1c6..621b7ac8a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -54,8 +54,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
       StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
       s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
     )
-    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
-      jobGraphSubmit)
+    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(
+      jobGraphSubmit,
+      restApiSubmit)
   }
 
   /** Submit flink session job via rest api. */
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 5569662a0..f8a433305 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.util.{ClassLoaderUtils, Utils}
+import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
 import org.apache.streampark.flink.client.bean.{CancelRequest, CancelResponse, SavepointRequestTrait, SavepointResponse, SubmitRequest, SubmitResponse, TriggerSavepointRequest}
 import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
@@ -26,13 +26,9 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, StandaloneClusterDescriptor, StandaloneClusterId}
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
-import org.apache.flink.yarn.configuration.YarnConfigOptions
 
 import java.io.File
 import java.lang.{Integer => JavaInt}
-import java.util
-
-import scala.util.{Failure, Success, Try}
 
 /** Submit Job to Remote Cluster */
 object RemoteClient extends FlinkClientTrait {
@@ -48,8 +44,9 @@ object RemoteClient extends FlinkClientTrait {
       flinkConfig: Configuration): SubmitResponse = {
 
     // 2) submit job
-    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
-      jobGraphSubmit)
+    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(
+      jobGraphSubmit,
+      restApiSubmit)
   }
 
   override def doCancel(request: CancelRequest, flinkConfig: Configuration): CancelResponse = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 3212b6451..58dc7f82b 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -40,7 +40,6 @@ import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
-import java.net.URL
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
 import scala.collection.JavaConversions._
@@ -200,33 +199,35 @@ trait FlinkClientTrait extends Logger {
   def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse
 
   def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration, jarFile: File)(
-      restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse)(
-      jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
-    // Prioritize using Rest API submit while using JobGraph submit plan as backup
+      jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse,
+      restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
+    // Prioritize using JobGraph submit plan while using Rest API submit plan as backup
     Try {
-      logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
-      restApiFunc(submitRequest, flinkConfig, jarFile)
+      logInfo(s"[flink-submit] Submit job with JobGraph Plan.")
+      jobGraphFunc(submitRequest, flinkConfig, jarFile)
     } match {
       case Failure(e) =>
         logWarn(
-          s"""
-             |\n[flink-submit] RestAPI Submit Plan failed, error detail:
+          s"""\n
+             |[flink-submit] JobGraph Submit Plan failed, error detail:
             |------------------------------------------------------------------
             |${Utils.stringifyException(e)}
             |------------------------------------------------------------------
-             |Try JobGraph Submit Plan now...
+             |Now retry submit with RestAPI Plan ...
              |""".stripMargin
         )
-        Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match {
+        Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
           case Success(r) => r
           case Failure(e) =>
-            logError(s"""
-                        |\n[flink-submit] JobGraph Submit failed, error detail:
-                        |------------------------------------------------------------------
-                        |${Utils.stringifyException(e)}
-                        |------------------------------------------------------------------
-                        |Both Rest API Submit and JobGraph failed!
-                        |""".stripMargin)
+            logError(
+              s"""\n
+                 |[flink-submit] RestAPI Submit failed, error detail:
+                 |------------------------------------------------------------------
+                 |${Utils.stringifyException(e)}
+                 |------------------------------------------------------------------
+                 |Both JobGraph submit plan and Rest API submit plan all failed!
+                 |""".stripMargin
+            )
             throw e
         }
       case Success(v) => v
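
For readers skimming the diff: the change flips the order of the two submit strategies in trySubmit, so the JobGraph plan is attempted first and the Rest API plan only runs as a fallback, with both functions now passed in a single parameter list. Below is a minimal, self-contained sketch of that try-then-fall-back shape; SubmitFallbackSketch, SubmitPlan and the stub submit functions are illustrative stand-ins, not the actual StreamPark types.

import scala.util.{Failure, Success, Try}

// Sketch only (not the StreamPark API): try the primary submit plan and
// fall back to the secondary plan only if the primary one throws.
object SubmitFallbackSketch {

  type SubmitPlan = () => String

  def trySubmit(primary: SubmitPlan, fallback: SubmitPlan): String =
    Try(primary()) match {
      case Success(r) => r
      case Failure(e) =>
        println(s"primary submit plan failed (${e.getMessage}), retrying with fallback plan ...")
        Try(fallback()) match {
          case Success(r) => r
          case Failure(e2) =>
            println("both submit plans failed!")
            throw e2
        }
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for jobGraphSubmit / restApiSubmit.
    val jobGraphSubmit: SubmitPlan = () => sys.error("JobGraph submit unavailable in this sketch")
    val restApiSubmit: SubmitPlan = () => "submitted via REST API"
    // JobGraph plan first, Rest API plan as the backup, mirroring the new argument order.
    println(trySubmit(jobGraphSubmit, restApiSubmit))
  }
}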
