Author: cwiklik
Date: Fri Sep 28 15:07:10 2018
New Revision: 1842258

URL: http://svn.apache.org/viewvc?rev=1842258&view=rev
Log:
UIMA-5864 improve detection of ghost processes

Modified:
    
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java
    
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
    
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java?rev=1842258&r1=1842257&r2=1842258&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java
 Fri Sep 28 15:07:10 2018
@@ -18,7 +18,7 @@
 */
 package org.apache.uima.ducc.agent;
 
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -35,8 +35,8 @@ public interface Agent extends ProcessLi
        
        public NodeIdentity getIdentity();
        
-       public HashMap<DuccId, IDuccProcess> getInventoryCopy();
-       public HashMap<DuccId,IDuccProcess> getInventoryRef();
+       public Map<DuccId, IDuccProcess> getInventoryCopy();
+       public Map<DuccId,IDuccProcess> getInventoryRef();
        public boolean isRogueProcess(String uid, 
Set<NodeUsersCollector.ProcessInfo> processList, NodeUsersCollector.ProcessInfo 
cpi ) throws Exception;       
        public void copyAllUserReservations(TreeMap<String,NodeUsersInfo> map);
        public RogueProcessReaper getRogueProcessReaper();

Modified: 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1842258&r1=1842257&r2=1842258&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
 Fri Sep 28 15:07:10 2018
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -120,7 +121,8 @@ public class NodeAgent extends AbstractD
   public static String cgroupFailureReason;
   // Map of known processes this agent is managing. This map is published
   // at regular intervals as part of agent's inventory update.
-  private HashMap<DuccId, IDuccProcess> inventory = new HashMap<DuccId, 
IDuccProcess>();
+  private Map<DuccId, IDuccProcess> inventory = 
+                 new ConcurrentHashMap<>();
 
   // Semaphore controlling access to inventory Map
   private Semaphore inventorySemaphore = new Semaphore(1);
@@ -663,23 +665,23 @@ public class NodeAgent extends AbstractD
    * Returns deep copy (by way of java serialization) of the Agents inventory.
    */
   @SuppressWarnings("unchecked")
