This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/k8s-deploy by this push:
new 1a4c585ab [Improve] deploy yarn session cluster improvement
1a4c585ab is described below
commit 1a4c585abf2840ffdd6e9d49d432b2a87050f092
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 17:04:24 2024 +0800
[Improve] deploy yarn session cluster improvement
---
.../console/core/service/ApplicationService.java | 4 ++++
.../core/service/impl/ApplicationServiceImpl.java | 11 ++++++-----
.../core/service/impl/FlinkClusterServiceImpl.java | 11 ++++++++++-
.../streampark/flink/client/bean/DeployRequest.scala | 8 ++++++--
.../flink/client/impl/YarnSessionClient.scala | 17 +++++++++++++----
5 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 5ffa7a02e..42d30fec5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -23,6 +23,8 @@ import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.AppExistsState;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.web.multipart.MultipartFile;
@@ -124,4 +126,6 @@ public interface ApplicationService extends IService<Application> {
String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception;
AppExistsState checkStart(Application app);
+
+ List<ApplicationReport> getYARNApplication(String appName);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 5ed782953..61bd08a90 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -656,7 +656,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return AppExistsState.INVALID;
}
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- boolean exists = !getApplicationReports(application.getJobName()).isEmpty();
+ boolean exists = !getYARNApplication(application.getJobName()).isEmpty();
return exists ? AppExistsState.IN_YARN : AppExistsState.NO;
}
// todo on k8s check...
@@ -1451,7 +1451,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// check job on yarn is already running
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
ApiAlertException.throwIfTrue(
- !getApplicationReports(application.getJobName()).isEmpty(),
+ !getYARNApplication(application.getJobName()).isEmpty(),
"The same job name is already running in the yarn queue");
}
@@ -1763,7 +1763,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// kill application
if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
try {
- List<ApplicationReport> applications = getApplicationReports(application.getJobName());
+ List<ApplicationReport> applications = getYARNApplication(application.getJobName());
if (!applications.isEmpty()) {
YarnClient yarnClient = HadoopUtils.yarnClient();
yarnClient.killApplication(applications.get(0).getApplicationId());
@@ -1845,7 +1845,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
|| yarnQueueService.isDefaultQueue(application.getYarnQueue());
}
- private List<ApplicationReport> getApplicationReports(String jobName) {
+ @Override
+ public List<ApplicationReport> getYARNApplication(String appName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
Set<String> types =
@@ -1861,7 +1862,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
Set<String> yarnTag = Sets.newHashSet("streampark");
List<ApplicationReport> applications = yarnClient.getApplications(types,
states, yarnTag);
return applications.stream()
- .filter(report -> report.getName().equals(jobName))
+ .filter(report -> report.getName().equals(appName))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("The yarn api is abnormal. Ensure that yarn is running properly.");
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 84e10e45a..f1e68bdbc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -151,10 +151,17 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
@Transactional(rollbackFor = {Exception.class})
public void start(Long id) {
FlinkCluster flinkCluster = getById(id);
+ ApiAlertException.throwIfTrue(
+ !applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
+ "The same job name: "
+ + flinkCluster.getClusterName()
+ + " is already running in the yarn queue");
+
try {
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
DeployRequest deployRequest = getDeployRequest(flinkCluster);
log.info("deploy cluster request: " + deployRequest);
+
Future<DeployResponse> future =
executorService.submit(() -> FlinkClient.deploy(deployRequest));
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
@@ -193,13 +200,15 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkEnv.getFlinkVersion(),
executionModeEnum,
flinkCluster.getProperties(),
- flinkCluster.getClusterId());
+ flinkCluster.getClusterId(),
+ flinkCluster.getClusterName());
case KUBERNETES_NATIVE_SESSION:
return KubernetesDeployRequest.apply(
flinkEnv.getFlinkVersion(),
executionModeEnum,
flinkCluster.getProperties(),
flinkCluster.getClusterId(),
+ flinkCluster.getClusterName(),
flinkCluster.getK8sNamespace(),
flinkCluster.getK8sConf(),
flinkCluster.getServiceAccount(),
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index 81b36a17b..936af108e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -31,7 +31,8 @@ case class DeployRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
properties: JavaMap[String, Any],
- clusterId: String) {
+ clusterId: String,
+ clusterName: String) {
private[client] lazy val hdfsWorkspace = {
@@ -65,12 +66,13 @@ class KubernetesDeployRequest(
override val executionMode: ExecutionMode,
override val properties: JavaMap[String, Any],
override val clusterId: String,
+ override val clusterName: String,
val kubernetesNamespace: String =
KubernetesConfigOptions.NAMESPACE.defaultValue(),
val kubeConf: String = "~/.kube/config",
val serviceAccount: String =
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
val flinkImage: String =
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
val flinkRestExposedType: FlinkK8sRestExposedType =
FlinkK8sRestExposedType.CLUSTER_IP)
- extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+ extends DeployRequest(flinkVersion, executionMode, properties, clusterId, clusterName)
object KubernetesDeployRequest {
def apply(
@@ -78,6 +80,7 @@ object KubernetesDeployRequest {
executionMode: ExecutionMode,
properties: JavaMap[String, Any],
clusterId: String,
+ clusterName: String,
kubernetesNamespace: String,
kubeConf: String,
serviceAccount: String,
@@ -88,6 +91,7 @@ object KubernetesDeployRequest {
executionMode,
properties,
clusterId,
+ clusterName,
kubernetesNamespace,
kubeConf,
serviceAccount,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 8fa9922ad..328806c68 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
@@ -58,7 +59,10 @@ object YarnSessionClient extends YarnClientTrait {
* @param deployRequest
* @param flinkConfig
*/
- def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: Configuration): Unit = {
+ private def deployClusterConfig(
+ deployRequest: DeployRequest,
+ flinkConfig: Configuration): Unit = {
+
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
deployRequest.flinkVersion.flinkHome)
val currentUser = UserGroupInformation.getCurrentUser
@@ -86,6 +90,10 @@ object YarnSessionClient extends YarnClientTrait {
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
// conf dir
.safeSet(DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf")
+ // app tags
+ .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
+ // app name
+ .safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)
logInfo(s"""
|------------------------------------------------------------------
@@ -164,10 +172,12 @@ object YarnSessionClient extends YarnClientTrait {
try {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
+
deployClusterConfig(deployRequest, flinkConfig)
+
val yarnClusterDescriptor =
getSessionClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
- if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) {
+ if (StringUtils.isNotBlank(deployRequest.clusterId)) {
try {
val applicationStatus =
clusterDescriptor.getYarnClient
@@ -183,8 +193,7 @@ object YarnSessionClient extends YarnClientTrait {
}
}
} catch {
- case _: ApplicationNotFoundException =>
- logInfo("this applicationId have not managed by yarn ,need deploy ...")
+ case e: ApplicationNotFoundException => return DeployResponse(null, null, e)
}
}
val clientProvider =
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)