Author: asanka
Date: Tue Jan 27 07:54:50 2009
New Revision: 738021

URL: http://svn.apache.org/viewvc?rev=738021&view=rev
Log:
fixing the 202 response issue with event dispatching. 

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java?rev=738021&r1=738020&r2=738021&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java Tue Jan 27 07:54:50 2009
@@ -19,12 +19,9 @@
 */
 
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.MessageContext;
-import org.apache.synapse.endpoints.AddressEndpoint;
-import org.apache.synapse.endpoints.EndpointDefinition;
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.eventing.SynapseSubscription;
 import org.apache.synapse.eventing.SynapseSubscriptionManager;
@@ -41,24 +38,13 @@
         if (log.isDebugEnabled()) {
             log.debug("Mediation for Event Publisher started");
         }
-        //sendResponce(synCtx); TODO need to investigate this further 
         SynapseEventSource eventSource = synCtx.getConfiguration().getEventSource(eventSourceName);
         SynapseSubscriptionManager subscriptionManager = eventSource.getSubscriptionManager();
         List<SynapseSubscription> subscribers = subscriptionManager.getMatchingSubscribers(synCtx);
-        for (SynapseSubscription subscription : subscribers) {
-            //TODO: send a 202 responce to the client, client wait and time outs
-            synCtx.setProperty("OUT_ONLY", "true");    // Set one way message for events
-            try {
-                subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
-            } catch (AxisFault axisFault) {
-                log.error("Event sending failure " + axisFault.toString());
-                return false;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Event push to  : " + subscription.getEndpointUrl());
-            }
-        }
-        return true;  //To change body of implemented methods use File | Settings | File Templates.
+        // Call event dispatcher
+        synCtx.getEnvironment().getExecutorService()
+                .execute(new EventDispatcher(synCtx, subscribers));
+        return true;
     }
 
     public String getEventSourceName() {
@@ -69,23 +55,30 @@
         this.eventSourceName = eventSourceName;
     }
 
-    private void sendResponce(MessageContext synCtx) {
-        MessageContext mc = null;
-        try {
-            mc = MessageHelper.cloneMessageContext(synCtx);
-            String replyAddress = mc.getReplyTo().getAddress();
-            AddressEndpoint endpoint = new AddressEndpoint();
-            EndpointDefinition def = new EndpointDefinition();
-            def.setAddress(replyAddress.trim());
-            def.setAddressingOn(true);
-            endpoint.setDefinition(def);
-            //mc.setEnvelope(null);
-            mc.setTo(new EndpointReference(replyAddress));
-            mc.setResponse(true);
-            endpoint.send(mc);
-        } catch (AxisFault axisFault) {
-            axisFault
-                    .printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+    /**
+     * Dispatching events async on a different thread
+     */
+    class EventDispatcher implements Runnable {
+        MessageContext synCtx;
+        List<SynapseSubscription> subscribers;
+
+        EventDispatcher(MessageContext synCtx, List<SynapseSubscription> subscribers) {
+            this.synCtx = synCtx;
+            this.subscribers = subscribers;
+        }
+
+        public void run() {
+            for (SynapseSubscription subscription : subscribers) {
+                synCtx.setProperty("OUT_ONLY", "true");    // Set one way message for events
+                try {
+                    subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
+                } catch (AxisFault axisFault) {
+                    log.error("Event sending failure " + axisFault.toString());
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Event push to  : " + subscription.getEndpointUrl());
+                }
+            }
         }
     }
 }


Reply via email to