This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch args
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/args by this push:
     new 481016baa [Improve] minor improve
481016baa is described below

commit 481016baa80d491c64000230db7313d993b3a9da
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 01:04:37 2023 +0800

    [Improve] minor improve
---
 .../flink/client/trait/FlinkClientTrait.scala      | 43 +++++++++++++++-------
 1 file changed, 30 insertions(+), 13 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index aa21ea3ce..c7651b883 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,11 +18,11 @@
 package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, SystemPropertyUtils, Utils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -223,14 +223,30 @@ trait FlinkClientTrait extends Logger {
     Try {
       logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
       restApiFunc(submitRequest, flinkConfig)
-    }.getOrElse {
-      logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit Plan now.")
-      Try(jobGraphFunc(submitRequest, flinkConfig)) match {
-        case Success(r) => r
-        case Failure(e) =>
-          logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.")
-          throw e
-      }
+    } match {
+      case Failure(e) =>
+        logWarn(
+          s"""
+             |\n[flink-submit] RestAPI Submit Plan failed, error detail:
+             |------------------------------------------------------------------
+             |${ExceptionUtils.stringifyException(e)}
+             |------------------------------------------------------------------
+             |Try JobGraph Submit Plan now...
+             |""".stripMargin
+        )
+        Try(jobGraphFunc(submitRequest, flinkConfig)) match {
+          case Success(r) => r
+          case Failure(e) =>
+            logError(s"""
+                        |\n[flink-submit] JobGraph Submit failed, error detail:
+                        |------------------------------------------------------------------
+                        |${ExceptionUtils.stringifyException(e)}
+                        |------------------------------------------------------------------
+                        |Both Rest API Submit and JobGraph failed!
+                        |""".stripMargin)
+            throw e
+        }
+      case Success(v) => v
     }
   }
 
@@ -242,12 +258,13 @@ trait FlinkClientTrait extends Logger {
       .setUserClassPaths(
         Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
       )
+      .setEntryPointClassName(
+        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+      )
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*)
-      .setEntryPointClassName(
-        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+          .orElse(Lists.newArrayList()): _*
       )
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
 

Reply via email to