Author: cwiklik
Date: Fri Sep 28 15:07:10 2018
New Revision: 1842258

URL: http://svn.apache.org/viewvc?rev=1842258&view=rev
Log:
UIMA-5864 improve detection of ghost processes

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java?rev=1842258&r1=1842257&r2=1842258&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/Agent.java Fri Sep 28 15:07:10 2018 @@ -18,7 +18,7 @@ */ package org.apache.uima.ducc.agent; -import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -35,8 +35,8 @@ public interface Agent extends ProcessLi public NodeIdentity getIdentity(); - public HashMap<DuccId, IDuccProcess> getInventoryCopy(); - public HashMap<DuccId,IDuccProcess> getInventoryRef(); + public Map<DuccId, IDuccProcess> getInventoryCopy(); + public Map<DuccId,IDuccProcess> getInventoryRef(); public boolean isRogueProcess(String uid, Set<NodeUsersCollector.ProcessInfo> processList, NodeUsersCollector.ProcessInfo cpi ) throws Exception; public void copyAllUserReservations(TreeMap<String,NodeUsersInfo> map); public RogueProcessReaper getRogueProcessReaper(); Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1842258&r1=1842257&r2=1842258&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Sep 28 15:07:10 2018 @@ -40,6 +40,7 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -120,7 +121,8 @@ public class NodeAgent extends AbstractD public static String cgroupFailureReason; // Map of known processes this agent is managing. This map is published // at regular intervals as part of agent's inventory update. - private HashMap<DuccId, IDuccProcess> inventory = new HashMap<DuccId, IDuccProcess>(); + private Map<DuccId, IDuccProcess> inventory = + new ConcurrentHashMap<>(); // Semaphore controlling access to inventory Map private Semaphore inventorySemaphore = new Semaphore(1); @@ -663,23 +665,23 @@ public class NodeAgent extends AbstractD * Returns deep copy (by way of java serialization) of the Agents inventory. 
*/ @SuppressWarnings("unchecked") - public HashMap<DuccId, IDuccProcess> getInventoryCopy() { + public Map<DuccId, IDuccProcess> getInventoryCopy() { Object deepCopy = null; try { inventorySemaphore.acquire(); - deepCopy = SerializationUtils.clone((HashMap<DuccId, IDuccProcess>) inventory); + deepCopy = SerializationUtils.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory); } catch (InterruptedException e) { } finally { inventorySemaphore.release(); } - return (HashMap<DuccId, IDuccProcess>) deepCopy; + return (Map<DuccId, IDuccProcess>)deepCopy; } /** * Returns shallow copy of the Agent's inventory */ - public HashMap<DuccId, IDuccProcess> getInventoryRef() { - return (HashMap<DuccId, IDuccProcess>) inventory; + public Map<DuccId, IDuccProcess> getInventoryRef() { + return inventory; } /* @@ -777,6 +779,69 @@ public class NodeAgent extends AbstractD } } + private void stopProcessIfAlive(IDuccProcess process, ProcessLifecycleController lifecycleController) { + String methodName = "stopProcessIfAlive"; + if (isAlive(process)) { + logger.error(methodName, null, + "<<<<<<<<< Stopping Process with no Job Assignement (Ghost Process) - DuccId:" + process.getDuccId() + + " PID:" + process.getPID()); + process.setReasonForStoppingProcess(ReasonForStoppingProcess.JPHasNoActiveJob.toString()); + lifecycleController.stopProcess(process); + } else { + logger.error(methodName, null, "XXXXXXXXXX Purging Process:" + process.getDuccId() + " Process State:" + + process.getProcessState() + " Process Resource State:" + process.getResourceState()); + + getInventoryRef().remove(process.getDuccId()); + } + } + + /* + * Valid process exists in agent inventory and in an incoming OR state. 
If + * process exists in agent inventory but not in OR state than such process is + * invalid + */ + private boolean validProcess(IDuccProcess process, List<IDuccJobDeployment> jobDeploymentList) { + // iterate over all jobs + for (IDuccJobDeployment job : jobDeploymentList) { + // check if current process is a JD + if ((job.getJdProcess() != null && process.getDuccId().equals(job.getJdProcess().getDuccId()))) { + return true; + } else { + // check if current process is a JP + for (IDuccProcess jProcess : job.getJpProcessList()) { + if (process.getDuccId().equals(jProcess.getDuccId())) { + return true; + } + } + } + } + return false; + } + + public void takeDownProcessWithNoJobV2(ProcessLifecycleController lifecycleController, + List<IDuccJobDeployment> jobDeploymentList) { + String methodName = "takeDownProcessWithNoJobV2"; + try { + inventorySemaphore.acquire(); + + // iterate over all processes in agent inventory + for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { + // if OR deployment list is empty, take down all agent processes that + // are in the inventory + if (jobDeploymentList.isEmpty() || !validProcess(processEntry.getValue(), jobDeploymentList)) { + logger.info(methodName, null, + "...Agent Process:" + processEntry.getValue().getDuccId() + " Not in OR JobDeploymentList"); + stopProcessIfAlive(processEntry.getValue(), lifecycleController); + } + } + } catch (Exception e) { + logger.error(methodName, null, e); + } finally { + inventorySemaphore.release(); + } + } + + /** * Reconciles agent inventory with job processes sent by PM * Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1842258&r1=1842257&r2=1842258&view=diff ============================================================================== --- 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Fri Sep 28 15:07:10 2018 @@ -290,7 +290,7 @@ public class AgentEventListener implemen // Stop any process that is in this Agent's inventory but not // associated with any // of the jobs we just received - agent.takeDownProcessWithNoJob(agent, duccEvent.getJobList()); + agent.takeDownProcessWithNoJobV2(agent, duccEvent.getJobList()); Map<DuccId, IDuccProcess> processes = agent.getInventoryCopy(); // iterate over all jobs and reconcile those processes that are // assigned to this agent. First, Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java?rev=1842258&r1=1842257&r2=1842258&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/NodeInventoryUpdateDuccEvent.java Fri Sep 28 15:07:10 2018 @@ -19,6 +19,7 @@ package org.apache.uima.ducc.transport.event; import java.util.HashMap; +import java.util.Map; import org.apache.uima.ducc.common.NodeIdentity; import org.apache.uima.ducc.common.utils.id.DuccId; @@ -29,11 +30,11 @@ public class NodeInventoryUpdateDuccEven private static final long serialVersionUID = -240986007026771587L; - private HashMap<DuccId, IDuccProcess> processes = null; + private Map<DuccId, IDuccProcess> processes = null; private long lastORSequence; private NodeIdentity nodeIdentity; - public 
NodeInventoryUpdateDuccEvent(HashMap<DuccId, IDuccProcess> processes, long lastORSequence, NodeIdentity node) { + public NodeInventoryUpdateDuccEvent(Map<DuccId, IDuccProcess> processes, long lastORSequence, NodeIdentity node) { super(EventType.START_PROCESS); this.processes = processes; this.lastORSequence = lastORSequence; @@ -47,6 +48,9 @@ public class NodeInventoryUpdateDuccEven return lastORSequence; } public HashMap<DuccId, IDuccProcess> getProcesses() { - return processes; + HashMap<DuccId, IDuccProcess> processMap = + new HashMap<>(); + processMap.putAll(processes); + return processMap; } }