This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new 43c2a4564 [Improve] submit job by jobgraph improvement
43c2a4564 is described below

commit 43c2a4564b75c062282899db17de07fd8908c1a8
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 28 09:33:36 2023 +0800

    [Improve] submit job by jobgraph improvement
---
 .../src/main/assembly/script/data/mysql-data.sql   |  2 +-
 .../main/assembly/script/schema/mysql-schema.sql   |  1 +
 .../script/upgrade/mysql/{2.1.1.sql => 2.1.2.sql}  |  0
 .../script/upgrade/pgsql/{2.1.1.sql => 2.1.2.sql}  |  0
 .../src/main/resources/db/schema-h2.sql            |  2 +-
 .../flink/client/impl/RemoteClient.scala           | 29 +++++++----------
 .../flink/client/trait/FlinkClientTrait.scala      | 38 ++++++++++++++++------
 7 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index 71001d79b..c5ad94555 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -30,7 +30,7 @@ insert into `t_team` values (100000, 'default', null, now(), now());
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
 
 -- ----------------------------
 -- Records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 9c5e3264d..284992ff5 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -53,6 +53,7 @@ create table `t_flink_app` (
   `jar` varchar(255) collate utf8mb4_general_ci default null,
   `jar_check_sum` bigint default null,
   `main_class` varchar(255) collate utf8mb4_general_ci default null,
+  `dependency` text collate utf8mb4_general_ci default null,
   `args` text collate utf8mb4_general_ci,
   `options` text collate utf8mb4_general_ci,
   `hot_params` text collate utf8mb4_general_ci,
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.2.sql
similarity index 100%
rename from streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
rename to streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.2.sql
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.2.sql
similarity index 100%
rename from streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
rename to streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.2.sql
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index d06f907e0..158bce5c7 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -46,7 +46,7 @@ create table if not exists `t_flink_app` (
   `jar` varchar(255)  default null,
   `jar_check_sum` bigint default null,
   `main_class` varchar(255)  default null,
-  `dependency` text ,
+  `dependency` text,
   `args` text,
   `options` text,
   `hot_params` text ,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 27f126f33..c986bc8d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -26,9 +26,11 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, StandaloneClusterDescriptor, StandaloneClusterId}
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
+import org.apache.flink.yarn.configuration.YarnConfigOptions
 
 import java.io.File
 import java.lang.{Integer => JavaInt}
+import java.util
 
 import scala.util.{Failure, Success, Try}
 
@@ -111,23 +113,16 @@ object RemoteClient extends FlinkClientTrait {
     // retrieve standalone session cluster and submit flink job on session mode
     var clusterDescriptor: StandaloneClusterDescriptor = null;
     var client: ClusterClient[StandaloneClusterId] = null
-    Try {
-      val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
-      val yarnClusterId: StandaloneClusterId = standAloneDescriptor._1
-      clusterDescriptor = standAloneDescriptor._2
-
-      client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
-      val jobId =
-        FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
-      logInfo(
-        s"${submitRequest.executionMode} mode submit by restApi, WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
-      SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
-    } match {
-      case Success(s) => s
-      case Failure(e) =>
-        logError(s"${submitRequest.executionMode} mode submit by restApi fail.")
-        throw e
-    }
+    val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
+    val yarnClusterId: StandaloneClusterId = standAloneDescriptor._1
+    clusterDescriptor = standAloneDescriptor._2
+
+    client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
+    val jobId =
+      FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
+    logInfo(
+      s"${submitRequest.executionMode} mode submit by restApi, WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
+    SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
   }
 
   /** Submit flink session job with building JobGraph via Standalone ClusterClient api. */
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 79d4b9bf8..3d69c760a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
-import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -40,10 +40,10 @@ import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
+import java.net.URL
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.language.postfixOps
@@ -209,11 +209,24 @@ trait FlinkClientTrait extends Logger {
     } match {
       case Failure(e) =>
         logWarn(
-          s"[flink-submit] RestAPI Submit Plan failed, error: $e, try JobGraph Submit Plan now.")
+          s"""
+             |\n[flink-submit] RestAPI Submit Plan failed, error detail:
+             |------------------------------------------------------------------
+             |${Utils.stringifyException(e)}
+             |------------------------------------------------------------------
+             |Try JobGraph Submit Plan now...
+             |""".stripMargin
+        )
         Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match {
           case Success(r) => r
           case Failure(e) =>
-            logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.")
+            logError(s"""
+                        |\n[flink-submit] JobGraph Submit failed, error detail:
+                        |------------------------------------------------------------------
+                        |${Utils.stringifyException(e)}
+                        |------------------------------------------------------------------
+                        |Both Rest API Submit and JobGraph failed!
+                        |""".stripMargin)
             throw e
         }
       case Success(v) => v
@@ -224,15 +237,21 @@ trait FlinkClientTrait extends Logger {
       flinkConfig: Configuration,
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
+
     val packageProgram = PackagedProgram.newBuilder
       .setJarFile(jarFile)
+      .setUserClassPaths(
+        Lists.newArrayList(
+          submitRequest.flinkVersion.flinkLib
+            .listFiles()
+            .map(_.toURI.toURL)
+            .toBuffer[URL]: _*))
       .setEntryPointClassName(
         
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .setArguments(
-        flinkConfig
-          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*)
+      .setArguments(flinkConfig
+        .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+        .orElse(Lists.newArrayList()): _*)
       .build()
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
@@ -416,9 +435,8 @@ trait FlinkClientTrait extends Logger {
         case _ if Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
           programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
       }
-
     }
-    programArgs.toList.asJava
+    Lists.newArrayList(programArgs: _*)
   }
 
   private[this] def applyConfiguration(

Reply via email to