This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 89a653d75 feat: support submit pyspark once job on k8s and add
clusterlabel to combinedlabel (#4906)
89a653d75 is described below
commit 89a653d75359b912bffade16426b0ab4b850d01a
Author: zlucelia <[email protected]>
AuthorDate: Thu Sep 21 21:45:40 2023 +0800
feat: support submit pyspark once job on k8s and add clusterlabel to
combinedlabel (#4906)
* feat: support submit pyspark once job on k8s
* feat: modify variable name
* feat: add method to build k8s client from kubeConfig
* feat: add Spark UI port configuration for spark on k8s once job
* feat: rename userCreatorEngineTypeLabel
* feat: merge podIP and port into url
* fix: replace 'empty' with 'blank'
---
.../manager/label/conf/LabelManagerConf.java | 3 ++
.../linkis/manager/rm/domain/RMLabelContainer.java | 49 +++++++++++++++----
.../kubernetes/KubernetesResourceRequester.java | 57 ++++++++++++++++------
.../manager/rm/message/RMMessageService.java | 4 +-
.../manager/rm/service/RequestResourceService.java | 6 +--
.../rm/service/impl/DefaultResourceManager.java | 32 +++++-------
.../rm/service/impl/ResourceLogService.java | 11 ++---
.../spark/client/context/SparkConfig.java | 24 +++++++++
...ernetesApplicationClusterDescriptorAdapter.java | 6 ++-
.../spark/config/SparkConfiguration.scala | 3 ++
.../SparkOnKubernetesSubmitOnceExecutor.scala | 13 ++---
.../spark/factory/SparkEngineConnFactory.scala | 2 +
.../spark/factory/SparkOnceExecutorFactory.scala | 3 ++
.../spark/utils/SparkJobProgressUtil.scala | 26 ++++++----
14 files changed, 161 insertions(+), 78 deletions(-)
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
index f43625491..9aa5ff797 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
@@ -23,4 +23,7 @@ public class LabelManagerConf {
public static final String LONG_LIVED_LABEL =
CommonVars.apply("wds.linkis.label.node.long.lived.label.keys",
"tenant").getValue();
+
+ public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
+ CommonVars.apply("linkis.combined.without.yarn.default",
true).getValue();
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
index 5bda33919..9d3140267 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
@@ -18,10 +18,13 @@
package org.apache.linkis.manager.rm.domain;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
+import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
+import org.apache.linkis.manager.label.conf.LabelManagerConf;
import org.apache.linkis.manager.label.entity.CombinedLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.ResourceLabel;
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
@@ -49,7 +52,8 @@ public class RMLabelContainer {
private EngineTypeLabel engineTypeLabel;
private UserCreatorLabel userCreatorLabel;
private EngineInstanceLabel engineInstanceLabel;
- private CombinedLabel combinedUserCreatorEngineTypeLabel;
+ private ClusterLabel clusterLabel;
+ private CombinedLabel combinedResourceLabel;
private Label currentLabel;
public RMLabelContainer(List<Label<?>> labels) {
@@ -57,14 +61,16 @@ public class RMLabelContainer {
this.lockedLabels = Lists.newArrayList();
try {
if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
- this.combinedUserCreatorEngineTypeLabel =
- (CombinedLabel)
- combinedLabelBuilder.build(
- "", Lists.newArrayList(getUserCreatorLabel(),
getEngineTypeLabel()));
- this.labels.add(combinedUserCreatorEngineTypeLabel);
+ List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(),
getEngineTypeLabel());
+ ClusterLabel clusterLabel = getClusterLabel();
+ if (shouldCombinedClusterLabel(clusterLabel)) {
+ combinedLabel.add(clusterLabel);
+ }
+ this.combinedResourceLabel = (CombinedLabel)
combinedLabelBuilder.build("", combinedLabel);
+ this.labels.add(combinedResourceLabel);
}
} catch (Exception e) {
- logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
+ logger.warn("failed to get combinedResourceLabel", e);
}
this.labels = LabelUtils.distinctLabel(this.labels, labels);
}
@@ -156,8 +162,31 @@ public class RMLabelContainer {
return null;
}
- public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
- return combinedUserCreatorEngineTypeLabel;
+ public ClusterLabel getClusterLabel() {
+ if (clusterLabel == null) {
+ for (Label label : labels) {
+ if (label instanceof ClusterLabel) {
+ return (ClusterLabel) label;
+ }
+ }
+ } else {
+ return clusterLabel;
+ }
+ logger.warn("ClusterLabel not found");
+ return null;
+ }
+
+ private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
+ return !(clusterLabel == null
+ || (LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT
+ && clusterLabel
+ .getClusterName()
+ .equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
+ &&
clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue())));
+ }
+
+ public CombinedLabel getCombinedResourceLabel() {
+ return combinedResourceLabel;
}
public Label getCurrentLabel() {
@@ -195,6 +224,8 @@ public class RMLabelContainer {
+ userCreatorLabel
+ ", engineInstanceLabel="
+ engineInstanceLabel
+ + ", clusterLabel="
+ + clusterLabel
+ ", currentLabel="
+ currentLabel
+ '}';
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
index 3bc7004e3..c886b622a 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
@@ -24,8 +24,11 @@ import
org.apache.linkis.manager.rm.external.domain.ExternalResourceProvider;
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,8 +36,10 @@ import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceQuota;
import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
+import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +50,7 @@ public class KubernetesResourceRequester implements
ExternalResourceRequester {
@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider
provider) {
- String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
+ String k8sMasterUrl = getK8sMasterUrl(provider);
DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
if (client == null) {
constructKubernetesClient(provider);
@@ -154,8 +159,7 @@ public class KubernetesResourceRequester implements
ExternalResourceRequester {
@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider
provider) {
if (null != provider) {
- DefaultKubernetesClient client =
- clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
+ DefaultKubernetesClient client =
clientMap.get(getK8sMasterUrl(provider));
if (client != null) {
client.close();
}
@@ -164,19 +168,42 @@ public class KubernetesResourceRequester implements
ExternalResourceRequester {
return true;
}
+ private String getK8sMasterUrl(ExternalResourceProvider provider) {
+ Map<String, Object> configMap = provider.getConfigMap();
+ String k8sMasterUrl = (String) configMap.get("k8sMasterUrl");
+ if (StringUtils.isBlank(k8sMasterUrl)) {
+ throw new IllegalArgumentException("k8sMasterUrl is empty, please check
the configuration.");
+ }
+ return k8sMasterUrl;
+ }
+
private void constructKubernetesClient(ExternalResourceProvider provider) {
- String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
- String k8sClientCertData = (String)
provider.getConfigMap().get("k8sClientCertData");
- String k8sClientKeyData = (String)
provider.getConfigMap().get("k8sClientKeyData");
- String k8sCaCertData = (String)
provider.getConfigMap().get("k8sCaCertData");
- DefaultKubernetesClient client =
- new DefaultKubernetesClient(
- new ConfigBuilder()
- .withMasterUrl(k8sMasterUrl)
- .withClientCertData(k8sClientCertData)
- .withClientKeyData(k8sClientKeyData)
- .withCaCertData(k8sCaCertData)
- .build());
+ DefaultKubernetesClient client;
+ Map<String, Object> configMap = provider.getConfigMap();
+ String k8sMasterUrl = getK8sMasterUrl(provider);
+ try {
+ String k8sConfig = (String) configMap.get("k8sConfig");
+ if (StringUtils.isNotBlank(k8sConfig)) {
+ Config kubeConfig =
+ Config.fromKubeconfig(
+ null, FileUtils.readFileToString(new File(k8sConfig),
"UTF-8"), null);
+ client = new DefaultKubernetesClient(kubeConfig);
+ } else {
+ String k8sClientCertData = (String) configMap.get("k8sClientCertData");
+ String k8sClientKeyData = (String) configMap.get("k8sClientKeyData");
+ String k8sCaCertData = (String) configMap.get("k8sCaCertData");
+ client =
+ new DefaultKubernetesClient(
+ new ConfigBuilder()
+ .withMasterUrl(k8sMasterUrl)
+ .withClientCertData(k8sClientCertData)
+ .withClientKeyData(k8sClientKeyData)
+ .withCaCertData(k8sCaCertData)
+ .build());
+ }
+ } catch (Exception e) {
+ throw new KubernetesClientException("Fail to build k8s client. ", e);
+ }
clientMap.put(k8sMasterUrl, client);
}
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
index ff58f933b..88985bf52 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
@@ -53,9 +53,7 @@ public class RMMessageService {
} catch (Exception e) {
RMLabelContainer nodeLabels = new RMLabelContainer(labels);
String value =
- Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
- .map(Object::toString)
- .orElse("");
+
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
logger.warn(
String.format(
"usedResource failed, request from:%s, request engine: %s, ",
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
index 4c1237cc4..c88b39e52 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
@@ -59,9 +59,7 @@ public abstract class RequestResourceService {
NodeResource labelResource =
labelResourceService.getLabelResource(labelContainer.getCurrentLabel());
Resource requestResource = resource.getMinResource();
- if (labelContainer
- .getCombinedUserCreatorEngineTypeLabel()
- .equals(labelContainer.getCurrentLabel())) {
+ if
(labelContainer.getCombinedResourceLabel().equals(labelContainer.getCurrentLabel()))
{
if (labelResource == null) {
labelResource = new CommonNodeResource();
labelResource.setResourceType(resource.getResourceType());
@@ -92,7 +90,7 @@ public abstract class RequestResourceService {
labelResourceService.setLabelResource(
labelContainer.getCurrentLabel(),
labelResource,
-
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+ labelContainer.getCombinedResourceLabel().getStringValue());
logger.debug(
labelContainer.getCurrentLabel()
+ " to request ["
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index 7ecf3f48d..672963e30 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -247,8 +247,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
List<PersistenceLock> persistenceLocks = new ArrayList<>();
EMInstanceLabel emInstanceLabel = labelContainer.getEMInstanceLabel();
- CombinedLabel userCreatorEngineTypeLabel =
- labelContainer.getCombinedUserCreatorEngineTypeLabel();
+ CombinedLabel combinedLabel = labelContainer.getCombinedResourceLabel();
try {
// check ecm resource if not enough return
@@ -266,14 +265,12 @@ public class DefaultResourceManager extends
ResourceManager implements Initializ
// lock userCreatorEngineTypeLabel
persistenceLocks.add(
- tryLockOneLabel(
- userCreatorEngineTypeLabel, wait,
labelContainer.getUserCreatorLabel().getUser()));
+ tryLockOneLabel(combinedLabel, wait,
labelContainer.getUserCreatorLabel().getUser()));
try {
- labelContainer.setCurrentLabel(userCreatorEngineTypeLabel);
+ labelContainer.setCurrentLabel(combinedLabel);
if (!requestResourceService.canRequest(labelContainer, resource)) {
return new NotEnoughResource(
- String.format(
- "Labels:%s not enough resource",
userCreatorEngineTypeLabel.getStringValue()));
+ String.format("Labels:%s not enough resource",
combinedLabel.getStringValue()));
}
} catch (RMWarnException exception) {
return new NotEnoughResource(exception.getMessage());
@@ -294,9 +291,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
labelResource.setLockedResource(
labelResource.getLockedResource().add(resource.getLockedResource()));
labelResourceService.setLabelResource(
- label,
- labelResource,
-
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+ label, labelResource,
labelContainer.getCombinedResourceLabel().getStringValue());
logger.info(
String.format(
"ResourceChanged:%s --> %s", label.getStringValue(),
labelResource.toString()));
@@ -330,9 +325,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
// add ec resource
labelResourceService.setEngineConnLabelResource(
- engineInstanceLabel,
- resource,
-
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+ engineInstanceLabel, resource,
labelContainer.getCombinedResourceLabel().getStringValue());
// record engine locked resource
labelContainer.getLabels().add(engineInstanceLabel);
resourceLogService.recordUserResourceAction(
@@ -441,7 +434,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
labelResourceService.setLabelResource(
engineInstanceLabel,
lockedResource,
-
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+ labelContainer.getCombinedResourceLabel().getStringValue());
resourceLogService.success(
ChangeType.ENGINE_INIT, lockedResource.getLockedResource(),
engineInstanceLabel, null);
} catch (Exception exception) {
@@ -481,7 +474,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
labelResourceService.setLabelResource(
label,
labelResource,
-
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+
labelContainer.getCombinedResourceLabel().getStringValue());
labelResourceSet.add(
new LabelResourceMapping(label, addedResource,
ResourceOperationType.USED));
resourceCheck(label, labelResource);
@@ -493,7 +486,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
if (label
.getClass()
-
.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass()))
{
+
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
@@ -704,9 +697,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
labelResourceService.setLabelResource(
label,
labelResource,
- labelContainer
- .getCombinedUserCreatorEngineTypeLabel()
- .getStringValue());
+
labelContainer.getCombinedResourceLabel().getStringValue());
resourceCheck(label, labelResource);
}
},
@@ -725,8 +716,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
if (label
.getClass()
- .isAssignableFrom(
-
labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
+
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
index dd563f5a9..c5890132c 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
@@ -150,11 +150,10 @@ public class ResourceLogService {
if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue()) {
LinkisUtils.tryAndWarn(
() -> {
- CombinedLabel userCreatorEngineType =
- labelContainer.getCombinedUserCreatorEngineTypeLabel();
+ CombinedLabel combinedLabel =
labelContainer.getCombinedResourceLabel();
EngineInstanceLabel engineInstanceLabel =
labelContainer.getEngineInstanceLabel();
EMInstanceLabel eMInstanceLabel =
labelContainer.getEMInstanceLabel();
- if (userCreatorEngineType == null) {
+ if (combinedLabel == null) {
return;
}
ECResourceInfoRecord ecResourceInfoRecord =
@@ -171,11 +170,7 @@ public class ResourceLogService {
: "";
ecResourceInfoRecord =
new ECResourceInfoRecord(
- userCreatorEngineType.getStringValue(),
- user,
- ticketId,
- resource,
- logDirSuffix);
+ combinedLabel.getStringValue(), user, ticketId,
resource, logDirSuffix);
ecResourceRecordMapper.insertECResourceInfoRecord(ecResourceInfoRecord);
}
if (engineInstanceLabel != null) {
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 37a0e2c98..1768b77d0 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -53,6 +53,7 @@ public class SparkConfig {
private String k8sDriverRequestCores;
private String k8sExecutorRequestCores;
+ private String k8sSparkUIPort;
private String deployMode = "client"; // ("client") // todo cluster
private String appResource; // ("")
private String appName; // ("")
@@ -78,6 +79,7 @@ public class SparkConfig {
private String principal; // ("--principal", "")
private String keytab; // ("--keytab", "")
private String queue; // ("--queue", "")
+ private String pyFiles; // ("--py-files", "")
public String getK8sFileUploadPath() {
return k8sFileUploadPath;
@@ -195,6 +197,14 @@ public class SparkConfig {
this.k8sExecutorRequestCores = k8sExecutorRequestCores;
}
+ public String getK8sSparkUIPort() {
+ return k8sSparkUIPort;
+ }
+
+ public void setK8sSparkUIPort(String k8sSparkUIPort) {
+ this.k8sSparkUIPort = k8sSparkUIPort;
+ }
+
public String getJavaHome() {
return javaHome;
}
@@ -419,6 +429,14 @@ public class SparkConfig {
this.queue = queue;
}
+ public String getPyFiles() {
+ return pyFiles;
+ }
+
+ public void setPyFiles(String pyFiles) {
+ this.pyFiles = pyFiles;
+ }
+
@Override
public String toString() {
return "SparkConfig{"
@@ -467,6 +485,9 @@ public class SparkConfig {
+ ", k8sExecutorRequestCores='"
+ k8sExecutorRequestCores
+ '\''
+ + ", k8sSparkUIPort='"
+ + k8sSparkUIPort
+ + '\''
+ ", deployMode='"
+ deployMode
+ '\''
@@ -534,6 +555,9 @@ public class SparkConfig {
+ ", queue='"
+ queue
+ '\''
+ + ", pyFiles='"
+ + pyFiles
+ + '\''
+ '}';
}
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
index 0ee0380fb..ce709b2e7 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
@@ -72,6 +72,7 @@ public class KubernetesApplicationClusterDescriptorAdapter
extends ClusterDescri
this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
this.namespace = sparkConfig.getK8sNamespace();
setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName());
+ setConf(sparkLauncher, "spark.ui.port", sparkConfig.getK8sSparkUIPort());
setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace);
setConf(sparkLauncher, "spark.kubernetes.container.image",
sparkConfig.getK8sImage());
setConf(sparkLauncher, "spark.kubernetes.driver.pod.name",
this.driverPodName);
@@ -111,6 +112,7 @@ public class KubernetesApplicationClusterDescriptorAdapter
extends ClusterDescri
addSparkArg(sparkLauncher, "--num-executors",
sparkConfig.getNumExecutors().toString());
addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
+ addSparkArg(sparkLauncher, "--py-files", sparkConfig.getPyFiles());
sparkLauncher.setAppResource(sparkConfig.getAppResource());
sparkLauncher.setMainClass(mainClass);
Arrays.stream(args.split("\\s+"))
@@ -164,12 +166,12 @@ public class
KubernetesApplicationClusterDescriptorAdapter extends ClusterDescri
return client.pods().inNamespace(namespace).withName(driverPodName).get();
}
- public String getSparkDriverPodIP() {
+ public String getSparkUIUrl() {
Pod sparkDriverPod = getSparkDriverPod();
if (null != sparkDriverPod) {
String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP();
if (StringUtils.isNotBlank(sparkDriverPodIP)) {
- return sparkDriverPodIP;
+ return sparkDriverPodIP + ":" + this.sparkConfig.getK8sSparkUIPort();
} else {
logger.info("spark driver pod IP is null, the application may be
pending");
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index b42fc0934..bb079b7b5 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -68,6 +68,7 @@ object SparkConfiguration extends Logging {
val SPARK_K8S_RESTART_POLICY =
CommonVars[String]("linkis.spark.k8s.restartPolicy", "Never")
val SPARK_K8S_SPARK_VERSION =
CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace",
"default")
+ val SPARK_K8S_UI_PORT = CommonVars[String]("linkis.spark.k8s.ui.port",
"4040")
val SPARK_K8S_EXECUTOR_REQUEST_CORES =
CommonVars[String]("linkis.spark.k8s.executor.request.cores", "1")
@@ -80,6 +81,8 @@ object SparkConfiguration extends Logging {
val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version",
"python")
+ val SPARK_PYTHON_FILES = CommonVars[String]("spark.submit.pyFiles", "")
+
val SPARK_PYTHON_TEST_MODE_ENABLE =
CommonVars[Boolean]("linkis.spark.python.test.mode.enable", false)
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
index 1c3873942..e8b7dfb48 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
@@ -132,9 +132,10 @@ class SparkOnKubernetesSubmitOnceExecutor(
if (oldProgress >= 1 || jobIsFinal) {
1
} else {
- val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
- if (StringUtils.isNotBlank(sparkDriverPodIP)) {
- val newProgress =
SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP)
+ val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl
+ if (StringUtils.isNotBlank(sparkUIUrl)) {
+ val newProgress =
+ SparkJobProgressUtil.getProgress(this.getApplicationId, sparkUIUrl)
if (newProgress > oldProgress) {
oldProgress = newProgress
}
@@ -144,9 +145,9 @@ class SparkOnKubernetesSubmitOnceExecutor(
}
override def getProgressInfo: Array[JobProgressInfo] = {
- val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
- if (StringUtils.isNotBlank(sparkDriverPodIP)) {
- SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId,
sparkDriverPodIP)
+ val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl
+ if (StringUtils.isNotBlank(sparkUIUrl)) {
+ SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId,
sparkUIUrl)
} else {
Array.empty
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index fbd38bcc6..a237b544b 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -113,6 +113,7 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
+ sparkConfig.setK8sSparkUIPort(SPARK_K8S_UI_PORT.getValue(options))
}
if (master.startsWith("yarn")) {
@@ -134,6 +135,7 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
sparkConfig.setExecutorCores(LINKIS_SPARK_EXECUTOR_CORES.getValue(options))
sparkConfig.setNumExecutors(LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(options))
sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
+ sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options))
logger.info(s"spark_info: ${sparkConfig}")
sparkConfig
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
index 12a87e22f..5802a1c85 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
@@ -56,5 +56,8 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory {
}
}
+ override protected def getSupportRunTypes: Array[String] =
+ Array(RunType.JAR.toString, RunType.PYSPARK.toString)
+
override protected def getRunType: RunType = RunType.JAR
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
index 6968ffb61..94614f902 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
@@ -31,11 +31,12 @@ import java.util
object SparkJobProgressUtil extends Logging {
- def getProgress(applicationId: String, podIP: String = ""): Float = {
+ def getProgress(applicationId: String, sparkUIUrl: String = ""): Float = {
if (StringUtils.isBlank(applicationId)) return 0f
val sparkJobsResult =
- if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
- else getKubernetesSparkJobInfo(applicationId, podIP)
+ if (StringUtils.isBlank(sparkUIUrl))
+ getSparkJobInfo(applicationId)
+ else getKubernetesSparkJobInfo(applicationId, sparkUIUrl)
if (sparkJobsResult.isEmpty) return 0f
val tuple = sparkJobsResult
.filter(sparkJobResult => {
@@ -52,10 +53,14 @@ object SparkJobProgressUtil extends Logging {
tuple._2.toFloat / tuple._1
}
- def getSparkJobProgressInfo(applicationId: String, podIP: String = ""):
Array[JobProgressInfo] = {
+ def getSparkJobProgressInfo(
+ applicationId: String,
+ sparkUIUrl: String = ""
+ ): Array[JobProgressInfo] = {
val sparkJobsResult =
- if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
- else getKubernetesSparkJobInfo(applicationId, podIP)
+ if (StringUtils.isBlank(sparkUIUrl))
+ getSparkJobInfo(applicationId)
+ else getKubernetesSparkJobInfo(applicationId, sparkUIUrl)
if (sparkJobsResult.isEmpty) {
Array.empty
} else {
@@ -104,11 +109,11 @@ object SparkJobProgressUtil extends Logging {
def getKubernetesSparkJobInfo(
applicationId: String,
- podIP: String
+ sparkUIUrl: String
): Array[java.util.Map[String, Object]] =
- if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP))
Array.empty
+ if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(sparkUIUrl))
Array.empty
else {
- val getSparkJobsStateUrl =
s"http://$podIP:4040/api/v1/applications/$applicationId"
+ val getSparkJobsStateUrl =
s"http://$sparkUIUrl/api/v1/applications/$applicationId"
logger.info(s"get spark job state from kubernetes spark ui, url:
$getSparkJobsStateUrl")
val appStateResult =
JsonUtils.jackson.readValue(
@@ -121,7 +126,8 @@ object SparkJobProgressUtil extends Logging {
appAttemptList.get(appAttemptList.size() -
1).asInstanceOf[util.Map[String, Object]]
val isLastAttemptCompleted =
appLastAttempt.get("completed").asInstanceOf[Boolean]
if (isLastAttemptCompleted) return Array.empty
- val getSparkJobsInfoUrl =
s"http://$podIP:4040/api/v1/applications/$applicationId/jobs"
+ val getSparkJobsInfoUrl =
+ s"http://$sparkUIUrl/api/v1/applications/$applicationId/jobs"
logger.info(s"get spark job info from kubernetes spark ui:
$getSparkJobsInfoUrl")
val jobs = get(getSparkJobsInfoUrl)
if (StringUtils.isBlank(jobs)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]