This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 5a58d73d4 Fix spark yarn cluster mode bug (#4872)
5a58d73d4 is described below
commit 5a58d73d480f167506ea3bb5c0c6a1546e55e1e2
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu Aug 24 17:02:03 2023 +0800
Fix spark yarn cluster mode bug (#4872)
* fix spark yarn cluster mode bug
* fix spark yarn cluster mode bug
---
.../engineconn/callback/hook/CallbackEngineConnHook.scala | 7 ++++---
.../am/service/impl/DefaultEngineConnPidCallbackService.java | 10 +++++-----
.../linkis/manager/rm/service/impl/DefaultResourceManager.java | 1 +
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
index adcbb1a69..07cfa51d0 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
@@ -61,15 +61,16 @@ class CallbackEngineConnHook extends EngineConnHook with
Logging {
newMap.put("spring.mvc.servlet.path",
ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))
- val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
- Utils.tryAndError(engineConnIdentifierCallback.callback())
logger.info("<--------------------SpringBoot App init
succeed-------------------->")
}
override def beforeExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
- ): Unit = {}
+ ): Unit = {
+ val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
+ Utils.tryAndError(engineConnIdentifierCallback.callback())
+ }
override def afterExecutionExecute(
engineCreationContext: EngineCreationContext,
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
index 4acfb70f9..5fbbb7c32 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
@@ -54,7 +54,8 @@ public class DefaultEngineConnPidCallbackService extends
AbstractEngineService
protocol.pid(),
protocol.ticketId());
- EngineNode engineNode =
defaultEngineNodeManager.getEngineNode(protocol.serviceInstance());
+ EngineNode engineNode =
+
defaultEngineNodeManager.getEngineNodeInfoByTicketId(protocol.ticketId());
if (engineNode == null) {
logger.error(
"DefaultEngineConnPidCallbackService dealPid failed, engineNode is
null, serviceInstance:{}",
@@ -63,13 +64,12 @@ public class DefaultEngineConnPidCallbackService extends
AbstractEngineService
}
engineNode.setIdentifier(protocol.pid());
-
+ ServiceInstance oldServiceInstance = engineNode.getServiceInstance();
if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
ServiceInstance serviceInstance = protocol.serviceInstance();
engineNode.setServiceInstance(serviceInstance);
- getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
- nodeLabelService.labelsFromInstanceToNewInstance(
- engineNode.getServiceInstance(), serviceInstance);
+ getEngineNodeManager().updateEngineNode(oldServiceInstance, engineNode);
+ nodeLabelService.labelsFromInstanceToNewInstance(oldServiceInstance,
serviceInstance);
}
defaultEngineNodeManager.updateEngine(engineNode);
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index c40270145..7ecf3f48d 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -316,6 +316,7 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
engineNode.setServiceInstance(
ServiceInstance.apply(labelContainer.getEngineServiceName(),
tickedId));
engineNode.setNodeResource(resource);
+ engineNode.setTicketId(tickedId);
nodeManagerPersistence.addEngineNode(engineNode);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]