This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch pyflink
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 7db201db3ee36dd57681afe1e9ae09dc5c780398 Author: benjobs <[email protected]> AuthorDate: Sat Sep 2 18:32:51 2023 +0800 [Improve] pyflink improvement --- .../streampark/common/enums/DevelopmentMode.java | 5 +- .../core/service/impl/ApplicationServiceImpl.java | 212 ++++++++++++--------- .../flink/client/bean/SubmitRequest.scala | 2 +- .../impl/KubernetesNativeSessionClient.scala | 24 +-- .../streampark/flink/client/impl/LocalClient.scala | 7 +- .../flink/client/impl/RemoteClient.scala | 25 +-- .../flink/client/impl/YarnApplicationClient.scala | 48 ++--- .../flink/client/impl/YarnPerJobClient.scala | 7 +- .../flink/client/impl/YarnSessionClient.scala | 7 +- .../flink/client/trait/FlinkClientTrait.scala | 96 +++++----- .../apache/streampark/flink/util/FlinkUtils.scala | 16 -- 11 files changed, 208 insertions(+), 241 deletions(-) diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java index 5649d5f95..6fdef4ce5 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java @@ -25,7 +25,10 @@ public enum DevelopmentMode implements Serializable { CUSTOM_CODE("Custom Code", 1), /** Flink SQL */ - FLINK_SQL("Flink SQL", 2); + FLINK_SQL("Flink SQL", 2), + + /** Py flink */ + PYFLINK("Python Flink", 3); private final String mode; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index c76540eb4..5b7dc19b1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.fs.HdfsOperator; import org.apache.streampark.common.fs.LfsOperator; +import org.apache.streampark.common.tuple.Tuple2; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.FileUtils; @@ -734,6 +735,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli resourceService.findByResourceName(appParam.getTeamId(), appParam.getJar()); if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { jarPath = resource.getFilePath(); + // todo: we need to set the JobType to pyflink in the job add page of the front-end + if (jarPath.endsWith(ConfigConst.PYTHON_SUFFIX())) { + appParam.setJobType(DevelopmentMode.PYFLINK.getValue()); + } } } appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath))); @@ -1421,6 +1426,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli throw new ApiAlertException("[StreamPark] The application cannot be started repeatedly."); } + AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); + Utils.notNull(buildPipeline); + FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId()); 
if (flinkEnv == null) { throw new ApiAlertException("[StreamPark] can no found flink version"); @@ -1440,8 +1448,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli starting(application); application.setAllowNonRestored(appParam.getAllowNonRestored()); - String appConf; - String flinkUserJar = null; String jobId = new JobID().toHexString(); ApplicationLog applicationLog = new ApplicationLog(); applicationLog.setOptionName(Operation.START.getValue()); @@ -1451,84 +1457,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli // set the latest to Effective, (it will only become the current effective at this time) this.toEffective(application); - ApplicationConfig applicationConfig = configService.getEffective(application.getId()); - ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode()); - ApiAlertException.throwIfNull( - executionMode, "ExecutionMode can't be null, start application failed."); - if (application.isCustomCodeJob()) { - if (application.isUploadJob()) { - appConf = - String.format( - "json://{\"%s\":\"%s\"}", - ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); - } else { - switch (application.getApplicationType()) { - case STREAMPARK_FLINK: - ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat()); - if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) { - appConf = - String.format("%s://%s", fileType.getTypeName(), applicationConfig.getContent()); - } else { - throw new IllegalArgumentException( - "application' config type error,must be ( yaml| properties| hocon )"); - } - break; - case APACHE_FLINK: - appConf = - String.format( - "json://{\"%s\":\"%s\"}", - ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); - break; - default: - throw new IllegalArgumentException( - "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... "); - } - } - - if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { - switch (application.getApplicationType()) { - case STREAMPARK_FLINK: - flinkUserJar = - String.format( - "%s/%s", application.getAppLib(), application.getModule().concat(".jar")); - break; - case APACHE_FLINK: - flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); - if (!FsOperator.hdfs().exists(flinkUserJar)) { - Resource resource = - resourceService.findByResourceName(application.getTeamId(), application.getJar()); - if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { - flinkUserJar = - String.format( - "%s/%s", - application.getAppHome(), new File(resource.getFilePath()).getName()); - } - } - break; - default: - throw new IllegalArgumentException( - "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... "); - } - } - } else if (application.isFlinkSqlJob()) { - FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); - Utils.notNull(flinkSql); - // 1) dist_userJar - String sqlDistJar = commonService.getSqlClientJar(flinkEnv); - // 2) appConfig - appConf = - applicationConfig == null - ? 
null - : String.format("yaml://%s", applicationConfig.getContent()); - // 3) client - if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { - String clientPath = Workspace.remote().APP_CLIENT(); - flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar); - } - } else { - throw new UnsupportedOperationException("Unsupported..."); - } - Map<String, Object> extraParameter = new HashMap<>(0); if (application.isFlinkSqlJob()) { FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); @@ -1544,12 +1472,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli application.getK8sNamespace(), application.getK8sRestExposedTypeEnum()); - AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); - - Utils.notNull(buildPipeline); + Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application); + String flinkUserJar = userJarAndAppConf.f0; + String appConf = userJarAndAppConf.f1; BuildResult buildResult = buildPipeline.getBuildResult(); - if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { + if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) { buildResult = new ShadedBuildResponse(null, flinkUserJar, true); } @@ -1557,14 +1485,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getArgs()); - String pyflinkFilePath = ""; - Resource resource = - resourceService.findByResourceName(application.getTeamId(), application.getJar()); - if (resource != null - && StringUtils.isNotBlank(resource.getFilePath()) - && resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) { - pyflinkFilePath = resource.getFilePath(); - } SubmitRequest submitRequest = new SubmitRequest( flinkEnv.getFlinkVersion(), @@ -1580,7 +1500,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli getSavePointed(appParam), appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()), applicationArgs, - pyflinkFilePath, buildResult, kubernetesSubmitParam, extraParameter); @@ -1682,6 +1601,113 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli }); } + private Tuple2<String, String> getUserJarAndAppConf(FlinkEnv flinkEnv, Application application) { + ExecutionMode executionMode = application.getExecutionModeEnum(); + ApplicationConfig applicationConfig = configService.getEffective(application.getId()); + + ApiAlertException.throwIfNull( + executionMode, "ExecutionMode can't be null, start application failed."); + + String flinkUserJar = null; + String appConf = null; + + switch (application.getDevelopmentMode()) { + case FLINK_SQL: + FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); + Utils.notNull(flinkSql); + // 1) dist_userJar + String sqlDistJar = commonService.getSqlClientJar(flinkEnv); + // 2) appConfig + appConf = + applicationConfig == null + ? 
null + : String.format("yaml://%s", applicationConfig.getContent()); + // 3) client + if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { + String clientPath = Workspace.remote().APP_CLIENT(); + flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar); + } + break; + + case PYFLINK: + Resource resource = + resourceService.findByResourceName(application.getTeamId(), application.getJar()); + + ApiAlertException.throwIfNull( + resource, "pyflink file can't be null, start application failed."); + + ApiAlertException.throwIfNull( + resource.getFilePath(), "pyflink file can't be null, start application failed."); + + ApiAlertException.throwIfFalse( + resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()), + "pyflink format error, must be a \".py\" suffix, start application failed."); + + flinkUserJar = resource.getFilePath(); + break; + + case CUSTOM_CODE: + if (application.isUploadJob()) { + appConf = + String.format( + "json://{\"%s\":\"%s\"}", + ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + } else { + switch (application.getApplicationType()) { + case STREAMPARK_FLINK: + ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat()); + if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) { + appConf = + String.format( + "%s://%s", fileType.getTypeName(), applicationConfig.getContent()); + } else { + throw new IllegalArgumentException( + "application' config type error,must be ( yaml| properties| hocon )"); + } + break; + case APACHE_FLINK: + appConf = + String.format( + "json://{\"%s\":\"%s\"}", + ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + break; + default: + throw new IllegalArgumentException( + "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... "); + } + } + + if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { + switch (application.getApplicationType()) { + case STREAMPARK_FLINK: + flinkUserJar = + String.format( + "%s/%s", application.getAppLib(), application.getModule().concat(".jar")); + break; + case APACHE_FLINK: + flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); + if (!FsOperator.hdfs().exists(flinkUserJar)) { + resource = + resourceService.findByResourceName( + application.getTeamId(), application.getJar()); + if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { + flinkUserJar = + String.format( + "%s/%s", + application.getAppHome(), new File(resource.getFilePath()).getName()); + } + } + break; + default: + throw new IllegalArgumentException( + "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... 
"); + } + } + break; + } + return Tuple2.of(flinkUserJar, appConf); + } + private Map<String, Object> getProperties(Application application) { Map<String, Object> properties = new HashMap<>(application.getOptionMap()); if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 575d6eea9..d06c036c1 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -63,7 +63,6 @@ case class SubmitRequest( savePoint: String, restoreMode: RestoreMode, args: String, - pyflinkFilePath: String = "", @Nullable buildResult: BuildResult, @Nullable k8sSubmitParam: KubernetesSubmitParam, @Nullable extraParameter: JavaMap[String, Any]) { @@ -74,6 +73,7 @@ case class SubmitRequest( lazy val appMain: String = this.developmentMode match { case DevelopmentMode.FLINK_SQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS + case DevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index d609b5e5b..2daa1f68a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -36,8 +36,6 @@ import org.apache.flink.kubernetes.configuration.{KubernetesConfigOptions, Kuber import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, FlinkKubeClientFactory} -import java.io.File - import scala.collection.convert.ImplicitConversions._ import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -54,16 +52,12 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId), s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}" ) - super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)( - jobGraphSubmit) + super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit) } /** Submit flink session job via rest api. 
*/ @throws[Exception] - def restApiSubmit( - submitRequest: SubmitRequest, - flinkConfig: Configuration, - fatJar: File): SubmitResponse = { + def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = { Try { // get jm rest url of flink session cluster val clusterKey = ClusterKey( @@ -75,7 +69,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo .getOrElse(throw new Exception( s"[flink-submit] retrieve flink session rest url failed, clusterKey=$clusterKey")) // submit job via rest api - val jobId = FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, fatJar, flinkConfig) + val jobId = + FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, submitRequest.userJarFile, flinkConfig) SubmitResponse(clusterKey.clusterId, flinkConfig.toMap, jobId, jmRestUrl) } match { case Success(s) => s @@ -87,10 +82,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo /** Submit flink session job with building JobGraph via ClusterClient api. */ @throws[Exception] - def jobGraphSubmit( - submitRequest: SubmitRequest, - flinkConfig: Configuration, - jarFile: File): SubmitResponse = { + def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = { // retrieve k8s cluster and submit flink job on session mode var clusterDescriptor: KubernetesClusterDescriptor = null var packageProgram: PackagedProgram = null @@ -99,9 +91,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo try { clusterDescriptor = getK8sClusterDescriptor(flinkConfig) // build JobGraph - val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile) - packageProgram = packageProgramJobGraph._1 - val jobGraph = packageProgramJobGraph._2 + val programJobGraph = super.getJobGraph(submitRequest, flinkConfig) + packageProgram = programJobGraph._1 + val jobGraph = programJobGraph._2 // retrieve client and submit JobGraph client = clusterDescriptor .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala index 380832fab..6baf9d375 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala @@ -47,10 +47,9 @@ object LocalClient extends FlinkClientTrait { var client: ClusterClient[MiniClusterId] = null try { // build JobGraph - val packageProgramJobGraph = - super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile) - packageProgram = packageProgramJobGraph._1 - val jobGraph = packageProgramJobGraph._2 + val programJobGraph = super.getJobGraph(submitRequest, flinkConfig) + packageProgram = programJobGraph._1 + val jobGraph = programJobGraph._2 client = createLocalCluster(flinkConfig) val jobId = client.submitJob(jobGraph).get().toString SubmitResponse(jobId, flinkConfig.toMap, jobId, client.getWebInterfaceURL) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala index d38f5254a..8e5c6a5c0 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala @@ -27,7 +27,6 @@ import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, St import org.apache.flink.client.program.{ClusterClient, PackagedProgram} import org.apache.flink.configuration._ -import java.io.File import java.lang.{Integer => JavaInt} import scala.util.{Failure, Success, Try} @@ -45,8 +44,7 @@ object RemoteClient extends FlinkClientTrait { submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = { // submit job - super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)( - jobGraphSubmit) + super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit) } @@ -105,10 +103,7 @@ object RemoteClient extends FlinkClientTrait { /** Submit flink session job via rest api. */ // noinspection DuplicatedCode @throws[Exception] - def restApiSubmit( - submitRequest: SubmitRequest, - flinkConfig: Configuration, - fatJar: File): SubmitResponse = { + def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = { // retrieve standalone session cluster and submit flink job on session mode var clusterDescriptor: StandaloneClusterDescriptor = null; var client: ClusterClient[StandaloneClusterId] = null @@ -119,7 +114,10 @@ object RemoteClient extends FlinkClientTrait { client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient val jobId = - FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig) + FlinkSessionSubmitHelper.submitViaRestApi( + client.getWebInterfaceURL, + submitRequest.userJarFile, + flinkConfig) logInfo( s"${submitRequest.executionMode} mode submit by restApi, WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId") SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL) @@ -133,10 +131,7 @@ object RemoteClient extends FlinkClientTrait { /** Submit flink session job with building JobGraph via Standalone ClusterClient api. 
*/ @throws[Exception] - def jobGraphSubmit( - submitRequest: SubmitRequest, - flinkConfig: Configuration, - jarFile: File): SubmitResponse = { + def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = { var clusterDescriptor: StandaloneClusterDescriptor = null; var packageProgram: PackagedProgram = null var client: ClusterClient[StandaloneClusterId] = null @@ -144,9 +139,9 @@ object RemoteClient extends FlinkClientTrait { val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig) clusterDescriptor = standAloneDescriptor._2 // build JobGraph - val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile) - packageProgram = packageProgramJobGraph._1 - val jobGraph = packageProgramJobGraph._2 + val programJobGraph = super.getJobGraph(submitRequest, flinkConfig) + packageProgram = programJobGraph._1 + val jobGraph = programJobGraph._2 client = clusterDescriptor.retrieve(standAloneDescriptor._1).getClusterClient val jobId = client.submitJob(jobGraph).get().toString logInfo( diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index 9c83659d0..86501c32c 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -24,9 +24,7 @@ import org.apache.streampark.common.util.{HdfsUtils, Utils} import org.apache.streampark.flink.client.`trait`.YarnClientTrait import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse -import org.apache.streampark.flink.util.FlinkUtils -import org.apache.commons.lang3.StringUtils import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader import org.apache.flink.client.deployment.application.ApplicationConfiguration import org.apache.flink.client.program.ClusterClient @@ -38,10 +36,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.ApplicationId -import java.io.File -import java.util import java.util.Collections -import java.util.concurrent.Callable import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -99,41 +94,24 @@ object YarnApplicationClient extends YarnClientTrait { // yarn application Type .safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName) - if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) { - val pythonVenv: String = workspace.APP_PYTHON_VENV - if (!FsOperator.hdfs.exists(pythonVenv)) { - throw new RuntimeException(s"$pythonVenv File does not exist") - } - val pyflinkFile: File = new File(submitRequest.pyflinkFilePath) - - val argList = new util.ArrayList[String]() - argList.add("-pym") - argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, "")) - - val pythonFlinkconnectorJars: String = - FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome) - if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) { - flinkConfig.setString(PipelineOptions.JARS.key(), 
pythonFlinkconnectorJars) + if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) { + val pyVenv: String = workspace.APP_PYTHON_VENV + if (!FsOperator.hdfs.exists(pyVenv)) { + throw new RuntimeException(s"$pyVenv File does not exist") } // yarn.ship-files - flinkConfig.setString( - YarnConfigOptions.SHIP_FILES.key(), - pyflinkFile.getParentFile.getAbsolutePath) - flinkConfig - // python.archives - .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) - // python.client.executable - .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) - // python.executable - .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) - // python.files - .safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName) .safeSet( - ApplicationConfiguration.APPLICATION_MAIN_CLASS, - ConfigConst.PYTHON_DRIVER_CLASS_NAME) - .safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList) + YarnConfigOptions.SHIP_FILES, + submitRequest.userJarFile.getParentFile.getAbsolutePath) + // python.files + .safeSet(PythonOptions.PYTHON_FILES, submitRequest.userJarFile.getParentFile.getName) + + val args = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS) + args.add("-pym") + args.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length)) + flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, args) } logInfo(s""" diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala index 1c2bb0394..370064f4a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala @@ -83,10 +83,9 @@ object YarnPerJobClient extends YarnClientTrait { |------------------------------------------------------------------ |""".stripMargin) - val packageProgramJobGraph = - super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile) - packagedProgram = packageProgramJobGraph._1 - val jobGraph = packageProgramJobGraph._2 + val programJobGraph = super.getJobGraph(submitRequest, flinkConfig) + packagedProgram = programJobGraph._1 + val jobGraph = programJobGraph._2 logInfo(s""" |-------------------------<<applicationId>>------------------------ diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index 40a278e40..d3608e7cf 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -106,10 +106,9 @@ object YarnSessionClient extends YarnClientTrait { val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig) clusterDescriptor = yarnClusterDescriptor._2 val yarnClusterId: ApplicationId = yarnClusterDescriptor._1 - val 
packageProgramJobGraph = - super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile) - packageProgram = packageProgramJobGraph._1 - val jobGraph = packageProgramJobGraph._2 + val programJobGraph = super.getJobGraph(submitRequest, flinkConfig) + packageProgram = programJobGraph._1 + val jobGraph = programJobGraph._2 client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient val jobId = client.submitJob(jobGraph).get().toString diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 3a179d8bc..2e48f7939 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -25,7 +25,6 @@ import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger} import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.core.FlinkClusterClient import org.apache.streampark.flink.core.conf.FlinkRunOption -import org.apache.streampark.flink.util.FlinkUtils import com.google.common.collect.Lists import org.apache.commons.cli.{CommandLine, Options} @@ -92,13 +91,26 @@ trait FlinkClientTrait extends Logger { |""".stripMargin) val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest) - if (submitRequest.userJarFile != null) { - val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath) - val programOptions = ProgramOptions.create(commandLine) - val executionParameters = ExecutionConfigAccessor.fromProgramOptions( - programOptions, - Collections.singletonList(uri.toString)) - executionParameters.applyToConfiguration(flinkConfig) + + submitRequest.developmentMode match { + case DevelopmentMode.PYFLINK => + val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR) + if (StringUtils.isBlank(flinkOptPath)) { + logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail") + val flinkHome = submitRequest.flinkVersion.flinkHome + EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt"); + logInfo( + s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt") + } + case _ => + if (submitRequest.userJarFile != null) { + val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath) + val programOptions = ProgramOptions.create(commandLine) + val executionParameters = ExecutionConfigAccessor.fromProgramOptions( + programOptions, + Collections.singletonList(uri.toString)) + executionParameters.applyToConfiguration(flinkConfig) + } } // set common parameter @@ -144,17 +156,6 @@ trait FlinkClientTrait extends Logger { }) } - if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) { - val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR) - if (StringUtils.isBlank(flinkOptPath)) { - logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail") - val flinkHome = submitRequest.flinkVersion.flinkHome - EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt"); - logInfo( - s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt") - } 
- } - setConfig(submitRequest, flinkConfig) doSubmit(submitRequest, flinkConfig) @@ -214,31 +215,28 @@ trait FlinkClientTrait extends Logger { @throws[Exception] def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse - def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration, jarFile: File)( - restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse)( - jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = { + def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration)( + restApiFunc: (SubmitRequest, Configuration) => SubmitResponse)( + jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse): SubmitResponse = { // Prioritize using Rest API submit while using JobGraph submit plan as backup Try { logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.") - restApiFunc(submitRequest, flinkConfig, jarFile) + restApiFunc(submitRequest, flinkConfig) }.getOrElse { logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit Plan now.") - Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match { + Try(jobGraphFunc(submitRequest, flinkConfig)) match { case Success(r) => r case Failure(e) => logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.") throw e } - } } private[client] def getJobGraph( - flinkConfig: Configuration, submitRequest: SubmitRequest, - jarFile: File): (PackagedProgram, JobGraph) = { - var packageProgram: PackagedProgram = null - if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) { + flinkConfig: Configuration): (PackagedProgram, JobGraph) = { + if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) { val pythonVenv: String = Workspace.local.APP_PYTHON_VENV if (!FsOperator.lfs.exists(pythonVenv)) { throw new RuntimeException(s"$pythonVenv File does not exist") @@ -250,31 +248,20 @@ trait FlinkClientTrait extends Logger { .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) // python.executable .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) - - val pythonFlinkconnectorJars: String = - FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome) - if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) { - flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars) - } - - packageProgram = PackagedProgram.newBuilder - .setEntryPointClassName(ConfigConst.PYTHON_DRIVER_CLASS_NAME) - .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings) - .setArguments("-py", submitRequest.pyflinkFilePath) - .build() - } else { - packageProgram = PackagedProgram.newBuilder - .setJarFile(jarFile) - .setEntryPointClassName( - flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()) - .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings) - .setArguments( - flinkConfig - .getOptional(ApplicationConfiguration.APPLICATION_ARGS) - .orElse(Lists.newArrayList()): _*) - .build() } + val packageProgram = PackagedProgram.newBuilder + .setJarFile(submitRequest.userJarFile) + .setArguments( + flinkConfig + .getOptional(ApplicationConfiguration.APPLICATION_ARGS) + .orElse(Lists.newArrayList()): _*) + .setEntryPointClassName( + flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get() + ) + .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings) + .build() + val jobGraph = PackagedProgramUtils.createJobGraph( packageProgram, flinkConfig, @@ -521,6 +508,11 @@ trait 
FlinkClientTrait extends Logger { } } + if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) { + // python file + programArgs.add("-py") + programArgs.add(submitRequest.userJarFile.getAbsolutePath) + } programArgs.toList.asJava } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala index 11364f02b..58d38d311 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala @@ -48,22 +48,6 @@ object FlinkUtils { } } - /** - * Return a sample value: - * - * file:///flink-1.16.2/lib/flink-connector-jdbc-3.1.0-1.16.jar;file:///flink-1.16.2/lib/flink-sql-connector-mysql-cdc-2.4.0.jar - * @param flinkHome - * @return - * flink-connector-xxx.jar and flink-sql-connector-xxx.jar - */ - def getPythonFlinkconnectorJars(flinkHome: String): String = { - new File(s"$flinkHome/lib").list().filter(_.matches("flink.*connector.*\\.jar")) match { - case array if array.length > 0 => - array.map(jar => s"file://$flinkHome/lib/$jar").mkString(";") - case _ => "" - } - } - def isCheckpointEnabled(map: util.Map[String, String]): Boolean = { val checkpointInterval: Duration = TimeUtils.parseDuration( map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, "0ms"))
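
For reference, the PyFlink submission path in this commit boils down to building a PackagedProgram whose entry point is Flink's Python driver and whose program arguments carry the script ("-py <file>"), while the interpreter and virtualenv are wired in through PythonOptions. Below is a minimal, self-contained sketch of that idea, not StreamPark's actual implementation: it assumes ConfigConst.PYTHON_DRIVER_CLASS_NAME resolves to org.apache.flink.client.python.PythonDriver, that flink-python is on the classpath (which is why the commit falls back to setting FLINK_OPT_DIR), and that the venv layout ("venv/bin/python") is purely illustrative.

    import java.io.File

    import org.apache.flink.client.program.{PackagedProgram, PackagedProgramUtils}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.python.PythonOptions
    import org.apache.flink.runtime.jobgraph.JobGraph

    object PyFlinkSubmitSketch {

      // Assumed value of ConfigConst.PYTHON_DRIVER_CLASS_NAME; the driver class lives in flink-python.
      private val PythonDriverClass = "org.apache.flink.client.python.PythonDriver"

      /** Build a JobGraph for a PyFlink script (sketch; venv path and interpreter are illustrative). */
      def buildPyFlinkJobGraph(pyScript: File, pythonVenvArchive: String): JobGraph = {
        val flinkConfig = new Configuration()
        // python.archives: the packaged virtualenv shipped with the job
        flinkConfig.set(PythonOptions.PYTHON_ARCHIVES, pythonVenvArchive)
        // python.client.executable / python.executable: interpreter used on the client and the workers
        flinkConfig.set(PythonOptions.PYTHON_CLIENT_EXECUTABLE, "venv/bin/python")
        flinkConfig.set(PythonOptions.PYTHON_EXECUTABLE, "venv/bin/python")

        // The Python driver is an ordinary Java entry point, so the script is passed
        // as program arguments rather than as a user jar.
        val program = PackagedProgram.newBuilder
          .setEntryPointClassName(PythonDriverClass)
          .setArguments("-py", pyScript.getAbsolutePath)
          .build()

        // Requires flink-python (and its transitive dependencies) on the classpath,
        // otherwise the driver class cannot be resolved.
        PackagedProgramUtils.createJobGraph(program, flinkConfig, 1, false)
      }
    }

In YARN application mode the same information travels through the Flink configuration instead of a local PackagedProgram: the script's parent directory is shipped via yarn.ship-files and python.files, and a "-pym <module>" pair is appended to ApplicationConfiguration.APPLICATION_ARGS, as YarnApplicationClient does above.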
