Author: asanka
Date: Tue Jan 27 07:54:50 2009
New Revision: 738021
URL: http://svn.apache.org/viewvc?rev=738021&view=rev
Log:
fixing the 202 response issue with event dispatching.
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java?rev=738021&r1=738020&r2=738021&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
Tue Jan 27 07:54:50 2009
@@ -19,12 +19,9 @@
*/
import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
-import org.apache.synapse.endpoints.AddressEndpoint;
-import org.apache.synapse.endpoints.EndpointDefinition;
import org.apache.synapse.eventing.SynapseEventSource;
import org.apache.synapse.eventing.SynapseSubscription;
import org.apache.synapse.eventing.SynapseSubscriptionManager;
@@ -41,24 +38,13 @@
if (log.isDebugEnabled()) {
log.debug("Mediation for Event Publisher started");
}
- //sendResponce(synCtx); TODO need to investigate this further
SynapseEventSource eventSource =
synCtx.getConfiguration().getEventSource(eventSourceName);
SynapseSubscriptionManager subscriptionManager =
eventSource.getSubscriptionManager();
List<SynapseSubscription> subscribers =
subscriptionManager.getMatchingSubscribers(synCtx);
- for (SynapseSubscription subscription : subscribers) {
- //TODO: send a 202 responce to the client, client wait and time
outs
- synCtx.setProperty("OUT_ONLY", "true"); // Set one way message
for events
- try {
-
subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
- } catch (AxisFault axisFault) {
- log.error("Event sending failure " + axisFault.toString());
- return false;
- }
- if (log.isDebugEnabled()) {
- log.debug("Event push to : " + subscription.getEndpointUrl());
- }
- }
- return true; //To change body of implemented methods use File |
Settings | File Templates.
+ // Call event dispatcher
+ synCtx.getEnvironment().getExecutorService()
+ .execute(new EventDispatcher(synCtx, subscribers));
+ return true;
}
public String getEventSourceName() {
@@ -69,23 +55,30 @@
this.eventSourceName = eventSourceName;
}
- private void sendResponce(MessageContext synCtx) {
- MessageContext mc = null;
- try {
- mc = MessageHelper.cloneMessageContext(synCtx);
- String replyAddress = mc.getReplyTo().getAddress();
- AddressEndpoint endpoint = new AddressEndpoint();
- EndpointDefinition def = new EndpointDefinition();
- def.setAddress(replyAddress.trim());
- def.setAddressingOn(true);
- endpoint.setDefinition(def);
- //mc.setEnvelope(null);
- mc.setTo(new EndpointReference(replyAddress));
- mc.setResponse(true);
- endpoint.send(mc);
- } catch (AxisFault axisFault) {
- axisFault
- .printStackTrace(); //To change body of catch statement
use File | Settings | File Templates.
+ /**
+ * Dispatching events async on a different thread
+ */
+ class EventDispatcher implements Runnable {
+ MessageContext synCtx;
+ List<SynapseSubscription> subscribers;
+
+ EventDispatcher(MessageContext synCtx, List<SynapseSubscription>
subscribers) {
+ this.synCtx = synCtx;
+ this.subscribers = subscribers;
+ }
+
+ public void run() {
+ for (SynapseSubscription subscription : subscribers) {
+ synCtx.setProperty("OUT_ONLY", "true"); // Set one way
message for events
+ try {
+
subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
+ } catch (AxisFault axisFault) {
+ log.error("Event sending failure " + axisFault.toString());
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Event push to : " +
subscription.getEndpointUrl());
+ }
+ }
}
}
}