-  public HashMap<DuccId, IDuccProcess> getInventoryCopy() {
+  public Map<DuccId, IDuccProcess> getInventoryCopy() {
     Object deepCopy = null;
     try {
       inventorySemaphore.acquire();
-      deepCopy = SerializationUtils.clone((HashMap<DuccId, IDuccProcess>) 
inventory);
+      deepCopy = SerializationUtils.clone((ConcurrentHashMap<DuccId, 
IDuccProcess>) inventory);
     } catch (InterruptedException e) {
     } finally {
       inventorySemaphore.release();
     }
-    return (HashMap<DuccId, IDuccProcess>) deepCopy;
+    return (Map<DuccId, IDuccProcess>)deepCopy;
   }
 
   /**
    * Returns shallow copy of the Agent's inventory
    */
-  public HashMap<DuccId, IDuccProcess> getInventoryRef() {
-    return (HashMap<DuccId, IDuccProcess>) inventory;
+  public Map<DuccId, IDuccProcess> getInventoryRef() {
+    return inventory;
   }
 
   /*
@@ -777,6 +779,69 @@ public class NodeAgent extends AbstractD
     }
   }
 
+       private void stopProcessIfAlive(IDuccProcess process, 
ProcessLifecycleController lifecycleController) {
+               String methodName = "stopProcessIfAlive";
+               if (isAlive(process)) {
+                       logger.error(methodName, null,
+                                       "<<<<<<<<< Stopping Process with no Job 
Assignement (Ghost Process) - DuccId:" + process.getDuccId()
+                                                       + " PID:" + 
process.getPID());
+                       
process.setReasonForStoppingProcess(ReasonForStoppingProcess.JPHasNoActiveJob.toString());
+                       lifecycleController.stopProcess(process);
+               } else {
+                       logger.error(methodName, null, "XXXXXXXXXX Purging 
Process:" + process.getDuccId() + " Process State:"
+                                       + process.getProcessState() + " Process 
Resource State:" + process.getResourceState());
+
+                       getInventoryRef().remove(process.getDuccId());
+               }
+       }
+
+       /*
+        * Valid process exists in agent inventory and in an incoming OR state. 
If
+        * process exists in agent inventory but not in OR state than such 
process is
+        * invalid
+        */
+       private boolean validProcess(IDuccProcess process, 
List<IDuccJobDeployment> jobDeploymentList) {
+               // iterate over all jobs
+               for (IDuccJobDeployment job : jobDeploymentList) {
+                       // check if current process is a JD
+                       if ((job.getJdProcess() != null && 
process.getDuccId().equals(job.getJdProcess().getDuccId()))) {
+                               return true;
+                       } else {
+                               // check if current process is a JP
+                               for (IDuccProcess jProcess : 
job.getJpProcessList()) {
+                                       if 
(process.getDuccId().equals(jProcess.getDuccId())) {
+                                               return true;
+                                       }
+                               }
+                       }
+               }
+               return false;
+       }
+
+       public void takeDownProcessWithNoJobV2(ProcessLifecycleController 
lifecycleController,
+                       List<IDuccJobDeployment> jobDeploymentList) {
+               String methodName = "takeDownProcessWithNoJobV2";
+               try {
+                       inventorySemaphore.acquire();
+
+                       // iterate over all processes in agent inventory
+                       for (Entry<DuccId, IDuccProcess> processEntry : 
getInventoryRef().entrySet()) {
+                               // if OR deployment list is empty, take down 
all agent processes that
+                               // are in the inventory
+                               if (jobDeploymentList.isEmpty() || 
!validProcess(processEntry.getValue(), jobDeploymentList)) {
+                                       logger.info(methodName, null,
+                                                       "...Agent Process:" + 
processEntry.getValue().getDuccId() + " Not in OR JobDeploymentList");
+                                       
stopProcessIfAlive(processEntry.getValue(), lifecycleController);
+                               }
+                       }
+               } catch (Exception e) {
+                       logger.error(methodName, null, e);
+               } finally {
+                       inventorySemaphore.release();
+               }
+       }  
+  
+  
   /**
    * Reconciles agent inventory with job processes sent by PM
    *

Modified: 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1842258&r1=1842257&r2=1842258&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
 Fri Sep 28 15:07:10 2018
@@ -290,7 +290,7 @@ public class AgentEventListener implemen
                                // Stop any process that is in this Agent's 
inventory but not
                                // associated with any
                                // of the jobs we just received
-                               agent.takeDownProcessWithNoJob(agent, 
duccEvent.getJobList());
+                               agent.takeDownProcessWithNoJobV2(agent, 
duccEvent.getJobList());
                                Map<DuccId, IDuccProcess> processes = 
agent.getInventoryCopy();
                                // iterate over all jobs and reconcile those 
processes that are
                                // assigned to this agent. First,

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java?rev=1842258&r1=1842257&r2=1842258&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java
 Fri Sep 28 15:07:10 2018
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.transport.event;
 
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.utils.id.DuccId;
@@ -29,11 +30,11 @@ public class NodeInventoryUpdateDuccEven
 
        private static final long serialVersionUID = -240986007026771587L;
 
-       private HashMap<DuccId, IDuccProcess> processes = null;
+       private Map<DuccId, IDuccProcess> processes = null;
        private long lastORSequence;
        private NodeIdentity nodeIdentity;
        
-       public NodeInventoryUpdateDuccEvent(HashMap<DuccId, IDuccProcess> 
processes, long lastORSequence, NodeIdentity node) {
+       public NodeInventoryUpdateDuccEvent(Map<DuccId, IDuccProcess> 
processes, long lastORSequence, NodeIdentity node) {
                super(EventType.START_PROCESS);
                this.processes = processes;
                this.lastORSequence = lastORSequence;
@@ -47,6 +48,9 @@ public class NodeInventoryUpdateDuccEven
                return lastORSequence;
        }
        public HashMap<DuccId, IDuccProcess> getProcesses() {
-               return processes;
+               HashMap<DuccId, IDuccProcess> processMap =
+                               new HashMap<>();
+               processMap.putAll(processes);
+               return processMap;
        }
 }


Reply via email to