This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch pyflink
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 7db201db3ee36dd57681afe1e9ae09dc5c780398
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 2 18:32:51 2023 +0800

    [Improve] pyflink improvement
---
 .../streampark/common/enums/DevelopmentMode.java   |   5 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 212 ++++++++++++---------
 .../flink/client/bean/SubmitRequest.scala          |   2 +-
 .../impl/KubernetesNativeSessionClient.scala       |  24 +--
 .../streampark/flink/client/impl/LocalClient.scala |   7 +-
 .../flink/client/impl/RemoteClient.scala           |  25 +--
 .../flink/client/impl/YarnApplicationClient.scala  |  48 ++---
 .../flink/client/impl/YarnPerJobClient.scala       |   7 +-
 .../flink/client/impl/YarnSessionClient.scala      |   7 +-
 .../flink/client/trait/FlinkClientTrait.scala      |  96 +++++-----
 .../apache/streampark/flink/util/FlinkUtils.scala  |  16 --
 11 files changed, 208 insertions(+), 241 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
index 5649d5f95..6fdef4ce5 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
@@ -25,7 +25,10 @@ public enum DevelopmentMode implements Serializable {
   CUSTOM_CODE("Custom Code", 1),
 
   /** Flink SQL */
-  FLINK_SQL("Flink SQL", 2);
+  FLINK_SQL("Flink SQL", 2),
+
+  /** Py flink */
+  PYFLINK("Python Flink", 3);
 
   private final String mode;
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c76540eb4..5b7dc19b1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
+import org.apache.streampark.common.tuple.Tuple2;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.FileUtils;
@@ -734,6 +735,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             resourceService.findByResourceName(appParam.getTeamId(), appParam.getJar());
         if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
           jarPath = resource.getFilePath();
+          // todo: we need to set the JobType to pyflink in the job add page of the front-end
+          if (jarPath.endsWith(ConfigConst.PYTHON_SUFFIX())) {
+            appParam.setJobType(DevelopmentMode.PYFLINK.getValue());
+          }
         }
      }
      appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath)));
@@ -1421,6 +1426,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
      throw new ApiAlertException("[StreamPark] The application cannot be started repeatedly.");
     }
 
