Author: cwiklik Date: Thu Sep 13 20:07:38 2018 New Revision: 1840856 URL: http://svn.apache.org/viewvc?rev=1840856&view=rev Log: UIMA-5865 modified to send state info to agent
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java?rev=1840856&r1=1840855&r2=1840856&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java Thu Sep 13 20:07:38 2018 @@ -19,7 +19,10 @@ package org.apache.uima.ducc.transport.configuration.service; +import java.io.DataOutputStream; +import java.io.IOException; import java.lang.reflect.Method; +import java.net.Socket; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -35,11 +38,17 @@ import org.apache.uima.ducc.common.utils import org.apache.uima.ducc.transport.configuration.jp.AgentSession; import org.apache.uima.ducc.transport.configuration.jp.JmxAEProcessInitMonitor; import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; +import org.apache.uima.util.Level; public class ServiceComponent extends AbstractDuccComponent implements IJobProcessor { + private static final String SERVICE_JMX_PORT = "SERVICE_JMX_PORT="; + private static final String SERVICE_UNIQUE_ID= "DUCC_PROCESS_UNIQUEID="; + private static final String SERVICE_STATE = "DUCC_PROCESS_STATE="; + private static final String SERVICE_DATA = "SERVICE_DATA="; + private static final String SEPARATOR = ","; - private AgentSession agent = null; + //private AgentSession agent = null; ScheduledThreadPoolExecutor executor = null; private String jmxConnectString = ""; @@ -68,19 +77,91 @@ public class ServiceComponent extends Ab public void setState(ProcessState state) { try { stateLock.lock(); + if (currentState.name().equals( ProcessState.FailedInitialization.name())) { return; } if (!state.name().equals(currentState.name())) { currentState = state; - agent.notify(currentState, super.getProcessJmxUrl()); + // agent.notify(currentState, super.getProcessJmxUrl()); + sendStateUpdate(state.name(), new Properties()); } } finally { stateLock.unlock(); } } + + private Socket connect() throws IOException { + int statusUpdatePort = -1; + + String port = System.getenv("DUCC_STATE_UPDATE_PORT"); + try { + statusUpdatePort = Integer.valueOf(port); + } catch (NumberFormatException nfe) { + return null; + } + logger.info("connect",null, "Service Connecting Socket to localhost Monitor on port:" + statusUpdatePort); + String localhost = null; + // establish socket connection to an agent where this process will report its + // state + return new Socket(localhost, statusUpdatePort); + + } + + private void sendStateUpdate(String state, Properties additionalData){ + DataOutputStream out = null; + Socket socket = null; + if ( System.getenv("DUCC_STATE_UPDATE_PORT") == null) { + return; // agent update port not specified + } + try { + socket = connect(); + if ( socket == null ) { + return; + } + if ( additionalData == null ) { + additionalData = new Properties(); + } + // Agent needs process unique ID to identify it within inventory. + // The unique id was added as an env var by an agent before this + // process was launched. + StringBuilder sb = new StringBuilder() + .append(SERVICE_UNIQUE_ID) + .append(System.getenv("DUCC_PROCESS_UNIQUEID")) + .append(SEPARATOR) + .append(SERVICE_STATE) + .append(state); + if ( jmxConnectString != null && + !jmxConnectString.trim().isEmpty()) { + sb.append(SEPARATOR). + append(SERVICE_JMX_PORT). + append(jmxConnectString.trim()); + } + out = new DataOutputStream(socket.getOutputStream()); + out.writeUTF(sb.toString()); + out.flush(); + } catch (Exception e) { + + } finally { + try { + if (out != null) { + out.close(); + } + if (socket != null) { + socket.close(); + } + } catch( IOException ee) { + + } + + } + + } + + + protected void setDD(String dd) { this.dd = dd; } @@ -94,7 +175,7 @@ public class ServiceComponent extends Ab } protected void setAgentSession(AgentSession session) { - agent = session; + //agent = session; } public String getProcessJmxUrl() { @@ -125,20 +206,37 @@ public class ServiceComponent extends Ab } String processJmxUrl = super.getProcessJmxUrl(); // tell the agent that this process is initializing - agent.notify(ProcessState.Initializing, processJmxUrl); - + //agent.notify(ProcessState.Initializing, processJmxUrl); + sendStateUpdate(ProcessState.Initializing.name(), new Properties()); try { executor = new ScheduledThreadPoolExecutor(1); executor.prestartAllCoreThreads(); // Instantiate a UIMA AS jmx monitor to poll for status of the AE. // This monitor checks if the AE is initializing or ready. + + +/* + MUST SEND STATE TO AGENT + JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent); + + + + + + /* * This will execute the UimaAEJmxMonitor continuously for every 15 * seconds with an initial delay of 20 seconds. This monitor polls * initialization status of AE deployed in UIMA AS. */ - executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS); + // executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS); + + + + + + String[] jpArgs; jpArgs = new String[] { "-dd", args[0], "-saxonURL", saxonJarPath, "-xslt", dd2SpringXslPath }; @@ -173,7 +271,8 @@ public class ServiceComponent extends Ab // Update agent with the most up-to-date state of the pipeline // all is well, so notify agent that this process is in Running // state - agent.notify(currentState, processJmxUrl); + //agent.notify(currentState, processJmxUrl); + sendStateUpdate(currentState.name(), new Properties()); // SUCCESSFUL DEPLOY - Now wait until the agent sends stop // request. Processing continues in UIMA-AS without DUCCs // involvement. @@ -188,7 +287,8 @@ public class ServiceComponent extends Ab getLogger() .info("start", null, ">>> Failed to Deploy UIMA Service. Check UIMA Log for Details"); - agent.notify(ProcessState.FailedInitialization); + //agent.notify(ProcessState.FailedInitialization); + sendStateUpdate(ProcessState.FailedInitialization.name(), new Properties()); Runtime.getRuntime().halt(0); // hard stop. Initialization failed } @@ -204,9 +304,11 @@ public class ServiceComponent extends Ab public void stop() { currentState = ProcessState.Stopping; - if ( agent != null ) { - agent.notify(currentState); - } +// if ( agent != null ) { +// agent.notify(currentState); +// } + sendStateUpdate(currentState.name(), new Properties()); + if (super.isStopping()) { return; // already stopping - nothing to do } @@ -225,9 +327,9 @@ public class ServiceComponent extends Ab executor.shutdownNow(); } - if (agent != null) { - agent.stop(); - } +// if (agent != null) { +// agent.stop(); +// } } catch (Exception e) { getLogger().error("stop", null, e); } finally {