Author: cwiklik
Date: Thu Sep 13 20:07:38 2018
New Revision: 1840856

URL: http://svn.apache.org/viewvc?rev=1840856&view=rev
Log:
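UIMA-5865 Modified ServiceComponent to send process state updates to the agent over a local socket (port taken from DUCC_STATE_UPDATE_PORT) instead of calling AgentSession.notify()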

Modified:
    
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java?rev=1840856&r1=1840855&r2=1840856&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
 Thu Sep 13 20:07:38 2018
@@ -19,7 +19,10 @@
 
 package org.apache.uima.ducc.transport.configuration.service;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.lang.reflect.Method;
+import java.net.Socket;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -35,11 +38,17 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.ducc.transport.configuration.jp.AgentSession;
 import org.apache.uima.ducc.transport.configuration.jp.JmxAEProcessInitMonitor;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.util.Level;
 
 public class ServiceComponent extends AbstractDuccComponent implements
                IJobProcessor {
+       private static final String SERVICE_JMX_PORT = "SERVICE_JMX_PORT=";
+       private static final String SERVICE_UNIQUE_ID= "DUCC_PROCESS_UNIQUEID=";
+       private static final String SERVICE_STATE = "DUCC_PROCESS_STATE=";
+       private static final String SERVICE_DATA = "SERVICE_DATA=";
+       private static final String SEPARATOR = ",";
 
-       private AgentSession agent = null;
+       //private AgentSession agent = null;
        ScheduledThreadPoolExecutor executor = null;
 
        private String jmxConnectString = "";
@@ -68,19 +77,91 @@ public class ServiceComponent extends Ab
        public void setState(ProcessState state) {
                try {
                        stateLock.lock();
+                       
                        if (currentState.name().equals(
                                        
ProcessState.FailedInitialization.name())) {
                                return;
                        }
                        if (!state.name().equals(currentState.name())) {
                                currentState = state;
-                               agent.notify(currentState, 
super.getProcessJmxUrl());
+                       //      agent.notify(currentState, 
super.getProcessJmxUrl());
+                               sendStateUpdate(state.name(), new Properties());
                        }
                } finally {
                        stateLock.unlock();
                }
        }
 
+
+       private Socket connect() throws IOException {
+               int statusUpdatePort = -1;
+
+               String port = System.getenv("DUCC_STATE_UPDATE_PORT");
+               try {
+                       statusUpdatePort = Integer.valueOf(port);
+               } catch (NumberFormatException nfe) {
+                       return null; 
+               }
+           logger.info("connect",null, "Service Connecting Socket to localhost 
Monitor on port:" + statusUpdatePort);
+               String localhost = null;
+               // establish socket connection to an agent where this process 
will report its
+               // state
+               return new Socket(localhost, statusUpdatePort);
+
+       }
+
+       private void sendStateUpdate(String state, Properties additionalData){
+               DataOutputStream out = null;
+               Socket socket = null;
+               if ( System.getenv("DUCC_STATE_UPDATE_PORT") == null) {
+                       return; // agent update port not specified
+               }
+               try {
+                       socket = connect();
+                       if ( socket == null ) {
+                               return;
+                       }
+                       if ( additionalData == null ) {
+                               additionalData = new Properties();
+                       } 
+                       // Agent needs process unique ID to identify it within 
inventory.
+                       // The unique id was added as an env var by an agent 
before this
+                       // process was launched.
+                       StringBuilder sb = new StringBuilder()
+                          .append(SERVICE_UNIQUE_ID)
+                          .append(System.getenv("DUCC_PROCESS_UNIQUEID"))
+                          .append(SEPARATOR)
+                          .append(SERVICE_STATE)
+                          .append(state);
+                       if ( jmxConnectString != null && 
+                                       !jmxConnectString.trim().isEmpty()) {
+                               sb.append(SEPARATOR).
+                               append(SERVICE_JMX_PORT).
+                append(jmxConnectString.trim());
+                       }
+                       out = new DataOutputStream(socket.getOutputStream());
+                       out.writeUTF(sb.toString());
+                       out.flush();
+               } catch (Exception e) {
+                       
+               } finally {
+                       try {
+                               if (out != null) {
+                                       out.close();
+                               }
+                               if (socket != null) {
+                                       socket.close();
+                               }
+                       } catch( IOException ee) {
+                               
+                       }
+               
+               }
+
+       }
+
+       
+       
        protected void setDD(String dd) {
                this.dd = dd;
        }
@@ -94,7 +175,7 @@ public class ServiceComponent extends Ab
        }
 
        protected void setAgentSession(AgentSession session) {
-               agent = session;
+               //agent = session;
        }
 
        public String getProcessJmxUrl() {
@@ -125,20 +206,37 @@ public class ServiceComponent extends Ab
                }
                String processJmxUrl = super.getProcessJmxUrl();
                // tell the agent that this process is initializing
-               agent.notify(ProcessState.Initializing, processJmxUrl);
-
+               //agent.notify(ProcessState.Initializing, processJmxUrl);
+               sendStateUpdate(ProcessState.Initializing.name(), new 
Properties());
                try {
                        executor = new ScheduledThreadPoolExecutor(1);
                        executor.prestartAllCoreThreads();
                        // Instantiate a UIMA AS jmx monitor to poll for status 
of the AE.
                        // This monitor checks if the AE is initializing or 
ready.
+
+                       
+/*                     
+       MUST SEND STATE TO AGENT
+                       
                        JmxAEProcessInitMonitor monitor = new 
JmxAEProcessInitMonitor(agent);
+                       
+               
+                       
+                       
+                       
+                       
                        /*
                         * This will execute the UimaAEJmxMonitor continuously 
for every 15
                         * seconds with an initial delay of 20 seconds. This 
monitor polls
                         * initialization status of AE deployed in UIMA AS.
                         */
-                       executor.scheduleAtFixedRate(monitor, 20, 30, 
TimeUnit.SECONDS);
+               //      executor.scheduleAtFixedRate(monitor, 20, 30, 
TimeUnit.SECONDS);
+                       
+                               
+                       
+                       
+                       
+                       
                        String[] jpArgs;
                        jpArgs = new String[] { "-dd", args[0], "-saxonURL", 
saxonJarPath,
                                        "-xslt", dd2SpringXslPath };
@@ -173,7 +271,8 @@ public class ServiceComponent extends Ab
                                // Update agent with the most up-to-date state 
of the pipeline
                                // all is well, so notify agent that this 
process is in Running
                                // state
-                               agent.notify(currentState, processJmxUrl);
+                               //agent.notify(currentState, processJmxUrl);
+                               sendStateUpdate(currentState.name(), new 
Properties());
                                // SUCCESSFUL DEPLOY - Now wait until the agent 
sends stop
                                // request. Processing continues in UIMA-AS 
without DUCCs 
                                // involvement.
@@ -188,7 +287,8 @@ public class ServiceComponent extends Ab
                        getLogger()
                                        .info("start", null,
                                                        ">>> Failed to Deploy 
UIMA Service. Check UIMA Log for Details");
-                       agent.notify(ProcessState.FailedInitialization);
+                       //agent.notify(ProcessState.FailedInitialization);
+                       
sendStateUpdate(ProcessState.FailedInitialization.name(), new Properties());
                        Runtime.getRuntime().halt(0);   // hard stop. 
Initialization failed
                }
 
@@ -204,9 +304,11 @@ public class ServiceComponent extends Ab
 
        public void stop() {
                currentState = ProcessState.Stopping;
-               if ( agent != null ) {
-                       agent.notify(currentState);
-               }
+//             if ( agent != null ) {
+//                     agent.notify(currentState);
+//             }
+               sendStateUpdate(currentState.name(), new Properties());
+
                if (super.isStopping()) {
                        return; // already stopping - nothing to do
                }
@@ -225,9 +327,9 @@ public class ServiceComponent extends Ab
                                executor.shutdownNow();
                        }
 
-                       if (agent != null) {
-                               agent.stop();
-                       }
+//                     if (agent != null) {
+//                             agent.stop();
+//                     }
                } catch (Exception e) {
                        getLogger().error("stop", null, e);
                } finally {


Reply via email to