+    AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
+    Utils.notNull(buildPipeline);
+
    FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
     if (flinkEnv == null) {
       throw new ApiAlertException("[StreamPark] can no found flink version");
@@ -1440,8 +1448,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     starting(application);
     application.setAllowNonRestored(appParam.getAllowNonRestored());
 
-    String appConf;
-    String flinkUserJar = null;
     String jobId = new JobID().toHexString();
     ApplicationLog applicationLog = new ApplicationLog();
     applicationLog.setOptionName(Operation.START.getValue());
@@ -1451,84 +1457,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     // set the latest to Effective, (it will only become the current effective at this time)
     this.toEffective(application);
 
-    ApplicationConfig applicationConfig = configService.getEffective(application.getId());
-    ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
-    ApiAlertException.throwIfNull(
-        executionMode, "ExecutionMode can't be null, start application failed.");
-    if (application.isCustomCodeJob()) {
-      if (application.isUploadJob()) {
-        appConf =
-            String.format(
-                "json://{\"%s\":\"%s\"}",
-                ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
-      } else {
-        switch (application.getApplicationType()) {
-          case STREAMPARK_FLINK:
-            ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
-            if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
-              appConf =
-                  String.format("%s://%s", fileType.getTypeName(), applicationConfig.getContent());
-            } else {
-              throw new IllegalArgumentException(
-                  "application' config type error,must be ( yaml| properties| hocon )");
-            }
-            break;
-          case APACHE_FLINK:
-            appConf =
-                String.format(
-                    "json://{\"%s\":\"%s\"}",
-                    ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
-            break;
-          default:
-            throw new IllegalArgumentException(
-                "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
-        }
-      }
-
-      if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
-        switch (application.getApplicationType()) {
-          case STREAMPARK_FLINK:
-            flinkUserJar =
-                String.format(
-                    "%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
-            break;
-          case APACHE_FLINK:
-            flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
-            if (!FsOperator.hdfs().exists(flinkUserJar)) {
-              Resource resource =
-                  resourceService.findByResourceName(application.getTeamId(), application.getJar());
-              if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
-                flinkUserJar =
-                    String.format(
-                        "%s/%s",
-                        application.getAppHome(), new File(resource.getFilePath()).getName());
-              }
-            }
-            break;
-          default:
-            throw new IllegalArgumentException(
-                "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
-        }
-      }
-    } else if (application.isFlinkSqlJob()) {
-      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
-      Utils.notNull(flinkSql);
-      // 1) dist_userJar
-      String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
-      // 2) appConfig
-      appConf =
-          applicationConfig == null
-              ? null
-              : String.format("yaml://%s", applicationConfig.getContent());
-      // 3) client
-      if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
-        String clientPath = Workspace.remote().APP_CLIENT();
-        flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
-      }
-    } else {
-      throw new UnsupportedOperationException("Unsupported...");
-    }
-
     Map<String, Object> extraParameter = new HashMap<>(0);
     if (application.isFlinkSqlJob()) {
      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
@@ -1544,12 +1472,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             application.getK8sNamespace(),
             application.getK8sRestExposedTypeEnum());
 
-    AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
-
-    Utils.notNull(buildPipeline);
+    Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
+    String flinkUserJar = userJarAndAppConf.f0;
+    String appConf = userJarAndAppConf.f1;
 
     BuildResult buildResult = buildPipeline.getBuildResult();
-    if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+    if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
       buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
     }
 
@@ -1557,14 +1485,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     String applicationArgs =
        variableService.replaceVariable(application.getTeamId(), application.getArgs());
 
-    String pyflinkFilePath = "";
-    Resource resource =
-        resourceService.findByResourceName(application.getTeamId(), application.getJar());
-    if (resource != null
-        && StringUtils.isNotBlank(resource.getFilePath())
-        && resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) {
-      pyflinkFilePath = resource.getFilePath();
-    }
     SubmitRequest submitRequest =
         new SubmitRequest(
             flinkEnv.getFlinkVersion(),
@@ -1580,7 +1500,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             getSavePointed(appParam),
            appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()),
             applicationArgs,
-            pyflinkFilePath,
             buildResult,
             kubernetesSubmitParam,
             extraParameter);
@@ -1682,6 +1601,113 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             });
   }
 
