This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a58d73d4 Fix spark yarn cluster mode bug (#4872)
5a58d73d4 is described below

commit 5a58d73d480f167506ea3bb5c0c6a1546e55e1e2
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu Aug 24 17:02:03 2023 +0800

    Fix spark yarn cluster mode bug (#4872)
    
    * fix spark yarn cluster mode bug
    
    * fix spark yarn cluster mode bug
---
 .../engineconn/callback/hook/CallbackEngineConnHook.scala      |  7 ++++---
 .../am/service/impl/DefaultEngineConnPidCallbackService.java   | 10 +++++-----
 .../linkis/manager/rm/service/impl/DefaultResourceManager.java |  1 +
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
index adcbb1a69..07cfa51d0 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
@@ -61,15 +61,16 @@ class CallbackEngineConnHook extends EngineConnHook with 
Logging {
     newMap.put("spring.mvc.servlet.path", 
ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
     
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))
 
-    val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
-    Utils.tryAndError(engineConnIdentifierCallback.callback())
     logger.info("<--------------------SpringBoot App init 
succeed-------------------->")
   }
 
   override def beforeExecutionExecute(
       engineCreationContext: EngineCreationContext,
       engineConn: EngineConn
-  ): Unit = {}
+  ): Unit = {
+    val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
+    Utils.tryAndError(engineConnIdentifierCallback.callback())
+  }
 
   override def afterExecutionExecute(
       engineCreationContext: EngineCreationContext,
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
index 4acfb70f9..5fbbb7c32 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
@@ -54,7 +54,8 @@ public class DefaultEngineConnPidCallbackService extends 
AbstractEngineService
         protocol.pid(),
         protocol.ticketId());
 
-    EngineNode engineNode = 
defaultEngineNodeManager.getEngineNode(protocol.serviceInstance());
+    EngineNode engineNode =
+        
defaultEngineNodeManager.getEngineNodeInfoByTicketId(protocol.ticketId());
     if (engineNode == null) {
       logger.error(
           "DefaultEngineConnPidCallbackService dealPid failed, engineNode is 
null, serviceInstance:{}",
@@ -63,13 +64,12 @@ public class DefaultEngineConnPidCallbackService extends 
AbstractEngineService
     }
 
     engineNode.setIdentifier(protocol.pid());
-
+    ServiceInstance oldServiceInstance = engineNode.getServiceInstance();
     if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
       ServiceInstance serviceInstance = protocol.serviceInstance();
       engineNode.setServiceInstance(serviceInstance);
-      getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
-      nodeLabelService.labelsFromInstanceToNewInstance(
-          engineNode.getServiceInstance(), serviceInstance);
+      getEngineNodeManager().updateEngineNode(oldServiceInstance, engineNode);
+      nodeLabelService.labelsFromInstanceToNewInstance(oldServiceInstance, 
serviceInstance);
     }
     defaultEngineNodeManager.updateEngine(engineNode);
   }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index c40270145..7ecf3f48d 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -316,6 +316,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
     engineNode.setServiceInstance(
         ServiceInstance.apply(labelContainer.getEngineServiceName(), 
tickedId));
     engineNode.setNodeResource(resource);
+    engineNode.setTicketId(tickedId);
 
     nodeManagerPersistence.addEngineNode(engineNode);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to