Author: schor
Date: Thu Dec 20 14:40:58 2018
New Revision: 1849399
URL: http://svn.apache.org/viewvc?rev=1849399&view=rev
Log:
[UIMA-5937] update pom version and scm, merge v2 updates, record merge
Added:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
- copied unchanged from r1846917,
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotatorWithDOCTYPE.xml
- copied unchanged from r1846917,
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotatorWithDOCTYPE.xml
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregateWithJmsService.xml
- copied unchanged from r1846917,
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregateWithJmsService.xml
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWithTargetingSupport.xml
- copied unchanged from r1846917,
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWithTargetingSupport.xml
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWithTargetingSupport.xml
- copied unchanged from r1846917,
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWithTargetingSupport.xml
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/ (props changed)
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml
Propchange: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 20 14:40:58 2018
@@ -1,2 +1,3 @@
/uima/uima-as/branches/depend-on-parent-pom-4/uimaj-as-activemq:961335-961760
/uima/uima-as/branches/mavenAlign/uimaj-as-activemq:941450-944450
+/uima/uima-as/trunk/uimaj-as-activemq:1786000-1846917
Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml (original)
+++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml Thu Dec 20 14:40:58 2018
@@ -15,7 +15,7 @@
<parent>
<groupId>org.apache.uima</groupId>
<artifactId>uima-as-parent</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>3.0.1-SNAPSHOT</version>
<relativePath>../uima-as-parent/pom.xml</relativePath>
</parent>
@@ -31,13 +31,13 @@
cutting/pasting the <scm> element, and just changing the
following two properties -->
<scm>
<connection>
-
scm:svn:http://svn.apache.org/repos/asf/uima/uima-as/trunk/uimaj-as-activemq
+
scm:svn:http://svn.apache.org/repos/asf/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq
</connection>
<developerConnection>
-
scm:svn:https://svn.apache.org/repos/asf/uima/uima-as/trunk/uimaj-as-activemq
+
scm:svn:https://svn.apache.org/repos/asf/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq
</developerConnection>
<url>
- http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq
+ http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq
</url>
</scm>
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
Thu Dec 20 14:40:58 2018
@@ -30,12 +30,10 @@ import javax.jms.Message;
import javax.jms.Session;
import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.UimaBlockingExecutor;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
-import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Thu Dec 20 14:40:58 2018
@@ -227,7 +227,7 @@ public class JmsEndpointConnection_impl
}
if ( brokerDestinations.getConnection() != null )
{
try {
- // Close the connection to avoid leaks in
the broker
+ // Close the connection to avoid leaks
in the broker
brokerDestinations.getConnection().close();
} catch( Exception e) {
// Ignore exceptions on a close of a bad
connection
@@ -249,7 +249,9 @@ public class JmsEndpointConnection_impl
ActiveMQConnectionFactory factory
= new ActiveMQConnectionFactory(brokerUri);
// White list packages for
deserialization
factory.setTrustAllPackages(true);
+
factory.setConnectionIDPrefix("JmsOutputChannel");
factory.setWatchTopicAdvisories(false);
+
// Create shared jms connection
to a broker
conn = factory.createConnection();
factory.setDispatchAsync(true);
@@ -755,7 +757,7 @@ public class JmsEndpointConnection_impl
if ( command == AsynchAEMessage.ServiceInfo ) {
return false;
}
- if ( (msgType == AsynchAEMessage.Response || msgType ==
AsynchAEMessage.Request ) &&
+ if ( (msgType == AsynchAEMessage.Response || msgType ==
AsynchAEMessage.Request ) &&
command == AsynchAEMessage.Process ) {
String casReferenceId="";
try {
@@ -764,7 +766,7 @@ public class JmsEndpointConnection_impl
String key = "";
String endpointName = "";
if ( delegateEndpoint != null ) {
- delegateEndpoint.getDelegateKey();
+ key = delegateEndpoint.getDelegateKey();
endpointName = ((ActiveMQDestination)
delegateEndpoint.getDestination())
.getPhysicalName();
}
@@ -781,20 +783,22 @@ public class JmsEndpointConnection_impl
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-
+
String key = "";
String endpointName = "";
if ( delegateEndpoint != null ) {
- delegateEndpoint.getDelegateKey();
+ key = delegateEndpoint.getDelegateKey();
endpointName = ((ActiveMQDestination)
delegateEndpoint.getDestination())
.getPhysicalName();
- }
+
+ }
if ( "Client".equals(target) ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
CLASS_NAME.getName(),
"send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_delivery_to_client_exception__WARNING",
new Object[] { controller.getComponentName(),endpointName });
} else {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
CLASS_NAME.getName(),
"send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_delivery_exception__WARNING",new Object[] {
controller.getComponentName(), key, endpointName});
@@ -1021,8 +1025,8 @@ public class JmsEndpointConnection_impl
}
} else {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
CLASS_NAME.getName(),
- "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_delivery_exception__WARNING",new
Object[] { controller.getComponentName(), "", endpointName});
+ "UimaAsAsyncCallbackListener.onException()",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_delivery_exception_WARNING",new
Object[] { controller.getComponentName(), exception} );
}
}
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
Thu Dec 20 14:40:58 2018
@@ -19,6 +19,7 @@
package org.apache.uima.adapter.jms.activemq;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -34,12 +35,12 @@ import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
@@ -54,12 +55,12 @@ import org.apache.uima.aae.jmx.RemoteJMX
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageWrapper;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.JmsMessageContext;
import org.apache.uima.util.Level;
import org.springframework.jms.listener.SessionAwareMessageListener;
-import org.springframework.jms.support.destination.DestinationResolver;
/**
* Thin adapter for receiving JMS messages from Spring. It delegates
processing of all messages to
@@ -558,6 +559,19 @@ public class JmsInputChannel implements
validEndpoint(messageContext) &&
isReplyRequired((Message)messageContext.getRawMessage()) );
}
+ public void onMessage(MessageWrapper wrapper) {
+ try {
+ onMessage((Message)wrapper.getMessage(),
(Session)wrapper.getSession());
+ } finally {
+ // semaphore is only added by target and process listeners.
The semaphore is used
+ // to throttle work into the service. A JMS thread blocks if
semaphore permits are
+ // exhausted. The blocking is done in PriorityMessageHandler
class. The JmsInputChannel
+ // and PriorityMessageHandler are the only classes operating
on the semaphore.
+ if ( wrapper.getSemaphore() != null ) {
+ wrapper.getSemaphore().release();
+ }
+ }
+ }
/**
* Receives Messages from the JMS Provider. It checks the message header to
determine the type of
* message received. Based on the type, a MessageContext is created to
facilitate access to the
@@ -810,10 +824,11 @@ public class JmsInputChannel implements
}
return ll;
}
- public synchronized void
setListenerContainer(UimaDefaultMessageListenerContainer messageListener) {
- this.messageListener = messageListener;
+ public synchronized void
setListenerContainer(UimaDefaultMessageListenerContainer jmsL) {
+ this.messageListener = jmsL;
System.setProperty("BrokerURI", messageListener.getBrokerUrl());
- if ( messageListener.getMessageSelector() !=null &&
messageListener.getMessageSelector().equals("Command=2001") ) {
+
+ if ( jmsL.isGetMetaListener() ) {
brokerURL = messageListener.getBrokerUrl();
getController().getOutputChannel().setServerURI(brokerURL);
}
@@ -825,8 +840,9 @@ public class JmsInputChannel implements
getController().addInputChannel(this);
messageListener.setController(getController());
} catch (Exception e) {
+ e.printStackTrace();
}
- }
+ }
}
public ActiveMQConnectionFactory getConnectionFactory() {
@@ -1099,6 +1115,66 @@ public class JmsInputChannel implements
new Object[] {
getController().getComponentName(), connector.getEndpointName() });
}
}
+ public void createListenerForTargetedMessages() throws Exception {
+ List<UimaDefaultMessageListenerContainer> listeners =
+ getListeners();
+ // the TargetServiceId property value will become part of a jms
selector.
+ String targetStringSelector = "";
+ if (
System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) {
+ targetStringSelector =
System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty);
+ } else {
+ // the default selector is IP:PID
+ String ip = InetAddress.getLocalHost().getHostAddress();
+ targetStringSelector = ip+":"+controller.getPID();
+ }
+ // find a listener instance which handles Process requests. The
targeted
+ // listener created here will share a Connection Factory and
ThreadFactory.
+ //
+ for(UimaDefaultMessageListenerContainer listener : listeners ) {
+ // Is this a Process listener instance? Check the selector
+ if (
listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX)
) {
+ // this will be a dedicated listener which handles targeted
messages
+ UimaDefaultMessageListenerContainer targetedListener
= new UimaDefaultMessageListenerContainer();
+ // setup jms selector
+ if ( getController().isCasMultiplier()) {
+
targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+"
= '"+targetStringSelector+"'
AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000
OR Command=2002)");
+ } else {
+
targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+"
= '"+targetStringSelector+"'
AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000
OR Command=2002)");
+ }
+
+ // use shared ConnectionFactory
+
targetedListener.setConnectionFactory(listener.getConnectionFactory());
+ // mark the listener as a 'Targeted' listener
+ targetedListener.setTargetedListener();
+ targetedListener.setController(getController());
+ // there will only be one AMQ delivery thread. Its job will
be to
+ // add a targeted message to a BlockingQueue. Such thread
will block
+ // in an enqueue if a dequeue is not available. This will
prevent
+ // overwhelming the service with messages.
+ targetedListener.setConcurrentConsumers(1);
+ if ( listener.getMessageListener() instanceof
PriorityMessageHandler ) {
+ // the targeted listener will use the same
message handler as the
+ // Process listener. This handler will add a
message wrapper
+ // to enable prioritizing messages.
+
targetedListener.setMessageListener(listener.getMessageListener());
+ }
+ // Same queue as the Process queue
+
targetedListener.setDestination(listener.getDestination());
+ registerListener(targetedListener);
+ targetedListener.afterPropertiesSet();
+ targetedListener.initialize();
+ targetedListener.start();
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
CLASS_NAME.getName(),
+ "createListenerForTargetedMessages",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_TARGET_LISTENER__INFO",
+ new Object[]
{targetedListener.getMessageSelector(), controller.getComponentName() });
+ }
+ break;
+
+ }
+ }
+ }
public void createListener(String aDelegateKey, Endpoint endpointToUpdate)
throws Exception {
if (getController() instanceof AggregateAnalysisEngineController) {
Delegate delegate = ((AggregateAnalysisEngineController) getController())
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Thu Dec 20 14:40:58 2018
@@ -25,16 +25,16 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -74,8 +74,8 @@ import org.apache.uima.aae.jmx.ServicePe
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.aae.monitor.Monitor;
-import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
@@ -83,14 +83,6 @@ import org.apache.uima.cas.impl.XmiSeria
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import org.apache.uima.resource.ResourceProcessException;
-
-
import com.thoughtworks.xstream.XStream;
public class JmsOutputChannel implements OutputChannel {
@@ -140,6 +132,8 @@ public class JmsOutputChannel implements
private Semaphore connectionSemaphore = new Semaphore(1);
public JmsOutputChannel() {
+ UimaSerializer.initXStream(xstream);
+
try {
if( System.getenv("IP") != null ) {
hostIP = System.getenv("IP");
@@ -364,16 +358,17 @@ public class JmsOutputChannel implements
private void invalidateConnectionAndEndpoints(BrokerConnectionEntry
brokerConnectionEntry ) {
Connection conn = brokerConnectionEntry.getConnection();
try {
- if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
+ if ( conn != null)
+ if ( conn != null && !((ActiveMQConnection)conn).isClosed()) {
for (Entry<Object, JmsEndpointConnection_impl> endpoints :
brokerConnectionEntry.endpointMap
.entrySet()) {
endpoints.getValue().close(); // close session and producer
}
- brokerConnectionEntry.getConnection().stop();
brokerConnectionEntry.getConnection().close();
brokerConnectionEntry.setConnection(null);
}
} catch (Exception e) {
+ e.printStackTrace();
// Ignore this for now. Attempting to close connection that has been
closed
// Ignore we are shutting down
} finally {
@@ -648,7 +643,6 @@ public class JmsOutputChannel implements
connectionSemaphore.release();
}
- //System.out.println("+++++++++++++++++++++ ConnectionMap
Size:"+connectionMap.size());
return endpointConnection;
}
@@ -850,10 +844,21 @@ public class JmsOutputChannel implements
// If this service is a Cas Multiplier add to the message a
FreeCasQueue.
// The client may need send Stop request to that queue.
if (aCommand == AsynchAEMessage.ServiceInfo
- && getAnalysisEngineController().isCasMultiplier() &&
freeCASTempQueue != null) {
- // Attach a temp queue to the outgoing message. This a queue
where
- // Free CAS notifications need to be sent from the client
- tm.setJMSReplyTo(freeCASTempQueue);
+ && getAnalysisEngineController().isCasMultiplier() ) {
+ if ( freeCASTempQueue != null ) {
+ // Attach a temp queue to the outgoing message.
This is a queue where
+ // Free CAS notifications need to be sent from the
client
+ tm.setJMSReplyTo(freeCASTempQueue);
+ }
+ // new services will receive FreeCas request via a targeted
queue
+ StringBuffer selector = new StringBuffer().
+ append("TargetServiceId = ").
+ append("'").append(hostIP).append(":").
+
append(getAnalysisEngineController().getPID()).
+ append("' AND").
+
append(UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);
+
tm.setStringProperty(AsynchAEMessage.TargetingSelector,selector.toString());
+
}
// Check if there was a failure while sending a message
if ( !endpointConnection.send(tm, 0, false,
notifyOnJmsException) && notifyOnJmsException ) {
@@ -1387,7 +1392,7 @@ public class JmsOutputChannel implements
*/
private void populateHeaderWithRequestContext(Message aMessage, Endpoint
anEndpoint, int aCommand)
throws Exception {
- aMessage.setIntProperty(AsynchAEMessage.MessageType,
AsynchAEMessage.Request);
+ aMessage.setIntProperty(AsynchAEMessage.MessageType,
AsynchAEMessage.Request);
aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
// TODO override default based on system property
aMessage.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Thu Dec 20 14:40:58 2018
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -38,6 +39,9 @@ import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.apache.activemq.ActiveMQConnection;
@@ -46,8 +50,11 @@ import org.apache.activemq.command.Activ
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory;
//import org.apache.uima.aae.UimaASCredentials;
import org.apache.uima.aae.UimaAsThreadFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+//import org.apache.uima.aae.UimaAsThreadFactory.UsedFor;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
@@ -57,21 +64,30 @@ import org.apache.uima.aae.delegate.Dele
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.Threshold;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageWrapper;
import org.apache.uima.adapter.jms.JmsConstants;
+import
org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.JmsException;
import org.springframework.jms.listener.AbstractJmsListeningContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class UimaDefaultMessageListenerContainer extends
DefaultMessageListenerContainer implements
ExceptionListener {
- private static final Class CLASS_NAME =
UimaDefaultMessageListenerContainer.class;
-
+ private static final Class<?> CLASS_NAME =
UimaDefaultMessageListenerContainer.class;
+ public static final String PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR
Command=2002)";
+ public static final String CM_PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR
Command=2002 OR Command=2005)";
+ public static final String GETMETA_SELECTOR_SUFFIX = "(Command=2001)";
+
+ public static final int HIGH_PRIORITY = 9;
+
private String destinationName = "";
private Endpoint endpoint;
@@ -125,6 +141,10 @@ public class UimaDefaultMessageListenerC
private volatile boolean logListenerFailure=true;
private static CountDownLatch recoveryLatch = new CountDownLatch(4);
+ // indicates if this listener is dedicated to pull targeted messages which
are
+ // messages with a selector.
+ private boolean targetedListener = false;
+
public UimaDefaultMessageListenerContainer() {
super();
// reset global static. This only effects unit testing as services are
deployed
@@ -143,12 +163,28 @@ public class UimaDefaultMessageListenerC
this();
this.freeCasQueueListener = freeCasQueueListener;
}
+
+ public void setTargetedListener() {
+ targetedListener = true;
+ }
+ private static boolean connectionClosedOrFailed(ActiveMQConnection
connection) {
+ if (connection == null
+ || connection.isClosed()
+ || connection.isClosing()
+ || connection.isTransportFailed()) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Overriden Spring's method that tries to recover from lost connection. We
dont
* want to recover when the service is stopping.
*/
protected void refreshConnectionUntilSuccessful() {
- boolean doLogFailureMsg = true;
+ // System.out.println("............refreshConnectionUntilSuccessful()
called");
+
+ boolean doLogFailureMsg = true;
try {
// Only one listener thread should enter to recover lost connection.
// Seems like spring recovery api is not reentrant. If multiple
listeners
@@ -156,6 +192,16 @@ public class UimaDefaultMessageListenerC
// on observing jconsole attached to uima-as service with multiple
listeners
// on an endpoint.
synchronized(UimaDefaultMessageListenerContainer.class ) {
+ ActiveMQConnection c =
null;//(ActiveMQConnection)super.getSharedConnection();
+ try {
+ c = (ActiveMQConnection)super.getSharedConnection();
+ } catch( SharedConnectionNotInitializedException ee) {
+ // the connectionClosedOrFailed(c) below will test for
null
+ }
+ if ( !connectionClosedOrFailed(c) ) {
+ //System.out.println(".............
Thread:"+Thread.currentThread().getId()+" Connection restored - returning");
+ return;
+ }
while (isRunning() && !terminating ) {
Connection tcon = null;
try {
@@ -438,6 +484,7 @@ public class UimaDefaultMessageListenerC
* @param t
*/
private void handleQueueFailure(Throwable t) {
+ // System.out.println("............handleQueueFailure() called");
final String endpointName = (getDestination() == null) ? ""
: ((ActiveMQDestination) getDestination()).getPhysicalName();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -556,6 +603,10 @@ public class UimaDefaultMessageListenerC
* This method is called by Spring when a listener fails
*/
protected void handleListenerSetupFailure(Throwable t, boolean
alreadyHandled) {
+ if ( t.getCause() instanceof InterruptedException ) {
+// System.out.println("............handleListenerFailure(Throwable t,
boolean alreadyHandled) called - Cause:"+t);
+ return;
+ }
// If shutdown already, nothing to do
// If controller is stopping no need to recover the connection
if (awaitingShutdown || terminating || (controller != null &&
controller.isStopped()) ) {
@@ -646,10 +697,13 @@ public class UimaDefaultMessageListenerC
}
protected void handleListenerException(Throwable t) {
+ // System.out.println("............handleListenerException(Throwable
t)");
+
// Already shutdown, nothing to do
if (awaitingShutdown) {
return;
}
+ /*
String endpointName = (getDestination() == null) ? ""
: ((ActiveMQDestination) getDestination()).getPhysicalName();
@@ -659,6 +713,7 @@ public class UimaDefaultMessageListenerC
"UIMAJMS_jms_listener_failed_WARNING",
new Object[] { endpointName, getBrokerUrl(), t });
}
+ */
super.handleListenerException(t);
}
@@ -676,15 +731,14 @@ public class UimaDefaultMessageListenerC
super.setConnectionFactory(connectionFactory);
}
- private void injectTaskExecutor() {
- super.setTaskExecutor(taskExecutor);
- }
- private boolean isGetMetaListener() {
+ public boolean isGetMetaListener() {
+
return getMessageSelector() != null
- && __listenerRef.getMessageSelector().equals("Command=2001");
+ &&
__listenerRef.getMessageSelector().endsWith(GETMETA_SELECTOR_SUFFIX);
+// && __listenerRef.getMessageSelector().endsWith("(Command=2001)");
}
-
+
private boolean isActiveMQDestination() {
return getDestination() != null && getDestination() instanceof
ActiveMQDestination;
}
@@ -728,8 +782,10 @@ public class UimaDefaultMessageListenerC
**/
public void setMessageListener(Object messageListener) {
ml = messageListener;
- if (this.freeCasQueueListener) {
+ if (this.freeCasQueueListener || targetedListener ) {
super.setMessageListener(messageListener);
+ } else if ( endpoint != null && endpoint.isTempReplyDestination()) {
+ super.setMessageListener(messageListener);
}
}
public void afterPropertiesSet() {
@@ -783,13 +839,18 @@ public class UimaDefaultMessageListenerC
pluginThreadPool = true;
}
} else {
- super.setConcurrentConsumers(cc);
+ super.setConcurrentConsumers(1);
+ //if ( targetedListener ) {
+ //super.setConcurrentConsumers(1);
+ //} else {
+ // super.setConcurrentConsumers(cc);
+ //}
pluginThreadPool = true;
}
Thread t = new Thread(threadGroup, new Runnable() {
public void run() {
Destination destination = __listenerRef.getDestination();
- try {
+ try {
// Wait until the connection factory is injected by Spring
while (connectionFactory == null) {
try {
@@ -816,7 +877,7 @@ public class UimaDefaultMessageListenerC
}
// Plug in connection Factory to Spring's Listener
__listenerRef.injectConnectionFactory();
-
+
if ( pluginThreadPool ) {
setUimaASThreadPoolExecutor(cc);
}
@@ -824,13 +885,13 @@ public class UimaDefaultMessageListenerC
// Initialize the TaskExecutor. This call injects a custom Thread
Pool into the
// TaskExecutor provided in the spring xml. The custom thread pool
initializes
// an instance of AE in a dedicated thread
- if ( getMessageSelector() != null && !isGetMetaListener()) {
+ if ( !isGetMetaListener()) {
initializeTaskExecutor(cc);
}
- if ( threadPoolExecutor == null ) {
- // Plug in TaskExecutor to Spring's Listener
- __listenerRef.injectTaskExecutor();
- }
+// if ( threadPoolExecutor == null ) {
+// // Plug in TaskExecutor to Spring's Listener
+// __listenerRef.injectTaskExecutor();
+// }
if ( propagate ) {
// Notify Spring Listener that all properties are ready
__listenerRef.allPropertiesSet();
@@ -863,7 +924,6 @@ public class UimaDefaultMessageListenerC
"UIMAJMS_listener_ready__INFO",
new Object[] {controller.getComponentName(),
getBrokerUrl(), getDestination() });
}
-
} catch (Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
this.getClass().getName(),
@@ -892,7 +952,7 @@ public class UimaDefaultMessageListenerC
// by Spring on a different thread
while ((((JmsInputChannel) pojoListener).getController()) == null) {
try {
- Thread.currentThread().sleep(50);
+ Thread.currentThread().wait(50);
} catch (Exception e) {
}
}
@@ -956,7 +1016,7 @@ public class UimaDefaultMessageListenerC
public void closeConnection() throws Exception {
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- String msg = ".................... ection() Called";
+ String msg = ".................... closeConnection() Called";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
CLASS_NAME.getName(), "closeConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
@@ -1041,6 +1101,7 @@ public class UimaDefaultMessageListenerC
if (awaitingShutdown) {
return;
}
+
String endpointName = (getDestination() == null) ? ""
: ((ActiveMQDestination) getDestination()).getPhysicalName();
@@ -1081,13 +1142,49 @@ public class UimaDefaultMessageListenerC
public void doDestroy() {
super.destroy();
}
+ /**
+ * Called by Spring
+ */
public void setMessageSelector( String messageSelector) {
+ if (
!messageSelector.startsWith(UimaAsynchronousEngine.TargetSelectorProperty)) {
+ messageSelector =
UimaAsynchronousEngine.TargetSelectorProperty+" is NULL
AND("+messageSelector+")";
+ }
+ //this.doInvokeListene
super.setMessageSelector(messageSelector);
// turn off auto startup. Selectors are only used on input queues. We dont
// want listeners on this queue to start now. Once the service
initializes
// we will start listeners on input queue.
this.setAutoStartup(false);
}
+ private boolean isProcessListener() {
+ return getMessageSelector().endsWith(PROCESS_SELECTOR_SUFFIX);
+ }
+ /**
+ * Callback called by Spring when it receives a message. Its purpose is to
assign high
+ * priority to targeted messages.
+ */
+ @SuppressWarnings("unchecked")
+ protected void doInvokeListener(@SuppressWarnings("rawtypes")
SessionAwareMessageListener l, Session s, Message m) {
+ try {
+ if ( targetedListener ) {
+ m.setJMSPriority(HIGH_PRIORITY);
+ m.setJMSType("TargetMessage");
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
CLASS_NAME.getName(), "doInvokeListener",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_MSG_INTERCEPTOR__FINEST",
+ new Object[] {controller.getComponentName(),
m.getJMSPriority(), getMessageSelector() });
+
+ }
+ }
+ //System.out.println("................ doInvokeListener() -
Thread ID:"+Thread.currentThread().getId()+" Listener
Class:"+l.getClass().getName()+" Controller:"+controller.getComponentName());
+ l.onMessage(m, s);
+ } catch( Throwable t ) {
+ t.printStackTrace();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
this.getClass().getName(),
+ "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", t);
+ }
+ }
public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean
stopImmediate) throws InterruptedException {
tpe.awaitTermination(50, TimeUnit.MILLISECONDS);
@@ -1134,6 +1231,10 @@ public class UimaDefaultMessageListenerC
amqc.stop();
}
awaitingShutdown = true;
+
+ if ( getMessageListener() instanceof PriorityMessageHandler ) {
+
((PriorityMessageHandler)getMessageListener()).getQueue().put(new
MessageWrapper(null, null, null,HIGH_PRIORITY));
+ }
if (taskExecutor != null && taskExecutor instanceof
ThreadPoolTaskExecutor) {
// Modify task executor to terminate idle threads. While
the thread terminates
// it calls destroy() method on the pinned instance of AE
@@ -1159,6 +1260,12 @@ public class UimaDefaultMessageListenerC
shutdownTaskExecutor(threadPoolExecutor, true);
}
}
+ if ( getTaskExecutor() != null ) {
+
+ if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
+
((ThreadPoolTaskExecutor)getTaskExecutor()).shutdown();
+ }
+ }
String controllerName = (__listenerRef.controller == null) ? ""
:__listenerRef.controller.getComponentName();
__listenerRef.shutdown();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1228,32 +1335,105 @@ public class UimaDefaultMessageListenerC
}
- private void setUimaASThreadPoolExecutor(int consumentCount) throws
Exception{
- super.setMessageListener(ml);
+ private void setUimaASThreadPoolExecutor(int consumerCount) throws Exception
{
+ if ( isGetMetaListener() ) {
+ super.setMessageListener(ml);
+ } else if ( isFreeCasQueueListener()) {
+ super.setMessageListener(ml);
+ } else if (endpoint != null && endpoint.isTempReplyDestination()) {
+ super.setMessageListener(ml);
+ } else {
+ if ( isProcessListener() ) { //controller != null &&
controller instanceof PrimitiveAnalysisEngineController ) {
+
+ // Singleton handler shared by Process CAS listener and a
targeted listener. The handler
+ // onMessage() is called by Spring when a message with a
matching selector is available.
+ // When onMessage() is called, it adds a message to the
Priority Queue
+ // PriorityMessageHandler h =
PriorityMessageHandler.getInstance();
+ // System.out.println("+++++++++++++
Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+"
Selector:"+super.getMessageSelector());
+
+ //super.setMessageListener(h);
+ // targeted listener should not have its own thread pool
because it needs to use
+ // threads created by the Process Cas Listener. Each of these
threads is pinned to
+ // a dedicated AE initialized at startup. Contract says that
each AE process() will be called
+ // on the same thread that initialized it. The targeted
listener and process listener share
+ // the same handler where CASes are pushed onto a Blocking
Priority Queue for processing.
+ if (!targetedListener &&
!isFreeCasQueueListener()) {
+ // System.out.println(">>>>>>>>>>>
Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+"
Selector:"+super.getMessageSelector());
+ PriorityMessageHandler h = new
PriorityMessageHandler(cc);
+
+ super.setMessageListener(h);
+ try {
+ while
(controller.getInputChannel() == null) {
+ synchronized (h) {
+ h.wait(100);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ //if (isPrimitiveService()) {
+
latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
+ // Create a Custom Thread
Factory. Provide it with an instance of
+ // PrimitiveController so that
every thread can call it to initialize
+ // the next available instance
of a AE.
+ tf = new
UimaAsPriorityBasedThreadFactory(threadGroup,
+ controller,
latchToCountNumberOfTerminatedThreads);
+
//(PrimitiveAnalysisEngineController) controller,
latchToCountNumberOfTerminatedThreads);
+
((UimaAsPriorityBasedThreadFactory) tf).withQueue(h.getQueue())
+
.withChannel(controller.getInputChannel());
+
+
((UimaAsPriorityBasedThreadFactory) tf).setDaemon(true);
+ if ( taskExecutor == null ) {
// true for aggregates
+ taskExecutor = new
ThreadPoolTaskExecutor();
+ }
+ // This ThreadExecutor will use
custom thread factory instead of default one
+ ((ThreadPoolTaskExecutor)
taskExecutor).setThreadFactory(tf);
+ // Initialize the thread pool
+ ((ThreadPoolTaskExecutor)
taskExecutor).initialize();
+ // Make sure all threads are
started. This forces each thread to call
+ // PrimitiveController to
initialize the next instance of AE
+ ((ThreadPoolTaskExecutor)
taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
+// } else {
+//
+// }
+ }
+ } else {
+ if ( controller.getInputChannel() == null ) {
+ //System.out.println("............. Error -
JmsInputChannel not set yet...");
+ } else if ( !(controller.getInputChannel() instanceof
MessageListener ||
+ controller.getInputChannel()
instanceof SessionAwareMessageListener) ) {
+ //System.out.println("............. Error -
wrong MessageListener type - Getting
this:"+controller.getInputChannel().getClass().getName());
+ }
+
super.setMessageListener(controller.getInputChannel());
+ }
+ }
// create task executor with custom thread pool for:
// 1) GetMeta request processing
// 2) ReleaseCAS request
- if ( taskExecutor == null ) {
- UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
- tf.setDaemon(false);
+ if ( !targetedListener && taskExecutor == null ) {
+ UimaAsThreadFactory utf = new UimaAsThreadFactory(threadGroup);
+ utf.setDaemon(false);
+// tf.defineUsageAs(UsedFor.GetMetaHandling);//setForGetMetaHandling();
if ( isFreeCasQueueListener()) {
- tf.setThreadNamePrefix(controller.getComponentName()+" -
FreeCASRequest Thread");
+ utf.setThreadNamePrefix(controller.getComponentName()+" -
FreeCASRequest Thread");
} else if ( isGetMetaListener() ) {
- tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+ utf.setThreadNamePrefix(super.getBeanName()+" - Thread");
} else if ( getDestination() != null && getMessageSelector() != null ) {
- tf.setThreadNamePrefix(controller.getComponentName() + " Process
Thread");
+ utf.setThreadNamePrefix(controller.getComponentName() + " Process
Thread");
} else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
- tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+ utf.setThreadNamePrefix(super.getBeanName()+" - Thread");
} else {
throw new Exception("Unknown Context Detected in
setUimaASThreadPoolExecutor()");
}
- ExecutorService es = Executors.newFixedThreadPool(consumentCount,tf);
+ ExecutorService es = Executors.newFixedThreadPool(consumerCount,utf);
if ( es instanceof ThreadPoolExecutor ) {
threadPoolExecutor = (ThreadPoolExecutor)es;
super.setTaskExecutor(es);
}
- } else {
+ }
+ /*
+ else {
UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
tf.setDaemon(true);
if ( isFreeCasQueueListener()) {
@@ -1269,8 +1449,13 @@ public class UimaDefaultMessageListenerC
}
}
+ */
}
+ private boolean isPrimitiveService() {
+ return controller != null && controller instanceof
PrimitiveAnalysisEngineController &&
+ controller.getInputChannel() != null;
+ }
/**
* Called by Spring to inject TaskExecutor
@@ -1296,30 +1481,14 @@ public class UimaDefaultMessageListenerC
if (controller instanceof PrimitiveAnalysisEngineController) {
// in case the taskExecutor is not plugged in yet, wait until one
// becomes available. The TaskExecutor is plugged in by Spring
- synchronized (mux2) {
- while (taskExecutor == null) {
- mux2.wait(20);
- }
- }
- latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumers);
- // Create a Custom Thread Factory. Provide it with an instance of
- // PrimitiveController so that every thread can call it to initialize
- // the next available instance of a AE.
- tf = new UimaAsThreadFactory(threadGroup,
(PrimitiveAnalysisEngineController) controller,
latchToCountNumberOfTerminatedThreads);
- ((UimaAsThreadFactory)tf).setDaemon(true);
- // This ThreadExecutor will use custom thread factory instead of defult
one
- ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
- // Initialize the thread pool
- ((ThreadPoolTaskExecutor) taskExecutor).initialize();
- // Make sure all threads are started. This forces each thread to call
- // PrimitiveController to initialize the next instance of AE
- ((ThreadPoolTaskExecutor)
taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
- // Change the state of a collocated service
- if ( !controller.isTopLevelComponent() ) {
- controller.changeState(ServiceState.RUNNING);
+ if ( !targetedListener && !isFreeCasQueueListener() ) {
+ synchronized (mux2) {
+ while (taskExecutor == null) {
+ mux2.wait(20);
+ }
+ }
}
}
-
if ( threadPoolExecutor != null ) {
threadPoolExecutor.prestartAllCoreThreads();
}
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
Thu Dec 20 14:40:58 2018
@@ -24,191 +24,400 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UimaMessageValidator;
import org.apache.uima.adapter.jms.JmsConstants;
+import
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
import
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.SerialFormat;
import org.apache.uima.util.Level;
+import org.apache.uima.util.impl.ProcessTrace_impl;
/**
- * Initializes JMS session and creates JMS MessageProducer to be used for
sending messages to a
- * given destination. It extends BaseMessageSender which starts the worker
thread and is tasked with
- * sending messages. The application threads share a common 'queue' with the
worker thread. The
- * application threads add messages to the pendingMessageList 'queue' and the
worker thread consumes
- * them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for
+ * sending messages to a given destination. It extends BaseMessageSender which
+ * starts the worker thread and is tasked with sending messages. The
application
+ * threads share a common 'queue' with the worker thread. The application
+ * threads add messages to the pendingMessageList 'queue' and the worker thread
+ * consumes them.
*
*/
public class ActiveMQMessageSender extends BaseMessageSender {
- private static final Class CLASS_NAME = ActiveMQMessageSender.class;
+ private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
- private volatile Connection connection = null;
+ private volatile Connection connection = null;
- private Session session = null;
+ private Session session = null;
- private MessageProducer producer = null;
+ private MessageProducer producer = null;
- private String destinationName = null;
-
- private ConcurrentHashMap<Destination, MessageProducer> producerMap = new
ConcurrentHashMap<Destination, MessageProducer>();
-
- public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
- BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
- super(engine);
- connection = aConnection;
- destinationName = aDestinationName;
- }
-
- public synchronized MessageProducer getMessageProducer(Destination
destination) throws Exception {
- if (producerMap.containsKey(destination)) {
- return (MessageProducer) producerMap.get(destination);
- }
- createSession();
- MessageProducer mProducer = session.createProducer(destination);
- mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producerMap.put(destination, mProducer);
- return mProducer;
- }
- /**
- * This is called when a new Connection is created after broker is restarted
- */
- public void setConnection(Connection aConnection) {
- connection = aConnection;
- cleanup();
- try {
- initializeProducer();
- } catch( Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
getClass().getName(),
- "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", e);
- }
- }
-
- }
- private String getBrokerURL() {
- try {
- return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
- } catch (Exception ex) { /* handle silently. */
- }
- return "";
- }
-
- private void createSession() throws Exception {
- String broker = getBrokerURL();
- try {
- if (session == null || engine.producerInitialized == false) {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- } catch (JMSException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
CLASS_NAME.getName(),
- "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_failed_creating_session_INFO",
- new Object[] { destinationName, broker });
- }
- if (connection == null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
CLASS_NAME.getName(),
- "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_connection_not_ready_INFO", new Object[] {
broker });
- }
- } else if (((ActiveMQConnection) connection).isClosed()
- || ((ActiveMQConnection) connection).isClosing()) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME)
- .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_connection_closed_INFO",
- new Object[] { destinationName, broker });
- }
- }
- throw e;
- } catch (Exception e) {
- throw e;
- }
- }
-
- /**
- * Creates a jms session object used to instantiate message producer
- */
- protected void initializeProducer() throws Exception {
- createSession();
- producer = getMessageProducer(session.createQueue(destinationName));
- }
-
- /**
- * Returns the full name of the destination queue
- */
- protected String getDestinationEndpoint() throws Exception {
- return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
- }
-
- /**
- * Returns jsm MessageProducer
- */
- public MessageProducer getMessageProducer() {
- if ( engine.running && engine.producerInitialized == false ) {
- try {
- SharedConnection con = engine.lookupConnection(getBrokerURL());
- if ( con != null ) {
- setConnection(con.getConnection());
- initializeProducer();
- engine.producerInitialized = true;
- }
- } catch( Exception e) {
- e.printStackTrace();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
getClass().getName(),
- "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", e);
- }
- }
- }
- return producer;
- }
-
- public TextMessage createTextMessage() throws Exception {
- if (session == null) {
- // Force initialization of Producer
- initializeProducer();
- }
- return session.createTextMessage("");
- }
-
- public BytesMessage createBytesMessage() throws Exception {
- if (session == null) {
- // Force initialization of Producer
- initializeProducer();
- }
- return session.createBytesMessage();
- }
-
- /**
- * Cleanup any jms resources used by the worker thread
- */
- protected void cleanup() {
- try {
- if (session != null) {
- session.close();
- session = null;
- }
- if (producer != null) {
- producer.close();
- producer = null;
- }
- } catch (Exception e) {
- // Ignore we are shutting down
- } finally {
- producerMap.clear();
- }
- }
+ private String destinationName = null;
+
+ private ConcurrentHashMap<Destination, MessageProducer> producerMap =
new ConcurrentHashMap<Destination, MessageProducer>();
+
+ public ActiveMQMessageSender(Connection aConnection, String
aDestinationName,
+ BaseUIMAAsynchronousEngineCommon_impl engine) throws
Exception {
+ super(engine);
+ connection = aConnection;
+ destinationName = aDestinationName;
+ }
+
+ public synchronized MessageProducer getMessageProducer(Destination
destination) throws Exception {
+ if (producerMap.containsKey(destination)) {
+ return (MessageProducer) producerMap.get(destination);
+ }
+ createSession();
+ MessageProducer mProducer = session.createProducer(destination);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producerMap.put(destination, mProducer);
+ return mProducer;
+ }
+
+ /**
+ * This is called when a new Connection is created after broker is
restarted
+ */
+ public void setConnection(Connection aConnection) {
+ connection = aConnection;
+ cleanup();
+ try {
+ initializeProducer();
+ } catch (Exception e) {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"setConnection",
+
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+
+ }
+
+ private String getBrokerURL() {
+ try {
+ return ((ActiveMQConnection)
connection).getBrokerInfo().getBrokerURL();
+ } catch (Exception ex) { /* handle silently. */
+ }
+ return "";
+ }
+
+ private void createSession() throws Exception {
+ String broker = getBrokerURL();
+ try {
+ if (session == null || engine.producerInitialized ==
false) {
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ }
+ } catch (JMSException e) {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"createSession",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_failed_creating_session_INFO",
+ new Object[] { destinationName,
broker });
+ }
+ if (connection == null) {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"createSession",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_connection_not_ready_INFO",
+ new Object[] { broker
});
+ }
+ } else if (((ActiveMQConnection) connection).isClosed()
|| ((ActiveMQConnection) connection).isClosing()) {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"createSession",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO",
+ new Object[] {
destinationName, broker });
+ }
+ }
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the full name of the destination queue
+ */
+ protected String getDestinationEndpoint() throws Exception {
+ return ((ActiveMQDestination)
producer.getDestination()).getPhysicalName();
+ }
+
+ /**
+ * Creates a jms session object used to instantiate message producer
+ */
+ protected void initializeProducer() throws Exception {
+ createSession();
+ producer =
getMessageProducer(session.createQueue(destinationName));
+ }
+
+ /**
+ * Returns the JMS MessageProducer
+ */
+ public MessageProducer getMessageProducer() {
+ if (engine.running && engine.producerInitialized == false) {
+ try {
+ SharedConnection con =
engine.lookupConnection(getBrokerURL());
+ if (con != null) {
+ setConnection(con.getConnection());
+ initializeProducer();
+ engine.producerInitialized = true;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"getMessageProducer",
+
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+ }
+ return producer;
+ }
+
+ public TextMessage createTextMessage() throws Exception {
+ synchronized (ActiveMQMessageSender.class) {
+ if (session == null) {
+ // Force initialization of Producer
+ initializeProducer();
+ }
+ // return session.createTextMessage("");
+ TextMessage msg = null;
+ try {
+ msg = session.createTextMessage("");
+ } catch (IllegalStateException e) {
+ // stale Session
+ session = null;
+ initializeProducer();
+ msg = session.createTextMessage("");
+ }
+ return msg;
+ }
+
+ }
+
+ public BytesMessage createBytesMessage() throws Exception {
+ synchronized (ActiveMQMessageSender.class) {
+ if (session == null) {
+ // Force initialization of Producer
+ initializeProducer();
+ }
+ BytesMessage msg = null;
+ try {
+ msg = session.createBytesMessage();
+ } catch (IllegalStateException e) {
+ // stale Session
+ session = null;
+ initializeProducer();
+ msg = session.createBytesMessage();
+ }
+ return msg;
+ }
+
+ // return session.createBytesMessage();
+ }
+
+ /**
+ * Cleanup any jms resources used by the worker thread
+ */
+ protected void cleanup() {
+ try {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
+ } catch (Exception e) {
+ // Ignore we are shutting down
+ } finally {
+ producerMap.clear();
+ }
+ }
+
+ protected void dispatchMessage(PendingMessage pm,
BaseUIMAAsynchronousEngineCommon_impl engine,
+ boolean casProcessRequest) throws Exception {
+ SharedConnection sc =
engine.lookupConnection(engine.getBrokerURI());
+ ClientRequest cacheEntry = null;
+ boolean doCallback = false;
+ boolean addTimeToLive = true;
+ Session jmsSession = null;
+
+ // Check the environment for existence of NoTTL tag. If present,
+ // the deployer of the service wants to disable message
expiration.
+ if (System.getProperty("NoTTL") != null) {
+ addTimeToLive = false;
+ }
+ try {
+ // long t1 = System.currentTimeMillis();
+ jmsSession = sc.getConnection().createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Request JMS Message from the concrete implementation
+ Message message = null;
+ // Determine if this a CAS Process Request
+ // boolean casProcessRequest = isProcessRequest(pm);
+ // Only Process request can be serialized as binary
+ if (casProcessRequest && (engine.getSerialFormat() !=
SerialFormat.XMI)) {
+ message = jmsSession.createBytesMessage();
+ } else {
+ message = jmsSession.createTextMessage();
+ }
+ // get the producer initialized from a valid connection
+ // producer = getMessageProducer();
+
+ Destination d = null;
+ String selector = null;
+ // UIMA-AS ver 2.10.0 + sends Free Cas request to a
service targeted queue
+ // instead of a temp queue. Regular queues can be
recovered in case of
+ // a broker restart. The test below will be true for
UIMA-AS v. 2.10.0 +.
+ // Code in JmsOutputChannel will add the selector if
the service is a CM.
+ if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
+ selector = (String)
pm.get(AsynchAEMessage.TargetingSelector);
+ }
+ if (selector == null && (pm.getMessageType() ==
AsynchAEMessage.ReleaseCAS
+ || pm.getMessageType() ==
AsynchAEMessage.Stop)) {
+ d = (Destination)
pm.get(AsynchAEMessage.Destination);
+
+ } else {
+ d = jmsSession.createQueue(destinationName);
+ }
+ MessageProducer mProducer =
jmsSession.createProducer(d);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // System.out.println(">>>>>>> Time to create and
initialize JMS
+ // Sesssion:"+(System.currentTimeMillis()-t1));
+ super.initializeMessage(pm, message);
+ String destination = ((ActiveMQDestination)
d).getPhysicalName();
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"run",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] {
+
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+
message.getIntProperty(AsynchAEMessage.Command)),
+
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+
message.getIntProperty(AsynchAEMessage.MessageType)),
+ destination });
+ }
+ if (casProcessRequest) {
+ cacheEntry = (ClientRequest)
engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
+ if (cacheEntry != null) {
+ // CAS cas = cacheEntry.getCAS();
+ // enable logging
+ if
(System.getProperty("UimaAsCasTracking") != null) {
+
message.setStringProperty("UimaAsCasTracking", "enable");
+ }
+ // Target specific service instance if
targeting for the CAS is provided
+ // by the client application
+ if (cacheEntry.getTargetServiceId() !=
null) {
+ //
System.out.println("------------Client Sending CAS to Service Instance With
+ //
Id:"+cacheEntry.getTargetServiceId());;
+
message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+
cacheEntry.getTargetServiceId());
+ }
+ // Use Process Timeout value for the
time-to-live property in the
+ // outgoing JMS message. When this time
is exceeded
+ // while the message sits in a queue,
the JMS Server will remove it from
+ // the queue. What happens with the
expired message depends on the
+ // configuration. Most JMS Providers
create a special dead-letter queue
+ // where all expired messages are
placed. NOTE: In ActiveMQ expired msgs in the
+ // DLQ
+ // are not auto evicted yet and
accumulate taking up memory.
+ long timeoutValue =
cacheEntry.getProcessTimeout();
+
+ if (timeoutValue > 0 && addTimeToLive) {
+ // Set high time to live value
+ message.setJMSExpiration(10 *
timeoutValue);
+ }
+ if (pm.getMessageType() ==
AsynchAEMessage.Process) {
+
cacheEntry.setCASDepartureTime(System.nanoTime());
+ }
+
cacheEntry.setCASDepartureTime(System.nanoTime());
+
+ doCallback = true;
+
+ } else {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"run",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
+ new Object[] {
pm.get(AsynchAEMessage.CasReference),
+
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+
message.getIntProperty(AsynchAEMessage.Command)),
+
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+
message.getIntProperty(AsynchAEMessage.MessageType)),
+
destination });
+ }
+ return; // no cacheEntry so just return
+ }
+
+ }
+ // start timers
+ if (casProcessRequest) {
+ CAS cas = cacheEntry.getCAS();
+
+ // Add the cas to a list of CASes pending
reply. Also start the timer if
+ // necessary
+
engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(),
cas.hashCode(),
+ engine.timerPerCAS); //
true=timer per cas
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"sendCAS",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
+ new Object[] {
cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
+
engine.serviceDelegate.toString() });
+ }
+
+ } else if (pm.getMessageType() ==
AsynchAEMessage.GetMeta
+ &&
engine.serviceDelegate.getGetMetaTimeout() > 0) {
+ // timer for PING has been started in sendCAS()
+ if
(!engine.serviceDelegate.isAwaitingPingReply()) {
+
engine.serviceDelegate.startGetMetaRequestTimer();
+ }
+ } else {
+ doCallback = false; // dont call
onBeforeMessageSend() callback on CPC
+ }
+ // Dispatch asynchronous request to Uima AS service
+ mProducer.send(message);
+
+ if (doCallback) {
+ UimaASProcessStatus status = new
UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
+ cacheEntry.getCasReferenceId());
+ // Notify engine now that the message has been sent
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"run",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_calling_onBeforeMessageSend__FINE",
+ new Object[] {
pm.get(AsynchAEMessage.CasReference),
+
String.valueOf(cacheEntry.getCAS().hashCode()) });
+ }
+ // Note the callback name is a misnomer. The
callback is made *after* the send now.
+ // An application receiving this callback can
consider the CAS as delivered to a
+ // queue.
+ engine.onBeforeMessageSend(status);
+
+ }
+ } catch( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING))
{
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
getClass().getName(),
+ "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+ } finally {
+ if (jmsSession != null) {
+ try {
+ jmsSession.close();
+ } catch (Exception eee) {
+
+ }
+ }
+ }
+
+ }
}
\ No newline at end of file
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Thu Dec 20 14:40:58 2018
@@ -78,6 +78,7 @@ import org.apache.uima.adapter.jms.JmsCo
import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
+import org.apache.uima.adapter.jms.message.PendingMessage;
import org.apache.uima.adapter.jms.service.Dd2spring;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
@@ -99,7 +100,7 @@ public class BaseUIMAAsynchronousEngine_
implements UimaAsynchronousEngine, MessageListener,
ControllerCallbackListener, ApplicationListener<ApplicationEvent>{
private static final Class CLASS_NAME =
BaseUIMAAsynchronousEngine_impl.class;
- private MessageSender sender = null;
+ private ActiveMQMessageSender sender = null;
private MessageProducer producer;
@@ -137,6 +138,9 @@ public class BaseUIMAAsynchronousEngine_
protected static Lock globalLock = new ReentrantLock();
+ //private String serviceTargetSelector = null;
+
+ protected volatile boolean stopped = false;
public BaseUIMAAsynchronousEngine_impl() {
super();
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
@@ -219,7 +223,10 @@ public class BaseUIMAAsynchronousEngine_
SerialFormat serialFormat) throws ResourceProcessException {
try {
msg.setStringProperty(AsynchAEMessage.MessageFrom,
consumerDestination.getQueueName());
-
+// // check if this message should target specific service instance
+// if ( serviceTargetSelector != null ) {
+//
msg.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,serviceTargetSelector);
+// }
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
@@ -308,12 +315,16 @@ public class BaseUIMAAsynchronousEngine_
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
- if ( sharedConnection.getClientCount()
== 1 ) {
+ if ( sharedConnection.getClientCount()
<= 1 ) {
sharedConnection.destroy();
amqc.close();
}
- }
+ } else if ( sharedConnection.getClientCount() <= 1 ) {
+
+ sharedConnection.destroy();
+ amqc.close();
+ }
} catch (Exception exx) {exx.printStackTrace();}
}
// Delete client's temp reply queue from AMQ Broker
@@ -341,7 +352,13 @@ public class BaseUIMAAsynchronousEngine_
}
public void stop() {
try {
- System.out.println(this.getClass().getName()+".stop()
- Stopping UIMA-AS Client");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+ Level.INFO)) {
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
+ CLASS_NAME.getName(),
"stop",
+
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+
"UIMAJMS_stopping_as_client_INFO");
+ }
stopConnection();
super.doStop();
@@ -390,7 +407,7 @@ public class BaseUIMAAsynchronousEngine_
}
}
- public void setCPCMessage(Message msg) throws Exception {
+ protected void setCPCMessage(Message msg) throws Exception {
msg.setStringProperty(AsynchAEMessage.MessageFrom,
consumerDestination.getQueueName());
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
@@ -402,7 +419,35 @@ public class BaseUIMAAsynchronousEngine_
((TextMessage) msg).setText("");
}
}
+ protected void setFreeCasMessage(Message msg, String aCasReferenceId, String
selector) throws Exception {
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ msg.setStringProperty(AsynchAEMessage.CasReference,
aCasReferenceId);
+ msg.setIntProperty(AsynchAEMessage.MessageType,
AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command,
AsynchAEMessage.ReleaseCAS);
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setJMSReplyTo(consumerDestination);
+ if ( selector != null ) {
+
msg.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+ selector);
+ }
+
+ if (msg instanceof TextMessage) {
+ ((TextMessage) msg).setText("");
+ }
+ }
+ protected void setStopMessage(Message msg, String aCasReferenceId) throws
Exception {
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ msg.setStringProperty(AsynchAEMessage.CasReference,
aCasReferenceId);
+ msg.setIntProperty(AsynchAEMessage.MessageType,
AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setJMSReplyTo(consumerDestination);
+
+ if (msg instanceof TextMessage) {
+ ((TextMessage) msg).setText("");
+ }
+ }
private boolean connectionClosedOrInvalid() {
SharedConnection sharedConnection =
lookupConnection(brokerURI);
if (sharedConnection == null
@@ -419,6 +464,9 @@ public class BaseUIMAAsynchronousEngine_
}
private SharedConnection createAndInitializeAMQConnection( Semaphore
semaphore, String aBrokerURI) throws Exception {
+ if ( stopped ) {
+ return null;
+ }
// This only affects Consumer
// Create AMQ specific connection validator. It uses
// AMQ specific approach to test the state of the connection
@@ -697,23 +745,7 @@ public class BaseUIMAAsynchronousEngine_
// throws an exception if the version of UIMA-AS is not compatible with UIMA
SDK
VersionCompatibilityChecker.check(CLASS_NAME, "UIMA AS Client",
"initialize");
-/*
- // Check for compatibility with a version of uima sdk. Only check major
versions.
- if (UimaAsVersion.getMajorVersion() != UimaVersion.getMajorVersion() ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.WARNING,
- CLASS_NAME.getName(),
- "initialize",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_incompatible_version_WARNING",
- new Object[] { "UIMA AS Client",
UimaAsVersion.getUimajFullVersionString(),
- UimaVersion.getFullVersionString() });
- throw new ResourceInitializationException(new AsynchAEException(
- "Version of UIMA-AS is Incompatible with a Version of UIMA Core.
UIMA-AS Version is built to depend on Core UIMA version:"
- + UimaAsVersion.getUimajFullVersionString() + " but is
running with version:"
- + UimaVersion.getFullVersionString()));
- }
-*/
+
if (running) {
throw new ResourceInitializationException(new
UIMA_IllegalStateException());
}
@@ -758,7 +790,11 @@ public class BaseUIMAAsynchronousEngine_
.intValue();
clientSideJmxStats.setCasPoolSize(casPoolSize);
}
-
+// if (
anApplicationContext.containsKey(UimaAsynchronousEngine.TargetSelectorProperty)
) {
+// serviceTargetSelector =
+//
(String)anApplicationContext.get(UimaAsynchronousEngine.TargetSelectorProperty);
+// }
+
if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
processTimeout = ((Integer)
anApplicationContext.get(UimaAsynchronousEngine.Timeout))
.intValue();
@@ -1304,6 +1340,32 @@ public class BaseUIMAAsynchronousEngine_
// private void stopProducingCases(ClientRequest clientCachedRequest) {
private void stopProducingCases(String casReferenceId, Destination
cmFreeCasQueue) {
+ PendingMessage msg = new PendingMessage(AsynchAEMessage.Stop);
+ msg.put(AsynchAEMessage.Destination, cmFreeCasQueue);
+ msg.put(AsynchAEMessage.CasReference, casReferenceId);
+ try {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
CLASS_NAME.getName(),
+ "stopProducingCases",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_sending_stop_to_service__INFO", new
Object[] {casReferenceId,cmFreeCasQueue});
+ }
+ sender.dispatchMessage(msg, this, false);
+
+ } catch (Exception ex) {
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
CLASS_NAME.getName(),
+ "stopProducingCases",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING",
+ ex);
+ }
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
CLASS_NAME.getName(),
+ "stopProducingCases",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
+ }
+ }
+
+ /*
try {
// if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
if (cmFreeCasQueue != null) {
@@ -1350,6 +1412,18 @@ public class BaseUIMAAsynchronousEngine_
"UIMAJMS_exception__WARNING", e);
}
}
+ */
+ }
+ protected void dispatchFreeCasRequest(String casReferenceId, Message
message) throws Exception {
+ PendingMessage msg = new PendingMessage(AsynchAEMessage.ReleaseCAS);
+// if ( message.getStringProperty(AsynchAEMessage.TargetingSelector) !=
null ) {
+//
msg.put(AsynchAEMessage.TargetingSelector,message.getStringProperty(AsynchAEMessage.TargetingSelector)
);
+// } else {
+// msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
+// }
+ msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
+ msg.put(AsynchAEMessage.CasReference, casReferenceId);
+ sender.dispatchMessage(msg, this, false);
}
protected MessageSender getDispatcher() {
return sender;
Modified:
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java
URL:
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
---
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java
(original)
+++
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java
Thu Dec 20 14:40:58 2018
@@ -26,6 +26,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -79,6 +80,9 @@ import org.apache.uima.util.XMLInputSour
* <li>-uimaEeDebug true causes various debugging things to happen, including
*not* deleting the
* generated spring file generated by running dd-2-spring. This parameter only
affects deployments
* specified using the -d parameter that follow it in the command line
sequence.</li>
+ * <li>-TargetServiceId specifies identifier of a service which should process
a CAS. This
+ * identifier must match service's identifier. By default a service is
launched with an IP:PID
+ * identifier but the identifier can be an arbitrary String. </li>
* </ul>
*/
public class RunRemoteAsyncAE {
@@ -109,6 +113,8 @@ public class RunRemoteAsyncAE {
private BufferedWriter logPerfWriter = null;
private boolean needPerfHeaders = true;
+
+ private String selector=null;
/**
* Start time of the processing - used to compute elapsed time.
@@ -196,6 +202,11 @@ public class RunRemoteAsyncAE {
cpc_timeout = Integer.parseInt(args[++i]);
} else if (args[i].equals(UimaAsynchronousEngine.UimaEeDebug)) {
appCtx.put(UimaAsynchronousEngine.UimaEeDebug, args[++i]);
+ } else if
(args[i].equals("-"+UimaAsynchronousEngine.TargetSelectorProperty)) {
+ selector = args[++i];
+ System.out.println("... Target Selector:"+selector);
+ // when a service is internally deployed (-d option) it will
use a selector defined below
+
System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,selector);
} else {
System.err.println("Unknown switch " + args[i]);
printUsageAndExit();
@@ -253,6 +264,11 @@ public class RunRemoteAsyncAE {
appCtx.put(UimaAsynchronousEngine.CasPoolSize, casPoolSize);
appCtx.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, Integer.valueOf(fsHeapSize
/ 4).toString());
+ // if configured to use service targeting, tell the client which service
to use
+ String target = null;
+ if ( (target =
System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) ) != null ) {
+ appCtx.put(UimaAsynchronousEngine.TargetSelectorProperty, target);
+ }
// initialize
uimaEEEngine.initialize(appCtx);
@@ -266,7 +282,13 @@ public class RunRemoteAsyncAE {
} else {
// send an empty CAS
CAS cas = uimaEEEngine.getCAS();
- uimaEEEngine.sendCAS(cas);
+ if ( selector != null ) {
+ System.out.println("Sending CAS to targeted service using
sendAndReceive() - target selector:"+selector);
+ ArrayList<AnalysisEnginePerformanceMetrics> perfList = new
ArrayList<AnalysisEnginePerformanceMetrics>();
+ uimaEEEngine.sendAndReceiveCAS(cas,perfList , selector);
+ } else {
+ uimaEEEngine.sendCAS(cas);
+ }
uimaEEEngine.collectionProcessingComplete();
}
if (logPerfWriter != null) {
@@ -319,7 +341,11 @@ public class RunRemoteAsyncAE {
+ "-uimaEeDebug true This is optional. Leave it out for
normal operation. If specified, causes"
+ " additional debugging things to happen, including *not*
deleting the generated Spring xml file generated"
+ " from running dd2spring. It only affects deployments
specified using the -d parameter that follow it on"
- + " the command line");
+ + " the command line\n"
+ + " -TargetServiceId - service identifier. This is
optional. If specified, this"
+ + " identifier enables targeting of a specific service.
The UIMA-AS client will send CASes to"
+ + " a service instance which was launched with a matching
identifier.")
+ ;
System.exit(1);
}