Author: cwiklik
Date: Fri Nov 7 21:43:16 2014
New Revision: 1637458
URL: http://svn.apache.org/r1637458
Log:
UIMA-4076 Initial implementation for JD and JP
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jd;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import
org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+
+/**
+ * Job Driver (JD) component. Keeps a reference to the
+ * {@link JobDriverConfiguration} it was constructed with and supplies a
+ * logger through {@link #getLogger()}.
+ */
+public class JobDriverComponent extends AbstractDuccComponent
+    implements IJobDriverComponent {
+
+  /** Created eagerly so that {@link #getLogger()} never hands out null. */
+  private static final DuccLogger logger =
+      DuccLogger.getLogger(JobDriverComponent.class, "JobDriverComponent");
+
+  /** Configuration passed to the constructor; not read anywhere in this class yet. */
+  private final JobDriverConfiguration configuration;
+
+  /**
+   * @param componentName name forwarded to the {@link AbstractDuccComponent} constructor
+   * @param ctx Camel context forwarded to the {@link AbstractDuccComponent} constructor
+   * @param jdc configuration object retained by this component
+   */
+  public JobDriverComponent(String componentName, CamelContext ctx, JobDriverConfiguration jdc) {
+    super(componentName, ctx);
+    this.configuration = jdc;
+  }
+
+  /**
+   * @return the logger of this component; never null
+   */
+  @Override
+  public DuccLogger getLogger() {
+    return logger;
+  }
+
+}
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jd;
+
+import org.apache.camel.Body;
+import
org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import
org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+public class JobDriverEventListener implements DuccEventDelegateListener {
+
+ IJobDriverComponent component;
+
+ public JobDriverEventListener(IJobDriverComponent component) {
+ this.component = component;
+ }
+ public void onOrchestratorAbbreviatedStateDuccEvent(@Body
OrchestratorAbbreviatedStateDuccEvent duccEvent) throws Exception {
+ //component.evaluateJobDriverConstraints(duccEvent);
+ }
+
+ public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher)
{
+ }
+
+}
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jd.iface;
+
+/**
+ * Marker type for the Job Driver component. It declares no methods.
+ */
+public interface IJobDriverComponent {
+}
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.util.List;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import
org.apache.uima.ducc.container.jp.iface.IJobProcessManagerCallbackListener;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.configuration.jp.iface.IAgentSession;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Responsible for delegating state changes to a remote Agent.
+ * <p>
+ * Each update is wrapped in a {@link ProcessStateUpdateDuccEvent} and handed
+ * to the configured {@link DuccEventDispatcher}.
+ */
+public class AgentSession
+    implements IAgentSession, IJobProcessManagerCallbackListener {
+  DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+
+  // Dispatcher is responsible for sending state update event to jms endpoint
+  private final DuccEventDispatcher dispatcher;
+  // Caches process PID
+  private String pid = null;
+  // Unique ID assigned to the process. This is different from OS PID
+  private final String duccProcessId;
+  // Most recent state passed to notify(ProcessState[, String]); null until then
+  private ProcessState state;
+  // When non-null, attached to every outgoing ProcessStateUpdate
+  private final String endpoint;
+  // Held while state and pid are read or written by the notify(...) overloads
+  private final Object stateLock = new Object();
+
+  /**
+   * JMS based adapter C'tor
+   *
+   * @param dispatcher - initialized instance of {@link DuccEventDispatcher}
+   * @param duccProcessId - unique ID assigned by Ducc infrastructure
+   * @param endpoint - socket endpoint set on each state update; may be null,
+   *        in which case no endpoint is set
+   */
+  public AgentSession(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
+    this.dispatcher = dispatcher;
+    this.duccProcessId = duccProcessId;
+    this.endpoint = endpoint;
+  }
+
+  /**
+   * Reports a new process state without an accompanying message.
+   *
+   * @param state state to report
+   */
+  public void notify(ProcessState state) {
+    notify(state, null);
+  }
+
+  /**
+   * Records {@code state} as the current state and sends it to the Agent.
+   *
+   * @param state state to report
+   * @param message optional string passed as the fourth constructor argument
+   *        of {@link ProcessStateUpdate}; may be null
+   */
+  public void notify(ProcessState state, String message) {
+    synchronized (stateLock) {
+      this.state = state;
+      if (pid == null) {
+        // Get the PID once and cache for future reference
+        pid = Utils.getPID();
+      }
+      ProcessStateUpdate processUpdate;
+      if (message == null) {
+        processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, null);
+      } else {
+        processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, message, null);
+      }
+      if (endpoint != null) {
+        processUpdate.setSocketEndpoint(endpoint);
+      }
+      this.notify(processUpdate);
+    }
+  }
+
+  /**
+   * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message
+   * via configured dispatcher to a configured endpoint. Any exception raised
+   * while dispatching is printed and not propagated.
+   *
+   * @param state update to send
+   */
+  public void notify(ProcessStateUpdate state) {
+    try {
+      ProcessStateUpdateDuccEvent duccEvent = new ProcessStateUpdateDuccEvent(state);
+      logger.info("notifyAgentWithStatus", null, " >>>>>>> UIMA AS Service Deployed - PID:" + pid);
+
+      if (endpoint != null) {
+        state.setSocketEndpoint(endpoint);
+      }
+      // The dispatch target is taken from the IP environment variable
+      String agentIp = System.getenv("IP");
+      // send the process update to the remote
+      dispatcher.dispatch(duccEvent, agentIp);
+      String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
+      logger.info("notifyAgentWithStatus", null, "... UIMA AS Service Deployed - PID:" + pid
+          + ". Service State: " + state + ". JMX Url:" + jmx
+          + " Dispatched State Update Event to Agent with IP:" + agentIp);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Sends the state of the pipeline components to the Agent, but only while
+   * the last reported process state is {@code Initializing}.
+   *
+   * @param pipeline components to include in the update
+   */
+  public void notify(List<IUimaPipelineAEComponent> pipeline) {
+    synchronized (stateLock) {
+      // Only send update if the AE is initializing. Comparing from the constant
+      // side keeps this safe when no state has been reported yet (state == null).
+      if (ProcessState.Initializing.equals(state)) {
+        try {
+          ProcessStateUpdate processUpdate =
+              new ProcessStateUpdate(state, pid, duccProcessId, null, pipeline);
+          notify(processUpdate);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops the underlying dispatcher.
+   *
+   * @throws Exception whatever {@code dispatcher.stop()} throws
+   */
+  public void stop() throws Exception {
+    dispatcher.stop();
+  }
+}
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.UimaPipelineAEComponent;
+
+
+/**
+ * Runnable that queries the platform MBean server for UIMA AS analysis engine
+ * MBeans and reports the initialization progress of the pipeline components
+ * to the Agent through {@link AgentSession#notify(List)}.
+ */
+public class JmxAEProcessInitMonitor implements Runnable {
+  /** Class name reported by JMX for analysis engine management beans. */
+  private static final String AE_MANAGEMENT_IMPL =
+      "org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl";
+  /** Suffix stripped from an ObjectName part before it is added to the component name. */
+  private static final String COMPONENTS_SUFFIX = "Components";
+
+  MBeanServer server = null;
+  AgentSession agent;
+  // Counts consecutive UndeclaredThrowableException failures; reset to 1 after a clean pass
+  static int howManySeenSoFar = 1;
+  // Components that have been seen initializing and have not yet been reported as Ready
+  public List<IUimaPipelineAEComponent> aeStateList = new ArrayList<IUimaPipelineAEComponent>();
+
+  /**
+   * @param agent session used to publish component state
+   * @throws Exception declared by the original signature; kept for callers
+   */
+  public JmxAEProcessInitMonitor(AgentSession agent) throws Exception {
+    server = ManagementFactory.getPlatformMBeanServer();
+    this.agent = agent;
+  }
+
+  /** Returns the tracked component with the given name, or null when it is not tracked. */
+  private IUimaPipelineAEComponent getUimaAeByName(String name) {
+    for (IUimaPipelineAEComponent aeState : aeStateList) {
+      if (aeState.getAeName().equals(name)) {
+        return aeState;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Builds the component name from the comma separated parts of a JMX
+   * ObjectName: parts starting with "org.apache.uima:type" or "s=" are
+   * skipped, a trailing "Components" is removed, and the text after the first
+   * '=' of each remaining part is appended behind a '/'.
+   */
+  private static String composeComponentName(String[] objectNameParts) {
+    StringBuilder name = new StringBuilder();
+    for (String part : objectNameParts) {
+      if (part.startsWith("org.apache.uima:type") || part.startsWith("s=")) {
+        continue; // skip service name part of the name
+      }
+      name.append("/");
+      if (part.endsWith(COMPONENTS_SUFFIX)) {
+        // Cut the suffix at the end of the string, not at its first occurrence
+        part = part.substring(0, part.length() - COMPONENTS_SUFFIX.length()).trim();
+      }
+      name.append(part.substring(part.indexOf("=") + 1));
+    }
+    return name.toString();
+  }
+
+  /** Waits up to 5 ms on this monitor before the AE's initialization time is read. */
+  private synchronized void pauseBriefly() {
+    try {
+      wait(5);
+    } catch (InterruptedException ignored) {
+      // Deliberately not re-interrupting: the caller still has to publish the
+      // final Ready update of this pass.
+    }
+  }
+
+  /**
+   * Performs one polling pass. New components that are not yet Ready are
+   * added to {@link #aeStateList}; components that moved from Initializing to
+   * Ready are published once more and then removed from the list.
+   */
+  public void run() {
+    try {
+      // create an ObjectName with UIMA As JMS naming convention to enable
+      // finding deployed uima components.
+      ObjectName uimaServicePattern = new ObjectName("org.apache.uima:type=ee.jms.services,*");
+      // Fetch UIMA AS MBean names from JMX Server that match above name pattern
+      Set<ObjectInstance> mbeans =
+          new HashSet<ObjectInstance>(server.queryMBeans(uimaServicePattern, null));
+      List<IUimaPipelineAEComponent> componentsToDelete =
+          new ArrayList<IUimaPipelineAEComponent>();
+      boolean updateAgent = false;
+
+      for (ObjectInstance instance : mbeans) {
+        ObjectName objectName = instance.getObjectName();
+        // getKeyProperty returns null when the ObjectName has no "name" key
+        String targetName = objectName.getKeyProperty("name");
+        if (targetName != null && targetName.endsWith("FlowController")) {
+          continue; // skip FC
+        }
+        // Only interested in AEs
+        if (!AE_MANAGEMENT_IMPL.equals(instance.getClassName())) {
+          continue;
+        }
+        String[] aeObjectNameParts = objectName.toString().split(",");
+        if (aeObjectNameParts.length == 3) {
+          // this is uima aggregate MBean. Skip it. We only care about this
+          // aggregate's pipeline components.
+          continue;
+        }
+        String componentName = composeComponentName(aeObjectNameParts);
+        // Fetch a proxy to the AE Management object which holds AE stats
+        AnalysisEngineManagement proxy =
+            JMX.newMBeanProxy(server, objectName, AnalysisEngineManagement.class);
+        // Read the state once so every decision in this pass sees the same value
+        AnalysisEngineManagement.State proxyState =
+            AnalysisEngineManagement.State.valueOf(proxy.getState());
+
+        IUimaPipelineAEComponent aeState = getUimaAeByName(componentName);
+        if (aeState == null) {
+          // Not interested in AEs that are in a Ready State
+          if (proxyState.equals(AnalysisEngineManagement.State.Ready)) {
+            continue;
+          }
+          UimaPipelineAEComponent created =
+              new UimaPipelineAEComponent(componentName, proxy.getThreadId(), proxyState);
+          aeStateList.add(created);
+          created.startInitialization = System.currentTimeMillis();
+          created.setAeState(AnalysisEngineManagement.State.Initializing);
+          aeState = created;
+          updateAgent = true;
+        } else if (proxyState.equals(AnalysisEngineManagement.State.Initializing)) {
+          // continue publishing AE state while the AE is initializing
+          updateAgent = true;
+          aeState.setInitializationTime(System.currentTimeMillis()
+              - ((UimaPipelineAEComponent) aeState).startInitialization);
+        } else if (aeState.getAeState().equals(AnalysisEngineManagement.State.Initializing)
+            && proxyState.equals(AnalysisEngineManagement.State.Ready)) {
+          // publish state if the AE just finished initializing and is now in Ready state
+          aeState.setAeState(AnalysisEngineManagement.State.Ready);
+          updateAgent = true;
+          pauseBriefly();
+          aeState.setInitializationTime(proxy.getInitializationTime());
+          // AE reached ready state we no longer need to publish its state
+          componentsToDelete.add(aeState);
+        }
+        DuccService.getDuccLogger(this.getClass().getName()).debug(
+            "UimaAEJmxMonitor.run()",
+            null,
+            "---- AE Name:" + proxy.getName()
+                + " AE State:" + proxy.getState()
+                + " AE init time=" + aeState.getInitializationTime()
+                + " Proxy Init time=" + proxy.getInitializationTime()
+                + " Proxy Thread ID:" + proxy.getThreadId());
+      }
+
+      howManySeenSoFar = 1; // reset error counter
+      if (updateAgent) {
+        DuccService.getDuccLogger(this.getClass().getName()).debug(
+            "UimaAEJmxMonitor.run()",
+            null,
+            "---- Publishing UimaPipelineAEComponent List - size=" + aeStateList.size());
+        try {
+          agent.notify(aeStateList);
+        } finally {
+          // remove components that reached Ready state
+          for (IUimaPipelineAEComponent aeState : componentsToDelete) {
+            aeStateList.remove(aeState);
+          }
+        }
+      }
+
+    } catch (UndeclaredThrowableException e) {
+      if (!(e.getCause() instanceof InstanceNotFoundException)) {
+        if (howManySeenSoFar > 3) { // allow up to three errors of this kind
+          DuccService.getDuccLogger(this.getClass().getName()).info(
+              "UimaAEJmxMonitor.run()", null, e);
+          howManySeenSoFar = 1;
+          throw e;
+        }
+        howManySeenSoFar++;
+      } else {
+        // AE not fully initialized yet, ignore the exception
+      }
+    } catch (Throwable e) {
+      howManySeenSoFar = 1;
+      DuccService.getDuccLogger(this.getClass().getName()).info(
+          "UimaAEJmxMonitor.run()", null, e);
+    }
+  }
+}
\ No newline at end of file
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+public class JobProcessComponent extends AbstractDuccComponent{
+
+
+ private JobProcessConfiguration configuration=null;
+ private String jmxConnectString="";
+ private AgentSession agent = null;
+ private JobProcessManager jobProcessManager = null;
+ protected ProcessState currentState = ProcessState.Undefined;
+ protected ProcessState previousState = ProcessState.Undefined;
+
+ public JobProcessComponent(String componentName, CamelContext
ctx,JobProcessConfiguration jpc) {
+ super(componentName,ctx);
+ this.configuration = jpc;
+ jmxConnectString = super.getProcessJmxUrl();
+ }
+
+ protected void setAgentSession(AgentSession session ) {
+ agent = session;
+ }
+ protected void setJobProcessManager(JobProcessManager
jobProcessManager) {
+ this.jobProcessManager = jobProcessManager;
+ }
+ public String getProcessJmxUrl() {
+ return jmxConnectString;
+ }
+
+ public DuccLogger getLogger() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ public void start(DuccService service, String[] args) throws Exception {
+ super.start(service, args);
+ //this.configuration.start(args);
+ try {
+ String jps =
System.getProperty("org.apache.uima.ducc.userjarpath");
+ if (null == jps) {
+ System.err
+ .println("Missing the
-Dorg.apache.uima.jarpath=XXXX property");
+ System.exit(1);
+ }
+ String processJmxUrl = super.getProcessJmxUrl();
+ agent.notify(ProcessState.Initializing, processJmxUrl);
+ IUimaProcessor uimaProcessor = null;
+ ScheduledThreadPoolExecutor executor = null;
+
+ try {
+ executor = new ScheduledThreadPoolExecutor(1);
+ executor.prestartAllCoreThreads();
+ // Instantiate a UIMA AS jmx monitor to poll
for status of the AE.
+ // This monitor checks if the AE is
initializing or ready.
+ JmxAEProcessInitMonitor monitor = new
JmxAEProcessInitMonitor(agent);
+ /*
+ * This will execute the UimaAEJmxMonitor
continuously for every 15
+ * seconds with an initial delay of 20 seconds.
This monitor polls
+ * initialization status of AE deployed in UIMA
AS.
+ */
+ executor.scheduleAtFixedRate(monitor, 20, 30,
TimeUnit.SECONDS);
+
+ // Deploy UIMA pipelines. This blocks until the
pipelines initializes or
+ // there is an exception. The IUimaProcessor is a
wrapper around
+ // processing container where the analysis is being
done.
+ uimaProcessor =
+ jobProcessManager.deploy(jps, args,
"org.apache.uima.ducc.user.jp.UserProcessContainer");
+
+ // pipelines deployed and initialized. This is process
is Ready
+ // for processing
+ currentState = ProcessState.Running;
+ // Update agent with the most up-to-date state
of the pipeline
+ // monitor.run();
+ // all is well, so notify agent that this
process is in Running state
+ agent.notify(currentState, processJmxUrl);
+ // Create thread pool and begin processing
+
+
+
+ } catch( Exception ee) {
+ currentState = ProcessState.FailedInitialization;
+ System.out
+ .println(">>> Failed to Deploy
UIMA Service. Check UIMA Log for Details");
+ agent.notify(ProcessState.FailedInitialization);
+ } finally {
+ // Stop executor. It was only needed to poll AE
initialization status.
+ // Since deploy() completed
+ // the UIMA AS service either succeeded
initializing or it failed. In
+ // either case we no longer
+ // need to poll for initialization status
+ if ( executor != null ) {
+ executor.shutdownNow();
+ }
+
+ }
+
+
+
+ } catch( Exception e) {
+ currentState = ProcessState.FailedInitialization;
+ agent.notify(currentState);
+
+
+ }
+
+ }
+ public void stop() {
+ if ( super.isStopping() ) {
+ return; // already stopping - nothing to do
+ }
+ //configuration.stop();
+ System.out.println("... AbstractManagedService - Stopping
Service Adapter");
+// serviceAdapter.stop();
+ System.out.println("... AbstractManagedService - Calling
super.stop() ");
+ try {
+ if (getContext() != null) {
+ for (Route route : getContext().getRoutes()) {
+
+ route.getConsumer().stop();
+ System.out.println(">>> configFactory.stop() -
stopped route:"
+ + route.getId());
+ }
+ }
+
+ agent.stop();
+ super.stop();
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+
+//import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.camel.Body;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+/**
+ * Event listener of the Job Process. A {@link ProcessStopDuccEvent} is
+ * translated into a call to {@link JobProcessComponent#stop()}.
+ */
+public class JobProcessEventListener implements DuccEventDelegateListener {
+
+  private JobProcessComponent duccComponent;
+
+  /**
+   * @param component Job Process component to stop when a stop event arrives
+   */
+  public JobProcessEventListener(JobProcessComponent component) {
+    this.duccComponent = component;
+  }
+
+  /**
+   * Accepts a dispatcher but does not keep it.
+   *
+   * @param eventDispatcher ignored
+   */
+  public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+    // nothing to store
+  }
+
+  /**
+   * Stops the associated component.
+   *
+   * @param event message body of the stop request; its content is not inspected
+   */
+  public void onProcessStop(@Body ProcessStopDuccEvent event) {
+    this.duccComponent.stop();
+  }
+
+}
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java?rev=1637458&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
Fri Nov 7 21:43:16 2014
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp.iface;
+
+import java.util.List;
+
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Contract for reporting process and pipeline state to an Agent.
+ */
+public interface IAgentSession {
+
+  /**
+   * Reports a process state.
+   *
+   * @param state state to report
+   */
+  void notify(ProcessState state);
+
+  /**
+   * Reports a process state together with a message.
+   *
+   * @param state state to report
+   * @param message text accompanying the state
+   */
+  void notify(ProcessState state, String message);
+
+  /**
+   * Reports the state of the pipeline components.
+   *
+   * @param pipeline components to report
+   */
+  void notify(List<IUimaPipelineAEComponent> pipeline);
+
+  /**
+   * Stops the session.
+   *
+   * @throws Exception if stopping fails
+   */
+  void stop() throws Exception;
+}