This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 36fea2323 [Improve] Pyflink improvement (#3014)
36fea2323 is described below
commit 36fea232320caa03ff1c58753b535a790a24d3c3
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 2 23:50:58 2023 -0500
[Improve] Pyflink improvement (#3014)
* [Improve] pyflink improvement
* [Improve] pyflink improvement
* Delete ApplicationService
---
.../streampark/common/enums/DevelopmentMode.java | 5 +-
.../core/service/ApplicationBackUpService.java | 12 +-
.../impl/ApplicationActionServiceImpl.java | 220 +++++++++++----------
.../service/impl/ApplicationBackUpServiceImpl.java | 64 +++---
.../flink/client/bean/SubmitRequest.scala | 2 +-
.../impl/KubernetesNativeSessionClient.scala | 24 +--
.../streampark/flink/client/impl/LocalClient.scala | 7 +-
.../flink/client/impl/RemoteClient.scala | 25 +--
.../flink/client/impl/YarnApplicationClient.scala | 46 ++---
.../flink/client/impl/YarnPerJobClient.scala | 7 +-
.../flink/client/impl/YarnSessionClient.scala | 7 +-
.../flink/client/trait/FlinkClientTrait.scala | 96 +++++----
.../apache/streampark/flink/util/FlinkUtils.scala | 16 --
13 files changed, 243 insertions(+), 288 deletions(-)
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
index 5649d5f95..6fdef4ce5 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
@@ -25,7 +25,10 @@ public enum DevelopmentMode implements Serializable {
CUSTOM_CODE("Custom Code", 1),
/** Flink SQL */
- FLINK_SQL("Flink SQL", 2);
+ FLINK_SQL("Flink SQL", 2),
+
+ /** PyFlink */
+ PYFLINK("Python Flink", 3);
private final String mode;
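The client-side hunks later in this commit branch on the new enum value; a minimal sketch of that dispatch pattern (Scala, mirroring FlinkClientTrait and YarnApplicationClient below):

    // Dispatch on the new mode to pick the python submission path.
    submitRequest.developmentMode match {
      case DevelopmentMode.PYFLINK => // configure PyFlink entry point and args
      case _ => // existing jar-based submission path
    }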
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
index 7e7ad2a2c..8966826e0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
@@ -30,17 +30,17 @@ public interface ApplicationBackUpService extends IService<ApplicationBackUp> {
Boolean delete(Long id) throws InternalException;
- void backup(Application application, FlinkSql flinkSql);
+ void backup(Application appParam, FlinkSql flinkSqlParam);
- IPage<ApplicationBackUp> page(ApplicationBackUp backUp, RestRequest request);
+ IPage<ApplicationBackUp> page(ApplicationBackUp bakParam, RestRequest request);
- void rollback(ApplicationBackUp backUp);
+ void rollback(ApplicationBackUp bakParam);
- void revoke(Application application);
+ void revoke(Application appParam);
- void removeApp(Application application);
+ void removeApp(Application appParam);
- void rollbackFlinkSql(Application application, FlinkSql sql);
+ void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam);
boolean isFlinkSqlBacked(Long appId, Long sqlId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 9d6926a9a..ff69ed84b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.RestoreMode;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.tuple.Tuple2;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.HadoopUtils;
@@ -370,24 +371,19 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
@Transactional(rollbackFor = {Exception.class})
public void start(Application appParam, boolean auto) throws Exception {
final Application application = getById(appParam.getId());
- ApiAlertException.throwIfNull(application, "[StreamPark] application is not exists.");
-
+ Utils.notNull(application);
if (!application.isCanBeStart()) {
throw new ApiAlertException("[StreamPark] The application cannot be
started repeatedly.");
}
+ AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
+ Utils.notNull(buildPipeline);
+
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
if (flinkEnv == null) {
throw new ApiAlertException("[StreamPark] can no found flink version");
}
- applicationInfoService.checkEnv(appParam);
-
- // update state to starting
- application.setState(FlinkAppState.STARTING.getValue());
- application.setOptionTime(new Date());
- updateById(application);
-
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -398,10 +394,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
appParam.setSavePointed(true);
application.setRestartCount(application.getRestartCount() + 1);
}
- application.setAllowNonRestored(appParam.getAllowNonRestored());
- String appConf;
- String flinkUserJar = null;
String jobId = new JobID().toHexString();
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(Operation.START.getValue());
@@ -411,84 +404,6 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
// set the latest to Effective, (it will only become the current effective at this time)
applicationManageService.toEffective(application);
- ApplicationConfig applicationConfig = configService.getEffective(application.getId());
- ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
- ApiAlertException.throwIfNull(
- executionMode, "ExecutionMode can't be null, start application failed.");
- if (application.isCustomCodeJob()) {
- if (application.isUploadJob()) {
- appConf =
- String.format(
- "json://{\"%s\":\"%s\"}",
- ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
- } else {
- switch (application.getApplicationType()) {
- case STREAMPARK_FLINK:
- ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
- if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
- appConf =
- String.format("%s://%s", fileType.getTypeName(),
applicationConfig.getContent());
- } else {
- throw new IllegalArgumentException(
- "application' config type error,must be ( yaml| properties|
hocon )");
- }
- break;
- case APACHE_FLINK:
- appConf =
- String.format(
- "json://{\"%s\":\"%s\"}",
- ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
- break;
- default:
- throw new IllegalArgumentException(
- "[StreamPark] ApplicationType must be (StreamPark flink |
Apache flink)... ");
- }
- }
-
- if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
- switch (application.getApplicationType()) {
- case STREAMPARK_FLINK:
- flinkUserJar =
- String.format(
- "%s/%s", application.getAppLib(),
application.getModule().concat(".jar"));
- break;
- case APACHE_FLINK:
- flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
- if (!FsOperator.hdfs().exists(flinkUserJar)) {
- Resource resource =
- resourceService.findByResourceName(application.getTeamId(), application.getJar());
- if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
- flinkUserJar =
- String.format(
- "%s/%s",
- application.getAppHome(), new File(resource.getFilePath()).getName());
- }
- }
- break;
- default:
- throw new IllegalArgumentException(
- "[StreamPark] ApplicationType must be (StreamPark flink |
Apache flink)... ");
- }
- }
- } else if (application.isFlinkSqlJob()) {
- FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
- Utils.notNull(flinkSql);
- // 1) dist_userJar
- String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
- // 2) appConfig
- appConf =
- applicationConfig == null
- ? null
- : String.format("yaml://%s", applicationConfig.getContent());
- // 3) client
- if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
- String clientPath = Workspace.remote().APP_CLIENT();
- flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
- }
- } else {
- throw new UnsupportedOperationException("Unsupported...");
- }
-
Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
@@ -504,12 +419,12 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
application.getK8sNamespace(),
application.getK8sRestExposedTypeEnum());
- AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
-
- Utils.notNull(buildPipeline);
+ Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
+ String flinkUserJar = userJarAndAppConf.f0;
+ String appConf = userJarAndAppConf.f1;
BuildResult buildResult = buildPipeline.getBuildResult();
- if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+ if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}
@@ -517,15 +432,6 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
String applicationArgs =
variableService.replaceVariable(application.getTeamId(), application.getArgs());
- String pyflinkFilePath = "";
- Resource resource =
- resourceService.findByResourceName(application.getTeamId(), application.getJar());
- if (resource != null
- && StringUtils.isNotBlank(resource.getFilePath())
- && resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) {
- pyflinkFilePath = resource.getFilePath();
- }
-
SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
@@ -541,7 +447,6 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
getSavePointed(appParam),
appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
- pyflinkFilePath,
buildResult,
kubernetesSubmitParam,
extraParameter);
@@ -643,6 +548,113 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
});
}
+ private Tuple2<String, String> getUserJarAndAppConf(FlinkEnv flinkEnv, Application application) {
+ ExecutionMode executionMode = application.getExecutionModeEnum();
+ ApplicationConfig applicationConfig = configService.getEffective(application.getId());
+
+ ApiAlertException.throwIfNull(
+ executionMode, "ExecutionMode can't be null, start application
failed.");
+
+ String flinkUserJar = null;
+ String appConf = null;
+
+ switch (application.getDevelopmentMode()) {
+ case FLINK_SQL:
+ FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
+ Utils.notNull(flinkSql);
+ // 1) dist_userJar
+ String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
+ // 2) appConfig
+ appConf =
+ applicationConfig == null
+ ? null
+ : String.format("yaml://%s", applicationConfig.getContent());
+ // 3) client
+ if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+ String clientPath = Workspace.remote().APP_CLIENT();
+ flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
+ }
+ break;
+
+ case PYFLINK:
+ Resource resource =
+ resourceService.findByResourceName(application.getTeamId(), application.getJar());
+
+ ApiAlertException.throwIfNull(
+ resource, "pyflink file can't be null, start application failed.");
+
+ ApiAlertException.throwIfNull(
+ resource.getFilePath(), "pyflink file can't be null, start
application failed.");
+
+ ApiAlertException.throwIfFalse(
+ resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()),
+ "pyflink format error, must be a \".py\" suffix, start application
failed.");
+
+ flinkUserJar = resource.getFilePath();
+ break;
+
+ case CUSTOM_CODE:
+ if (application.isUploadJob()) {
+ appConf =
+ String.format(
+ "json://{\"%s\":\"%s\"}",
+ ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+ } else {
+ switch (application.getApplicationType()) {
+ case STREAMPARK_FLINK:
+ ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
+ if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
+ appConf =
+ String.format(
+ "%s://%s", fileType.getTypeName(),
applicationConfig.getContent());
+ } else {
+ throw new IllegalArgumentException(
+ "application' config type error,must be ( yaml|
properties| hocon )");
+ }
+ break;
+ case APACHE_FLINK:
+ appConf =
+ String.format(
+ "json://{\"%s\":\"%s\"}",
+ ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "[StreamPark] ApplicationType must be (StreamPark flink |
Apache flink)... ");
+ }
+ }
+
+ if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+ switch (application.getApplicationType()) {
+ case STREAMPARK_FLINK:
+ flinkUserJar =
+ String.format(
+ "%s/%s", application.getAppLib(),
application.getModule().concat(".jar"));
+ break;
+ case APACHE_FLINK:
+ flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
+ if (!FsOperator.hdfs().exists(flinkUserJar)) {
+ resource =
+ resourceService.findByResourceName(
+ application.getTeamId(), application.getJar());
+ if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
+ flinkUserJar =
+ String.format(
+ "%s/%s",
+ application.getAppHome(), new File(resource.getFilePath()).getName());
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "[StreamPark] ApplicationType must be (StreamPark flink |
Apache flink)... ");
+ }
+ }
+ break;
+ }
+ return Tuple2.of(flinkUserJar, appConf);
+ }
+
private Map<String, Object> getProperties(Application application) {
Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
index 2b7779abc..f6b6337b3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
@@ -62,23 +62,23 @@ public class ApplicationBackUpServiceImpl
@Autowired private FlinkSqlService flinkSqlService;
@Override
- public IPage<ApplicationBackUp> page(ApplicationBackUp backUp, RestRequest request) {
+ public IPage<ApplicationBackUp> page(ApplicationBackUp bakParam, RestRequest request) {
Page<ApplicationBackUp> page = new MybatisPager<ApplicationBackUp>().getDefaultPage(request);
LambdaQueryWrapper<ApplicationBackUp> queryWrapper =
new LambdaQueryWrapper<ApplicationBackUp>()
- .eq(ApplicationBackUp::getAppId, backUp.getAppId());
+ .eq(ApplicationBackUp::getAppId, bakParam.getAppId());
return this.baseMapper.selectPage(page, queryWrapper);
}
@Override
@Transactional(rollbackFor = {Exception.class})
- public void rollback(ApplicationBackUp backParam) {
+ public void rollback(ApplicationBackUp bakParam) {
- Application application = applicationManageService.getById(backParam.getAppId());
+ Application application = applicationManageService.getById(bakParam.getAppId());
FsOperator fsOperator = application.getFsOperator();
// backup files not exist
- if (!fsOperator.exists(backParam.getPath())) {
+ if (!fsOperator.exists(bakParam.getPath())) {
return;
}
@@ -86,8 +86,8 @@ public class ApplicationBackUpServiceImpl
// When rollback, determine the currently effective project is necessary to be
// backed up.
// If necessary, perform the backup first
- if (backParam.isBackup()) {
- application.setBackUpDescription(backParam.getDescription());
+ if (bakParam.isBackup()) {
+ application.setBackUpDescription(bakParam.getDescription());
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
backup(application, flinkSql);
@@ -101,13 +101,13 @@ public class ApplicationBackUpServiceImpl
// if running, set Latest
if (application.isRunning()) {
// rollback to back up config
- configService.setLatestOrEffective(true, backParam.getId(), backParam.getAppId());
+ configService.setLatestOrEffective(true, bakParam.getId(), bakParam.getAppId());
} else {
- effectiveService.saveOrUpdate(backParam.getAppId(), EffectiveType.CONFIG, backParam.getId());
+ effectiveService.saveOrUpdate(bakParam.getAppId(), EffectiveType.CONFIG, bakParam.getId());
// if flink sql task, will be rollback sql and dependencies
if (application.isFlinkSqlJob()) {
effectiveService.saveOrUpdate(
- backParam.getAppId(), EffectiveType.FLINKSQL, backParam.getSqlId());
+ bakParam.getAppId(), EffectiveType.FLINKSQL, bakParam.getSqlId());
}
}
@@ -116,7 +116,7 @@ public class ApplicationBackUpServiceImpl
fsOperator.delete(application.getAppHome());
// copy backup files to a valid dir
- fsOperator.copyDir(backParam.getPath(), application.getAppHome());
+ fsOperator.copyDir(bakParam.getPath(), application.getAppHome());
// update restart status
applicationManageService.update(
@@ -127,45 +127,45 @@ public class ApplicationBackUpServiceImpl
}
@Override
- public void revoke(Application application) {
+ public void revoke(Application appParam) {
Page<ApplicationBackUp> page = new Page<>();
page.setCurrent(0).setSize(1).setSearchCount(false);
LambdaQueryWrapper<ApplicationBackUp> queryWrapper =
new LambdaQueryWrapper<ApplicationBackUp>()
- .eq(ApplicationBackUp::getAppId, application.getId())
+ .eq(ApplicationBackUp::getAppId, appParam.getId())
.orderByDesc(ApplicationBackUp::getCreateTime);
Page<ApplicationBackUp> backUpPages = baseMapper.selectPage(page, queryWrapper);
if (!backUpPages.getRecords().isEmpty()) {
ApplicationBackUp backup = backUpPages.getRecords().get(0);
String path = backup.getPath();
- application.getFsOperator().move(path, application.getWorkspace().APP_WORKSPACE());
+ appParam.getFsOperator().move(path, appParam.getWorkspace().APP_WORKSPACE());
removeById(backup.getId());
}
}
@Override
- public void removeApp(Application application) {
+ public void removeApp(Application appParam) {
try {
baseMapper.delete(
new LambdaQueryWrapper<ApplicationBackUp>()
- .eq(ApplicationBackUp::getAppId, application.getId()));
- application
+ .eq(ApplicationBackUp::getAppId, appParam.getId()));
+ appParam
.getFsOperator()
.delete(
- application
+ appParam
.getWorkspace()
.APP_BACKUPS()
.concat("/")
- .concat(application.getId().toString()));
+ .concat(appParam.getId().toString()));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@Override
- public void rollbackFlinkSql(Application application, FlinkSql sql) {
- ApplicationBackUp backUp = getFlinkSqlBackup(application.getId(), sql.getId());
+ public void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam) {
+ ApplicationBackUp backUp = getFlinkSqlBackup(appParam.getId(), flinkSqlParam.getId());
ApiAlertException.throwIfNull(
backUp, "Application backup can't be null. Rollback flink sql
failed.");
// rollback config and sql
@@ -205,29 +205,29 @@ public class ApplicationBackUpServiceImpl
@Override
@Transactional(rollbackFor = {Exception.class})
- public void backup(Application application, FlinkSql flinkSql) {
+ public void backup(Application appParam, FlinkSql flinkSqlParam) {
// basic configuration file backup
String appHome =
- (application.isCustomCodeJob() && application.isCICDJob())
- ? application.getDistHome()
- : application.getAppHome();
- FsOperator fsOperator = application.getFsOperator();
+ (appParam.isCustomCodeJob() && appParam.isCICDJob())
+ ? appParam.getDistHome()
+ : appParam.getAppHome();
+ FsOperator fsOperator = appParam.getFsOperator();
if (fsOperator.exists(appHome)) {
// move files to back up directory
- ApplicationConfig config = configService.getEffective(application.getId());
+ ApplicationConfig config = configService.getEffective(appParam.getId());
if (config != null) {
- application.setConfigId(config.getId());
+ appParam.setConfigId(config.getId());
}
// flink sql tasks need to back up sql and dependencies
int version = 1;
- if (flinkSql != null) {
- application.setSqlId(flinkSql.getId());
- version = flinkSql.getVersion();
+ if (flinkSqlParam != null) {
+ appParam.setSqlId(flinkSqlParam.getId());
+ version = flinkSqlParam.getVersion();
} else if (config != null) {
version = config.getVersion();
}
- ApplicationBackUp applicationBackUp = new ApplicationBackUp(application);
+ ApplicationBackUp applicationBackUp = new ApplicationBackUp(appParam);
applicationBackUp.setVersion(version);
this.save(applicationBackUp);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 575d6eea9..d06c036c1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -63,7 +63,6 @@ case class SubmitRequest(
savePoint: String,
restoreMode: RestoreMode,
args: String,
- pyflinkFilePath: String = "",
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
@@ -74,6 +73,7 @@ case class SubmitRequest(
lazy val appMain: String = this.developmentMode match {
case DevelopmentMode.FLINK_SQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS
+ case DevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME
case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
}
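A hedged note on the new PYFLINK branch above: ConfigConst.PYTHON_DRIVER_CLASS_NAME is assumed to resolve to Flink's PyFlink driver entry class, whose CLI accepts the flags that later hunks append to the program arguments:

    // Assumption: the constant names Flink's standard PyFlink entry class.
    val pythonDriver = "org.apache.flink.client.python.PythonDriver"
    // PythonDriver flags used elsewhere in this commit:
    //   -py  <file>    run a python file directly (session/remote path)
    //   -pym <module>  run a shipped module by name (YARN application path)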
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index d609b5e5b..2daa1f68a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -36,8 +36,6 @@ import org.apache.flink.kubernetes.configuration.{KubernetesConfigOptions, Kuber
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, FlinkKubeClientFactory}
-import java.io.File
-
import scala.collection.convert.ImplicitConversions._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
@@ -54,16 +52,12 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
s"[flink-submit] submit flink job failed, clusterId is null,
mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
)
- super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
- jobGraphSubmit)
+ super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
}
/** Submit flink session job via rest api. */
@throws[Exception]
- def restApiSubmit(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration,
- fatJar: File): SubmitResponse = {
+ def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
Try {
// get jm rest url of flink session cluster
val clusterKey = ClusterKey(
@@ -75,7 +69,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
.getOrElse(throw new Exception(
s"[flink-submit] retrieve flink session rest url failed,
clusterKey=$clusterKey"))
// submit job via rest api
- val jobId = FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, fatJar, flinkConfig)
+ val jobId =
+ FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, submitRequest.userJarFile, flinkConfig)
SubmitResponse(clusterKey.clusterId, flinkConfig.toMap, jobId, jmRestUrl)
} match {
case Success(s) => s
@@ -87,10 +82,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
/** Submit flink session job with building JobGraph via ClusterClient api. */
@throws[Exception]
- def jobGraphSubmit(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration,
- jarFile: File): SubmitResponse = {
+ def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
// retrieve k8s cluster and submit flink job on session mode
var clusterDescriptor: KubernetesClusterDescriptor = null
var packageProgram: PackagedProgram = null
@@ -99,9 +91,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
try {
clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
// build JobGraph
- val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile)
- packageProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
+ val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+ packageProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
// retrieve client and submit JobGraph
client = clusterDescriptor
.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
index 380832fab..6baf9d375 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
@@ -47,10 +47,9 @@ object LocalClient extends FlinkClientTrait {
var client: ClusterClient[MiniClusterId] = null
try {
// build JobGraph
- val packageProgramJobGraph =
- super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
- packageProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
+ val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+ packageProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
client = createLocalCluster(flinkConfig)
val jobId = client.submitJob(jobGraph).get().toString
SubmitResponse(jobId, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index d38f5254a..8e5c6a5c0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -27,7 +27,6 @@ import org.apache.flink.client.deployment.{DefaultClusterClientServiceLoader, St
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
-import java.io.File
import java.lang.{Integer => JavaInt}
import scala.util.{Failure, Success, Try}
@@ -45,8 +44,7 @@ object RemoteClient extends FlinkClientTrait {
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
// submit job
- super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(restApiSubmit)(
- jobGraphSubmit)
+ super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
}
@@ -105,10 +103,7 @@ object RemoteClient extends FlinkClientTrait {
/** Submit flink session job via rest api. */
// noinspection DuplicatedCode
@throws[Exception]
- def restApiSubmit(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration,
- fatJar: File): SubmitResponse = {
+ def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
// retrieve standalone session cluster and submit flink job on session mode
var clusterDescriptor: StandaloneClusterDescriptor = null;
var client: ClusterClient[StandaloneClusterId] = null
@@ -119,7 +114,10 @@ object RemoteClient extends FlinkClientTrait {
client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
val jobId =
- FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
+ FlinkSessionSubmitHelper.submitViaRestApi(
+ client.getWebInterfaceURL,
+ submitRequest.userJarFile,
+ flinkConfig)
logInfo(
s"${submitRequest.executionMode} mode submit by restApi,
WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
@@ -133,10 +131,7 @@ object RemoteClient extends FlinkClientTrait {
/** Submit flink session job with building JobGraph via Standalone ClusterClient api. */
@throws[Exception]
- def jobGraphSubmit(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration,
- jarFile: File): SubmitResponse = {
+ def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
var clusterDescriptor: StandaloneClusterDescriptor = null;
var packageProgram: PackagedProgram = null
var client: ClusterClient[StandaloneClusterId] = null
@@ -144,9 +139,9 @@ object RemoteClient extends FlinkClientTrait {
val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
clusterDescriptor = standAloneDescriptor._2
// build JobGraph
- val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile)
- packageProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
+ val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+ packageProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
client = clusterDescriptor.retrieve(standAloneDescriptor._1).getClusterClient
val jobId = client.submitJob(jobGraph).get().toString
logInfo(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 9c83659d0..928e06414 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -24,9 +24,7 @@ import org.apache.streampark.common.util.{HdfsUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
-import org.apache.streampark.flink.util.FlinkUtils
-import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
@@ -38,10 +36,8 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
-import java.io.File
import java.util
import java.util.Collections
-import java.util.concurrent.Callable
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -99,41 +95,25 @@ object YarnApplicationClient extends YarnClientTrait {
// yarn application Type
.safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)
- if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
- val pythonVenv: String = workspace.APP_PYTHON_VENV
- if (!FsOperator.hdfs.exists(pythonVenv)) {
- throw new RuntimeException(s"$pythonVenv File does not exist")
- }
- val pyflinkFile: File = new File(submitRequest.pyflinkFilePath)
-
- val argList = new util.ArrayList[String]()
- argList.add("-pym")
- argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, ""))
-
- val pythonFlinkconnectorJars: String =
- FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
- if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
- flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
+ if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+ val pyVenv: String = workspace.APP_PYTHON_VENV
+ if (!FsOperator.hdfs.exists(pyVenv)) {
+ throw new RuntimeException(s"$pyVenv File does not exist")
}
// yarn.ship-files
- flinkConfig.setString(
- YarnConfigOptions.SHIP_FILES.key(),
- pyflinkFile.getParentFile.getAbsolutePath)
+ val shipFiles = new util.ArrayList[String]()
+ shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
flinkConfig
- // python.archives
- .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
- // python.client.executable
- .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
- // python.executable
- .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+ .safeSet(YarnConfigOptions.SHIP_FILES, shipFiles)
// python.files
- .safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName)
- .safeSet(
- ApplicationConfiguration.APPLICATION_MAIN_CLASS,
- ConfigConst.PYTHON_DRIVER_CLASS_NAME)
- .safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList)
+ .safeSet(PythonOptions.PYTHON_FILES, submitRequest.userJarFile.getParentFile.getName)
+
+ val args = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
+ args.add("-pym")
+ args.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
+ flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, args)
}
logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 1c2bb0394..370064f4a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -83,10 +83,9 @@ object YarnPerJobClient extends YarnClientTrait {
|------------------------------------------------------------------
|""".stripMargin)
- val packageProgramJobGraph =
- super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
- packagedProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
+ val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+ packagedProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
logInfo(s"""
|-------------------------<<applicationId>>------------------------
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 40a278e40..d3608e7cf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -106,10 +106,9 @@ object YarnSessionClient extends YarnClientTrait {
val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
- val packageProgramJobGraph =
- super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
- packageProgram = packageProgramJobGraph._1
- val jobGraph = packageProgramJobGraph._2
+ val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
+ packageProgram = programJobGraph._1
+ val jobGraph = programJobGraph._2
client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
val jobId = client.submitJob(jobGraph).get().toString
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 3a179d8bc..2e48f7939 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -25,7 +25,6 @@ import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
-import org.apache.streampark.flink.util.FlinkUtils
import com.google.common.collect.Lists
import org.apache.commons.cli.{CommandLine, Options}
@@ -92,13 +91,26 @@ trait FlinkClientTrait extends Logger {
|""".stripMargin)
val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest)
- if (submitRequest.userJarFile != null) {
- val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
- val programOptions = ProgramOptions.create(commandLine)
- val executionParameters = ExecutionConfigAccessor.fromProgramOptions(
- programOptions,
- Collections.singletonList(uri.toString))
- executionParameters.applyToConfiguration(flinkConfig)
+
+ submitRequest.developmentMode match {
+ case DevelopmentMode.PYFLINK =>
+ val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
+ if (StringUtils.isBlank(flinkOptPath)) {
+ logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
+ val flinkHome = submitRequest.flinkVersion.flinkHome
+ EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
+ logInfo(
+ s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
+ }
+ case _ =>
+ if (submitRequest.userJarFile != null) {
+ val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
+ val programOptions = ProgramOptions.create(commandLine)
+ val executionParameters = ExecutionConfigAccessor.fromProgramOptions(
+ programOptions,
+ Collections.singletonList(uri.toString))
+ executionParameters.applyToConfiguration(flinkConfig)
+ }
}
// set common parameter
@@ -144,17 +156,6 @@ trait FlinkClientTrait extends Logger {
})
}
- if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
- val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
- if (StringUtils.isBlank(flinkOptPath)) {
- logWarn(s"Get environment variable
${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
- val flinkHome = submitRequest.flinkVersion.flinkHome
- EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
- logInfo(
- s"Set temporary environment variables
${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
- }
- }
-
setConfig(submitRequest, flinkConfig)
doSubmit(submitRequest, flinkConfig)
@@ -214,31 +215,28 @@ trait FlinkClientTrait extends Logger {
@throws[Exception]
def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse
- def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration, jarFile: File)(
- restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse)(
- jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
+ def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration)(
+ restApiFunc: (SubmitRequest, Configuration) => SubmitResponse)(
+ jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse): SubmitResponse = {
// Prioritize using Rest API submit while using JobGraph submit plan as backup
Try {
logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
- restApiFunc(submitRequest, flinkConfig, jarFile)
+ restApiFunc(submitRequest, flinkConfig)
}.getOrElse {
logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit
Plan now.")
- Try(jobGraphFunc(submitRequest, flinkConfig, jarFile)) match {
+ Try(jobGraphFunc(submitRequest, flinkConfig)) match {
case Success(r) => r
case Failure(e) =>
logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph
Submit Plan failed.")
throw e
}
-
}
}
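Design note on the slimmed-down trySubmit: the jar no longer travels as an explicit parameter because both strategies can read it from submitRequest.userJarFile, and the curried shape keeps call sites such as super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit) readable. A generic sketch of the same try-then-fallback idiom:

    import scala.util.Try

    // Attempt the primary strategy; if it throws, run the fallback
    // (whose own failure propagates to the caller).
    def withFallback[A](primary: => A)(fallback: => A): A =
      Try(primary).getOrElse(fallback)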
private[client] def getJobGraph(
- flinkConfig: Configuration,
submitRequest: SubmitRequest,
- jarFile: File): (PackagedProgram, JobGraph) = {
- var packageProgram: PackagedProgram = null
- if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+ flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
+ if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
@@ -250,31 +248,20 @@ trait FlinkClientTrait extends Logger {
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
-
- val pythonFlinkconnectorJars: String =
- FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
- if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
- flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
- }
-
- packageProgram = PackagedProgram.newBuilder
- .setEntryPointClassName(ConfigConst.PYTHON_DRIVER_CLASS_NAME)
- .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
- .setArguments("-py", submitRequest.pyflinkFilePath)
- .build()
- } else {
- packageProgram = PackagedProgram.newBuilder
- .setJarFile(jarFile)
- .setEntryPointClassName(
- flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
- .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
- .setArguments(
- flinkConfig
- .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
- .orElse(Lists.newArrayList()): _*)
- .build()
}
+ val packageProgram = PackagedProgram.newBuilder
+ .setJarFile(submitRequest.userJarFile)
+ .setArguments(
+ flinkConfig
+ .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+ .orElse(Lists.newArrayList()): _*)
+ .setEntryPointClassName(
+ flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+ )
+ .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+ .build()
+
val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
flinkConfig,
@@ -521,6 +508,11 @@ trait FlinkClientTrait extends Logger {
}
}
+ if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+ // python file
+ programArgs.add("-py")
+ programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+ }
programArgs.toList.asJava
}
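Taken together, the last two hunks unify PyFlink with the jar path: getJobGraph no longer builds a special PackagedProgram for python, and the python file instead arrives as ordinary program arguments that the single builder reads back through APPLICATION_ARGS. A minimal sketch of the argument assembly (buildProgramArgs is a hypothetical helper; names are illustrative):

    // Hypothetical helper: assemble program args for a PyFlink submission.
    def buildProgramArgs(submitRequest: SubmitRequest): java.util.List[String] = {
      val args = new java.util.ArrayList[String]()
      if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
        args.add("-py")
        args.add(submitRequest.userJarFile.getAbsolutePath) // the .py file
      }
      args
    }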
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
index 11364f02b..58d38d311 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
@@ -48,22 +48,6 @@ object FlinkUtils {
}
}
- /**
- * Return a sample value:
- *
- * file:///flink-1.16.2/lib/flink-connector-jdbc-3.1.0-1.16.jar;file:///flink-1.16.2/lib/flink-sql-connector-mysql-cdc-2.4.0.jar
- * @param flinkHome
- * @return
- * flink-connector-xxx.jar and flink-sql-connector-xxx.jar
- */
- def getPythonFlinkconnectorJars(flinkHome: String): String = {
- new File(s"$flinkHome/lib").list().filter(_.matches("flink.*connector.*\\.jar")) match {
- case array if array.length > 0 =>
- array.map(jar => s"file://$flinkHome/lib/$jar").mkString(";")
- case _ => ""
- }
- }
-
def isCheckpointEnabled(map: util.Map[String, String]): Boolean = {
val checkpointInterval: Duration = TimeUtils.parseDuration(
map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, "0ms"))