This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 43c2a4564 [Improve] submit job by jobgraph improvement
43c2a4564 is described below
commit 43c2a4564b75c062282899db17de07fd8908c1a8
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 28 09:33:36 2023 +0800
[Improve] submit job by jobgraph improvement
---
.../src/main/assembly/script/data/mysql-data.sql | 2 +-
.../main/assembly/script/schema/mysql-schema.sql | 1 +
.../script/upgrade/mysql/{2.1.1.sql => 2.1.2.sql} | 0
.../script/upgrade/pgsql/{2.1.1.sql => 2.1.2.sql} | 0
.../src/main/resources/db/schema-h2.sql | 2 +-
.../flink/client/impl/RemoteClient.scala | 29 +++++++----------
.../flink/client/trait/FlinkClientTrait.scala | 38 ++++++++++++++++------
7 files changed, 43 insertions(+), 29 deletions(-)
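
In brief: this change refines the existing REST-first / JobGraph-fallback submit
path. Submit failures now log full stack traces (via Utils.stringifyException),
the locally built JobGraph gets the Flink lib/ jars on its user classpath, and
RemoteClient drops a redundant inner Try. The sketch below is a simplified,
hypothetical rendering of that flow; restApiFunc and jobGraphFunc are stand-in
names, not the project's exact signatures:

    import scala.util.{Failure, Success, Try}

    // Try the REST API plan first; on failure, log the error and fall
    // back to building a JobGraph locally. If both plans fail, surface
    // the JobGraph error to the caller.
    def submitWithFallback[R](restApiFunc: () => R, jobGraphFunc: () => R): R =
      Try(restApiFunc()) match {
        case Success(v) => v
        case Failure(restErr) =>
          println(s"REST submit failed: $restErr, trying JobGraph plan")
          Try(jobGraphFunc()) match {
            case Success(r) => r
            case Failure(jobGraphErr) => throw jobGraphErr
          }
      }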
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index 71001d79b..c5ad94555 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -30,7 +30,7 @@ insert into `t_team` values (100000, 'default', null, now(), now());
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
-- ----------------------------
-- Records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 9c5e3264d..284992ff5 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -53,6 +53,7 @@ create table `t_flink_app` (
`jar` varchar(255) collate utf8mb4_general_ci default null,
`jar_check_sum` bigint default null,
`main_class` varchar(255) collate utf8mb4_general_ci default null,
+ `dependency` text collate utf8mb4_general_ci default null,
`args` text collate utf8mb4_general_ci,
`options` text collate utf8mb4_general_ci,
`hot_params` text collate utf8mb4_general_ci,
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.2.sql
similarity index 100%
rename from streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
rename to streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.2.sql
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.2.sql
similarity index 100%
rename from streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
rename to streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.2.sql
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index d06f907e0..158bce5c7 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -46,7 +46,7 @@ create table if not exists `t_flink_app` (
`jar` varchar(255) default null,
`jar_check_sum` bigint default null,
`main_class` varchar(255) default null,
- `dependency` text ,
+ `dependency` text,
`args` text,
`options` text,
`hot_params` text ,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 27f126f33..c986bc8d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -26,9 +26,11 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, StandaloneClusterDescriptor, StandaloneClusterId}
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
+import org.apache.flink.yarn.configuration.YarnConfigOptions
import java.io.File
import java.lang.{Integer => JavaInt}
+import java.util
import scala.util.{Failure, Success, Try}
@@ -111,23 +113,16 @@ object RemoteClient extends FlinkClientTrait {
// retrieve standalone session cluster and submit flink job on session mode
var clusterDescriptor: StandaloneClusterDescriptor = null;
var client: ClusterClient[StandaloneClusterId] = null
- Try {
- val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
- val yarnClusterId: StandaloneClusterId = standAloneDescriptor._1
- clusterDescriptor = standAloneDescriptor._2
-
- client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
- val jobId =
- FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
- logInfo(
- s"${submitRequest.executionMode} mode submit by restApi,
WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
- SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
- } match {
- case Success(s) => s
- case Failure(e) =>
- logError(s"${submitRequest.executionMode} mode submit by restApi fail.")
- throw e
- }
+ val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
+ val yarnClusterId: StandaloneClusterId = standAloneDescriptor._1
+ clusterDescriptor = standAloneDescriptor._2
+
+ client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
+ val jobId =
+ FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
+ logInfo(
+ s"${submitRequest.executionMode} mode submit by restApi, WebInterfaceURL
${client.getWebInterfaceURL}, jobId: $jobId")
+ SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
}
/** Submit flink session job with building JobGraph via Standalone ClusterClient api. */
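
Note on the RemoteClient hunk above: the inner Try/match was removed because
this REST submit already runs inside the trait-level fallback in
FlinkClientTrait (next file), so catching, logging, and rethrowing here
duplicated the error handling. A minimal sketch of the caller-side pattern
that now owns the failure; submitOnce and doSubmit are illustrative names only:

    import scala.util.{Failure, Success, Try}

    // One Try at the call site replaces per-client Try blocks: the callee
    // just throws, and the caller logs once and decides what to do next
    // (e.g. fall back to the JobGraph plan).
    def submitOnce[R](doSubmit: () => R)(onFailure: Throwable => Unit): R =
      Try(doSubmit()) match {
        case Success(resp) => resp
        case Failure(e) =>
          onFailure(e)
          throw e
      }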
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 79d4b9bf8..3d69c760a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
-import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -40,10 +40,10 @@ import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
+import java.net.URL
import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.language.postfixOps
@@ -209,11 +209,24 @@ trait FlinkClientTrait extends Logger {
} match {
case Failure(e) =>
logWarn(
- s"[flink-submit] RestAPI Submit Plan failed, error: $e, try JobGraph
Submit Plan now.")
+ s"""
+ |\n[flink-submit] RestAPI Submit Plan failed, error detail:
+
|------------------------------------------------------------------
+ |${Utils.stringifyException(e)}
+
|------------------------------------------------------------------
+ |Try JobGraph Submit Plan now...
+ |""".stripMargin
+ )
Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match {
case Success(r) => r
case Failure(e) =>
- logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.")
+ logError(s"""
+ |\n[flink-submit] JobGraph Submit failed, error detail:
+ |------------------------------------------------------------------
+ |${Utils.stringifyException(e)}
+ |------------------------------------------------------------------
+ |Both Rest API Submit and JobGraph failed!
+ |""".stripMargin)
throw e
}
case Success(v) => v
@@ -224,15 +237,21 @@ trait FlinkClientTrait extends Logger {
flinkConfig: Configuration,
submitRequest: SubmitRequest,
jarFile: File): (PackagedProgram, JobGraph) = {
+
val packageProgram = PackagedProgram.newBuilder
.setJarFile(jarFile)
+ .setUserClassPaths(
+ Lists.newArrayList(
+ submitRequest.flinkVersion.flinkLib
+ .listFiles()
+ .map(_.toURI.toURL)
+ .toBuffer[URL]: _*))
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
.setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
- .setArguments(
- flinkConfig
- .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
- .orElse(Lists.newArrayList()): _*)
+ .setArguments(flinkConfig
+ .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+ .orElse(Lists.newArrayList()): _*)
.build()
val jobGraph = PackagedProgramUtils.createJobGraph(
@@ -416,9 +435,8 @@ trait FlinkClientTrait extends Logger {
case _ if Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
-
}
- programArgs.toList.asJava
+ Lists.newArrayList(programArgs: _*)
}
private[this] def applyConfiguration(
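
Two details in the FlinkClientTrait hunks are worth spelling out. First, the
jars under the Flink distribution's lib/ directory are now registered as user
classpaths on the PackagedProgram, so building the JobGraph locally can
resolve classes that the user jar references. Second, programArgs is copied
into a fresh ArrayList rather than returned as programArgs.toList.asJava,
since that wrapper is an immutable view and would throw if the list were
mutated downstream. A self-contained sketch of the classpath collection;
flinkLibDir stands in for submitRequest.flinkVersion.flinkLib:

    import java.io.File
    import java.net.URL
    import java.util.{ArrayList => JArrayList}

    // Collect every file (in practice, the jars) under FLINK_HOME/lib as
    // a user-classpath URL. listFiles() returns null for a non-directory,
    // hence the Option guard.
    def userClassPaths(flinkLibDir: File): JArrayList[URL] = {
      val urls = new JArrayList[URL]()
      Option(flinkLibDir.listFiles()).getOrElse(Array.empty[File]).foreach {
        f => urls.add(f.toURI.toURL)
      }
      urls
    }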