This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch args
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/args by this push:
new 481016baa [Improve] minor improve
481016baa is described below
commit 481016baa80d491c64000230db7313d993b3a9da
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 01:04:37 2023 +0800
[Improve] minor improve
---
.../flink/client/trait/FlinkClientTrait.scala | 43 +++++++++++++++-------
1 file changed, 30 insertions(+), 13 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index aa21ea3ce..c7651b883 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,11 +18,11 @@
package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, SystemPropertyUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -223,14 +223,30 @@ trait FlinkClientTrait extends Logger {
Try {
logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
restApiFunc(submitRequest, flinkConfig)
- }.getOrElse {
- logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit Plan now.")
- Try(jobGraphFunc(submitRequest, flinkConfig)) match {
- case Success(r) => r
- case Failure(e) =>
- logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.")
- throw e
- }
+ } match {
+ case Failure(e) =>
+ logWarn(
+ s"""
+ |\n[flink-submit] RestAPI Submit Plan failed, error detail:
+ |------------------------------------------------------------------
+ |${ExceptionUtils.stringifyException(e)}
+ |------------------------------------------------------------------
+ |Try JobGraph Submit Plan now...
+ |""".stripMargin
+ )
+ Try(jobGraphFunc(submitRequest, flinkConfig)) match {
+ case Success(r) => r
+ case Failure(e) =>
+ logError(s"""
+ |\n[flink-submit] JobGraph Submit failed, error detail:
+ |------------------------------------------------------------------
+ |${ExceptionUtils.stringifyException(e)}
+ |------------------------------------------------------------------
+ |Both Rest API Submit and JobGraph failed!
+ |""".stripMargin)
+ throw e
+ }
+ case Success(v) => v
}
}
@@ -242,12 +258,13 @@ trait FlinkClientTrait extends Logger {
.setUserClassPaths(
Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
)
+ .setEntryPointClassName(
+ flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+ )
.setArguments(
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
- .orElse(Lists.newArrayList()): _*)
- .setEntryPointClassName(
- flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+ .orElse(Lists.newArrayList()): _*
)
.setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)