This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 31754c6a8 [Improve] flink remote mode submit job improvement
31754c6a8 is described below
commit 31754c6a8104c347246a8039b43d322c6ecbdcbe
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 22:20:47 2023 +0800
[Improve] flink remote mode submit job improvement
---
.../impl/KubernetesNativeSessionClient.scala | 5 ++--
.../flink/client/impl/RemoteClient.scala | 11 +++----
.../flink/client/trait/FlinkClientTrait.scala | 35 +++++++++++-----------
3 files changed, 25 insertions(+), 26 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 346a8f1c6..621b7ac8a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -54,8 +54,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
)
- super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
- jobGraphSubmit)
+ super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(
+ jobGraphSubmit,
+ restApiSubmit)
}
/** Submit flink session job via rest api. */
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 5569662a0..f8a433305 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.client.impl
-import org.apache.streampark.common.util.{ClassLoaderUtils, Utils}
+import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
import org.apache.streampark.flink.client.bean.{CancelRequest, CancelResponse, SavepointRequestTrait, SavepointResponse, SubmitRequest, SubmitResponse, TriggerSavepointRequest}
import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
@@ -26,13 +26,9 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, StandaloneClusterDescriptor, StandaloneClusterId}
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
-import org.apache.flink.yarn.configuration.YarnConfigOptions
import java.io.File
import java.lang.{Integer => JavaInt}
-import java.util
-
-import scala.util.{Failure, Success, Try}
/** Submit Job to Remote Cluster */
object RemoteClient extends FlinkClientTrait {
@@ -48,8 +44,9 @@ object RemoteClient extends FlinkClientTrait {
flinkConfig: Configuration): SubmitResponse = {
// 2) submit job
- super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
- jobGraphSubmit)
+ super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(
+ jobGraphSubmit,
+ restApiSubmit)
}
override def doCancel(request: CancelRequest, flinkConfig: Configuration): CancelResponse = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 3212b6451..58dc7f82b 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -40,7 +40,6 @@ import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
-import java.net.URL
import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.collection.JavaConversions._
@@ -200,33 +199,35 @@ trait FlinkClientTrait extends Logger {
def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse
def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration, jarFile: File)(
- restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse)(
- jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
- // Prioritize using Rest API submit while using JobGraph submit plan as backup
+ jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse,
+ restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
+ // Prioritize using JobGraph submit plan while using Rest API submit plan as backup
Try {
- logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
- restApiFunc(submitRequest, flinkConfig, jarFile)
+ logInfo(s"[flink-submit] Submit job with JobGraph Plan.")
+ jobGraphFunc(submitRequest, flinkConfig, jarFile)
} match {
case Failure(e) =>
logWarn(
- s"""
- |\n[flink-submit] RestAPI Submit Plan failed, error detail:
+ s"""\n
+ |[flink-submit] JobGraph Submit Plan failed, error detail:
|------------------------------------------------------------------
|${Utils.stringifyException(e)}
|------------------------------------------------------------------
- |Try JobGraph Submit Plan now...
+ |Now retry submit with RestAPI Plan ...
|""".stripMargin
)
- Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match {
+ Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
case Success(r) => r
case Failure(e) =>
- logError(s"""
- |\n[flink-submit] JobGraph Submit failed, error detail:
- |------------------------------------------------------------------
- |${Utils.stringifyException(e)}
- |------------------------------------------------------------------
- |Both Rest API Submit and JobGraph failed!
- |""".stripMargin)
+ logError(
+ s"""\n
+ |[flink-submit] RestAPI Submit failed, error detail:
+ |------------------------------------------------------------------
+ |${Utils.stringifyException(e)}
+ |------------------------------------------------------------------
+ |Both JobGraph submit plan and Rest API submit plan all failed!
+ |""".stripMargin
+ )
throw e
}
case Success(v) => v