+  private Tuple2<String, String> getUserJarAndAppConf(FlinkEnv flinkEnv, Application application) {
+    ExecutionMode executionMode = application.getExecutionModeEnum();
+    ApplicationConfig applicationConfig = configService.getEffective(application.getId());
+
+    ApiAlertException.throwIfNull(
+        executionMode, "ExecutionMode can't be null, start application failed.");
+
+    String flinkUserJar = null;
+    String appConf = null;
+
+    switch (application.getDevelopmentMode()) {
+      case FLINK_SQL:
+        FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
+        Utils.notNull(flinkSql);
+        // 1) dist_userJar
+        String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
+        // 2) appConfig
+        appConf =
+            applicationConfig == null
+                ? null
+                : String.format("yaml://%s", applicationConfig.getContent());
+        // 3) client
+        if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+          String clientPath = Workspace.remote().APP_CLIENT();
+          flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
+        }
+        break;
+
+      case PYFLINK:
+        Resource resource =
+            resourceService.findByResourceName(application.getTeamId(), application.getJar());
+
+        ApiAlertException.throwIfNull(
+            resource, "pyflink file can't be null, start application failed.");
+
+        ApiAlertException.throwIfNull(
+            resource.getFilePath(), "pyflink file can't be null, start application failed.");
+
+        ApiAlertException.throwIfFalse(
+            resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()),
+            "pyflink format error, must be a \".py\" suffix, start application failed.");
+
+        flinkUserJar = resource.getFilePath();
+        break;
+
+      case CUSTOM_CODE:
+        if (application.isUploadJob()) {
+          appConf =
+              String.format(
+                  "json://{\"%s\":\"%s\"}",
+                  ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+        } else {
+          switch (application.getApplicationType()) {
+            case STREAMPARK_FLINK:
+              ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
+              if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
+                appConf =
+                    String.format(
+                        "%s://%s", fileType.getTypeName(), applicationConfig.getContent());
+              } else {
+                throw new IllegalArgumentException(
+                    "application' config type error,must be ( yaml| properties| hocon )");
+              }
+              break;
+            case APACHE_FLINK:
+              appConf =
+                  String.format(
+                      "json://{\"%s\":\"%s\"}",
+                      ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+              break;
+            default:
+              throw new IllegalArgumentException(
+                  "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
+          }
+        }
+
+        if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+          switch (application.getApplicationType()) {
+            case STREAMPARK_FLINK:
+              flinkUserJar =
+                  String.format(
+                      "%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
+              break;
+            case APACHE_FLINK:
+              flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
+              if (!FsOperator.hdfs().exists(flinkUserJar)) {
+                resource =
+                    resourceService.findByResourceName(
+                        application.getTeamId(), application.getJar());
+                if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
+                  flinkUserJar =
+                      String.format(
+                          "%s/%s",
+                          application.getAppHome(), new File(resource.getFilePath()).getName());
+                }
+              }
+              break;
+            default:
+              throw new IllegalArgumentException(
+                  "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
+          }
+        }
+        break;
+    }
+    return Tuple2.of(flinkUserJar, appConf);
+  }
+
   private Map<String, Object> getProperties(Application application) {
     Map<String, Object> properties = new HashMap<>(application.getOptionMap());
     if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 575d6eea9..d06c036c1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -63,7 +63,6 @@ case class SubmitRequest(
     savePoint: String,
     restoreMode: RestoreMode,
     args: String,
-    pyflinkFilePath: String = "",
     @Nullable buildResult: BuildResult,
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
@@ -74,6 +73,7 @@ case class SubmitRequest(
 
   lazy val appMain: String = this.developmentMode match {
    case DevelopmentMode.FLINK_SQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS
+    case DevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME
     case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
   }
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index d609b5e5b..2daa1f68a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -36,8 +36,6 @@ import org.apache.flink.kubernetes.configuration.{KubernetesConfigOptions, Kuber
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
 import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, FlinkKubeClientFactory}
 
-import java.io.File
-
 import scala.collection.convert.ImplicitConversions._
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
@@ -54,16 +52,12 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
       StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
      s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
     )
-    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
-      jobGraphSubmit)
+    super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
   }
 
   /** Submit flink session job via rest api. */
   @throws[Exception]
-  def restApiSubmit(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration,
-      fatJar: File): SubmitResponse = {
+  def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
     Try {
       // get jm rest url of flink session cluster
       val clusterKey = ClusterKey(
@@ -75,7 +69,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
         .getOrElse(throw new Exception(
          s"[flink-submit] retrieve flink session rest url failed, clusterKey=$clusterKey"))
       // submit job via rest api
-      val jobId = FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, fatJar, flinkConfig)
+      val jobId =
+        FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, submitRequest.userJarFile, flinkConfig)
       SubmitResponse(clusterKey.clusterId, flinkConfig.toMap, jobId, jmRestUrl)
     } match {
       case Success(s) => s
@@ -87,10 +82,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
 
   /** Submit flink session job with building JobGraph via ClusterClient api. */
   @throws[Exception]
-  def jobGraphSubmit(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration,
-      jarFile: File): SubmitResponse = {
+  def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
     // retrieve k8s cluster and submit flink job on session mode
     var clusterDescriptor: KubernetesClusterDescriptor = null
     var packageProgram: PackagedProgram = null
@@ -99,9 +91,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
     try {
       clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
       // build JobGraph
-      val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile)
-      packageProgram = packageProgramJobGraph._1
-      val jobGraph = packageProgramJobGraph._2
+      val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+      packageProgram = programJobGraph._1
+      val jobGraph = programJobGraph._2
       // retrieve client and submit JobGraph
       client = clusterDescriptor
         .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
index 380832fab..6baf9d375 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
@@ -47,10 +47,9 @@ object LocalClient extends FlinkClientTrait {
     var client: ClusterClient[MiniClusterId] = null
     try {
       // build JobGraph
-      val packageProgramJobGraph =
-        super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
-      packageProgram = packageProgramJobGraph._1
-      val jobGraph = packageProgramJobGraph._2
+      val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+      packageProgram = programJobGraph._1
+      val jobGraph = programJobGraph._2
       client = createLocalCluster(flinkConfig)
       val jobId = client.submitJob(jobGraph).get().toString
      SubmitResponse(jobId, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index d38f5254a..8e5c6a5c0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -27,7 +27,6 @@ import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, St
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
 
-import java.io.File
 import java.lang.{Integer => JavaInt}
 
 import scala.util.{Failure, Success, Try}
@@ -45,8 +44,7 @@ object RemoteClient extends FlinkClientTrait {
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
     //  submit job
-    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
-      jobGraphSubmit)
+    super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
 
   }
 
@@ -105,10 +103,7 @@ object RemoteClient extends FlinkClientTrait {
   /** Submit flink session job via rest api. */
   // noinspection DuplicatedCode
   @throws[Exception]
-  def restApiSubmit(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration,
-      fatJar: File): SubmitResponse = {
+  def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
     // retrieve standalone session cluster and submit flink job on session mode
     var clusterDescriptor: StandaloneClusterDescriptor = null;
     var client: ClusterClient[StandaloneClusterId] = null
@@ -119,7 +114,10 @@ object RemoteClient extends FlinkClientTrait {
 
       client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
       val jobId =
-        FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
+        FlinkSessionSubmitHelper.submitViaRestApi(
+          client.getWebInterfaceURL,
+          submitRequest.userJarFile,
+          flinkConfig)
       logInfo(
        s"${submitRequest.executionMode} mode submit by restApi, WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
       SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
@@ -133,10 +131,7 @@ object RemoteClient extends FlinkClientTrait {
 
  /** Submit flink session job with building JobGraph via Standalone ClusterClient api. */
   @throws[Exception]
-  def jobGraphSubmit(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration,
-      jarFile: File): SubmitResponse = {
+  def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
     var clusterDescriptor: StandaloneClusterDescriptor = null;
     var packageProgram: PackagedProgram = null
     var client: ClusterClient[StandaloneClusterId] = null
@@ -144,9 +139,9 @@ object RemoteClient extends FlinkClientTrait {
       val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
       clusterDescriptor = standAloneDescriptor._2
       // build JobGraph
-      val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile)
-      packageProgram = packageProgramJobGraph._1
-      val jobGraph = packageProgramJobGraph._2
+      val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+      packageProgram = programJobGraph._1
+      val jobGraph = programJobGraph._2
      client = clusterDescriptor.retrieve(standAloneDescriptor._1).getClusterClient
       val jobId = client.submitJob(jobGraph).get().toString
       logInfo(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 9c83659d0..86501c32c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -24,9 +24,7 @@ import org.apache.streampark.common.util.{HdfsUtils, Utils}
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
-import org.apache.streampark.flink.util.FlinkUtils
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.ClusterClient
@@ -38,10 +36,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
-import java.io.File
-import java.util
 import java.util.Collections
-import java.util.concurrent.Callable
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -99,41 +94,24 @@ object YarnApplicationClient extends YarnClientTrait {
       // yarn application Type
      .safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)
 
-    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
-      val pythonVenv: String = workspace.APP_PYTHON_VENV
-      if (!FsOperator.hdfs.exists(pythonVenv)) {
-        throw new RuntimeException(s"$pythonVenv File does not exist")
-      }
-      val pyflinkFile: File = new File(submitRequest.pyflinkFilePath)
-
-      val argList = new util.ArrayList[String]()
-      argList.add("-pym")
-      argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, ""))
-
-      val pythonFlinkconnectorJars: String =
-        FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
-      if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
-        flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
+    if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+      val pyVenv: String = workspace.APP_PYTHON_VENV
+      if (!FsOperator.hdfs.exists(pyVenv)) {
+        throw new RuntimeException(s"$pyVenv File does not exist")
       }
 
       // yarn.ship-files
-      flinkConfig.setString(
-        YarnConfigOptions.SHIP_FILES.key(),
-        pyflinkFile.getParentFile.getAbsolutePath)
-
       flinkConfig
-        // python.archives
-        .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
-        // python.client.executable
-        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
-        // python.executable
-        .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
-        // python.files
-        .safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName)
         .safeSet(
-          ApplicationConfiguration.APPLICATION_MAIN_CLASS,
-          ConfigConst.PYTHON_DRIVER_CLASS_NAME)
-        .safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList)
+          YarnConfigOptions.SHIP_FILES,
+          submitRequest.userJarFile.getParentFile.getAbsolutePath)
+        // python.files
+        .safeSet(PythonOptions.PYTHON_FILES, submitRequest.userJarFile.getParentFile.getName)
+
+      val args = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
+      args.add("-pym")
+      args.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
+      flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, args)
     }
 
     logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 1c2bb0394..370064f4a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -83,10 +83,9 @@ object YarnPerJobClient extends YarnClientTrait {
                    |------------------------------------------------------------------
                    |""".stripMargin)
 
-        val packageProgramJobGraph =
-          super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
-        packagedProgram = packageProgramJobGraph._1
-        val jobGraph = packageProgramJobGraph._2
+        val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+        packagedProgram = programJobGraph._1
+        val jobGraph = programJobGraph._2
 
        logInfo(s"""
                   |-------------------------<<applicationId>>------------------------
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 40a278e40..d3608e7cf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -106,10 +106,9 @@ object YarnSessionClient extends YarnClientTrait {
       val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
-      val packageProgramJobGraph =
-        super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
-      packageProgram = packageProgramJobGraph._1
-      val jobGraph = packageProgramJobGraph._2
+      val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+      packageProgram = programJobGraph._1
+      val jobGraph = programJobGraph._2
 
       client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
       val jobId = client.submitJob(jobGraph).get().toString
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 3a179d8bc..2e48f7939 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -25,7 +25,6 @@ import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
-import org.apache.streampark.flink.util.FlinkUtils
 
 import com.google.common.collect.Lists
 import org.apache.commons.cli.{CommandLine, Options}
@@ -92,13 +91,26 @@ trait FlinkClientTrait extends Logger {
          |""".stripMargin)
 
    val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest)
-    if (submitRequest.userJarFile != null) {
-      val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
-      val programOptions = ProgramOptions.create(commandLine)
-      val executionParameters = ExecutionConfigAccessor.fromProgramOptions(
-        programOptions,
-        Collections.singletonList(uri.toString))
-      executionParameters.applyToConfiguration(flinkConfig)
+
+    submitRequest.developmentMode match {
+      case DevelopmentMode.PYFLINK =>
+        val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
+        if (StringUtils.isBlank(flinkOptPath)) {
+          logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
+          val flinkHome = submitRequest.flinkVersion.flinkHome
+          EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
+          logInfo(
+            s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
+        }
+      case _ =>
+        if (submitRequest.userJarFile != null) {
+          val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
+          val programOptions = ProgramOptions.create(commandLine)
+          val executionParameters = ExecutionConfigAccessor.fromProgramOptions(
+            programOptions,
+            Collections.singletonList(uri.toString))
+          executionParameters.applyToConfiguration(flinkConfig)
+        }
     }
 
     // set common parameter
@@ -144,17 +156,6 @@ trait FlinkClientTrait extends Logger {
           })
     }
 
-    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
-      val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
-      if (StringUtils.isBlank(flinkOptPath)) {
-        logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
-        val flinkHome = submitRequest.flinkVersion.flinkHome
-        EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
-        logInfo(
-          s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
-      }
-    }
-
     setConfig(submitRequest, flinkConfig)
 
     doSubmit(submitRequest, flinkConfig)
@@ -214,31 +215,28 @@ trait FlinkClientTrait extends Logger {
   @throws[Exception]
  def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse
 
-  def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration, jarFile: File)(
-      restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse)(
-      jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
+  def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration)(
+      restApiFunc: (SubmitRequest, Configuration) => SubmitResponse)(
+      jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse): SubmitResponse = {
    // Prioritize using Rest API submit while using JobGraph submit plan as backup
     Try {
       logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
-      restApiFunc(submitRequest, flinkConfig, jarFile)
+      restApiFunc(submitRequest, flinkConfig)
     }.getOrElse {
      logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit Plan now.")
-      Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match {
+      Try(jobGraphFunc(submitRequest, flinkConfig)) match {
         case Success(r) => r
         case Failure(e) =>
          logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.")
           throw e
       }
-
     }
   }
 
   private[client] def getJobGraph(
-      flinkConfig: Configuration,
       submitRequest: SubmitRequest,
-      jarFile: File): (PackagedProgram, JobGraph) = {
-    var packageProgram: PackagedProgram = null
-    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+      flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
+    if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
       val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
       if (!FsOperator.lfs.exists(pythonVenv)) {
         throw new RuntimeException(s"$pythonVenv File does not exist")
@@ -250,31 +248,20 @@ trait FlinkClientTrait extends Logger {
        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
         // python.executable
        .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
-
-      val pythonFlinkconnectorJars: String =
-        FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
-      if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
-        flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
-      }
-
-      packageProgram = PackagedProgram.newBuilder
-        .setEntryPointClassName(ConfigConst.PYTHON_DRIVER_CLASS_NAME)
-        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-        .setArguments("-py", submitRequest.pyflinkFilePath)
-        .build()
-    } else {
-      packageProgram = PackagedProgram.newBuilder
-        .setJarFile(jarFile)
-        .setEntryPointClassName(
-          flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
-        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-        .setArguments(
-          flinkConfig
-            .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-            .orElse(Lists.newArrayList()): _*)
-        .build()
     }
 
+    val packageProgram = PackagedProgram.newBuilder
+      .setJarFile(submitRequest.userJarFile)
+      .setArguments(
+        flinkConfig
+          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+          .orElse(Lists.newArrayList()): _*)
+      .setEntryPointClassName(
+        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+      )
+      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+      .build()
+
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
@@ -521,6 +508,11 @@ trait FlinkClientTrait extends Logger {
       }
     }
 
+    if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+      // python file
+      programArgs.add("-py")
+      programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+    }
     programArgs.toList.asJava
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
index 11364f02b..58d38d311 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
@@ -48,22 +48,6 @@ object FlinkUtils {
     }
   }
 
-  /**
-   * Return a sample value:
-   *
-   * file:///flink-1.16.2/lib/flink-connector-jdbc-3.1.0-1.16.jar;file:///flink-1.16.2/lib/flink-sql-connector-mysql-cdc-2.4.0.jar
-   * @param flinkHome
-   * @return
-   *   flink-connector-xxx.jar and flink-sql-connector-xxx.jar
-   */
-  def getPythonFlinkconnectorJars(flinkHome: String): String = {
-    new File(s"$flinkHome/lib").list().filter(_.matches("flink.*connector.*\\.jar")) match {
-      case array if array.length > 0 =>
-        array.map(jar => s"file://$flinkHome/lib/$jar").mkString(";")
-      case _ => ""
-    }
-  }
-
   def isCheckpointEnabled(map: util.Map[String, String]): Boolean = {
    val checkpointInterval: Duration = TimeUtils.parseDuration(
      map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, "0ms"))

