Author: asankaa
Date: Sat Dec 13 01:56:06 2008
New Revision: 26913
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=26913

Log:
Implement static subscriptions and add sample 501 to demonstrate them


Added:
   branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml
      - copied, changed from r26773, 
/branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_500.xml
Modified:
   
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
   
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
   
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java

Modified: 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?rev=26913&r1=26912&r2=26913&view=diff
==============================================================================
--- 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
       (original)
+++ 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
       Sat Dec 13 01:56:06 2008
@@ -26,22 +26,28 @@
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.config.xml.PropertyHelper;
 import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
 import org.apache.synapse.eventing.SynapseEventSource;
+import org.apache.synapse.eventing.SynapseEventingConstants;
+import org.apache.synapse.eventing.SynapseSubscription;
 import org.apache.synapse.eventing.SynapseSubscriptionManager;
+import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
+import org.apache.axis2.databinding.utils.ConverterUtil;
 
 import javax.xml.namespace.QName;
 import java.util.Iterator;
 
 /**
  * <eventSource name="blah">
- *   <subscriptionManager 
class="org.apache.synapse.events.DefaultInMemorySubscriptionManager">
- *      <property name="other" value="some text property"/>
- *   </subscriptionManager>
- *   <subscription id="static1">
- *      <filter....>
- *      <sequence...>
- *      <endpoint..>
- *   </subscription>*
+ * <subscriptionManager 
class="org.apache.synapse.events.DefaultInMemorySubscriptionManager">
+ * <property name="other" value="some text property"/>
+ * </subscriptionManager>
+ * <subscription id="static1">
+ * <filter....>
+ * <sequence...>
+ * <endpoint..>
+ * </subscription>*
  * <eventSource>
  */
 public class EventSourceFactory {
@@ -52,6 +58,15 @@
     private static final QName PROPERTIES_QNAME = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "property");
     private static final QName WS_EVENTING_QNAME
             = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "wsEventing");
+    private static final QName SUBSCRIPTION_QNAME = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "subscription");
+    private static final QName FILTER_QNAME = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "filter");
+    private static final QName ENDPOINT_QNAME = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "endpoint");
+    private static final QName ADDRESS_QNAME = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "address");
+    private static final QName EXPIRES_QNAME = new 
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "expires");
+    private static final QName FILTER_SOURCE_QNAME = new QName("source");
+    private static final QName FILTER_DIALECT_QNAME = new QName("dialect");
+    private static final QName ID_QNAME = new QName("id");
+    private static final QName EP_URI_QNAME = new QName("uri");
 
     public static SynapseEventSource createEventSource(OMElement elem) {
 
@@ -95,7 +110,7 @@
         } else {
             handleException("SynapseSubscription Manager has not been 
specified for the event source");
         }
-
+        createStaticSubscriptions(elem, eventSource);
         return eventSource;
     }
 
@@ -108,4 +123,67 @@
         log.error(msg, e);
         throw new SynapseException(msg, e);
     }
+
+    /**
+     * Generate the static subscriptions
+     *
+     * @param elem
+     * @param synapseEventSource
+     */
+    private static void createStaticSubscriptions(OMElement elem, 
SynapseEventSource synapseEventSource) {
+        for (Iterator iterator = elem.getChildrenWithName(SUBSCRIPTION_QNAME); 
iterator.hasNext();) {
+            SynapseSubscription synapseSubscription = new 
SynapseSubscription();
+            OMElement elmSubscription = (OMElement) iterator.next();
+            
synapseSubscription.setId(elmSubscription.getAttribute(ID_QNAME).getAttributeValue());
+            //process the filter
+            OMElement elmFilter = 
elmSubscription.getFirstChildWithName(FILTER_QNAME);
+            OMAttribute dialectAttr = 
elmFilter.getAttribute(FILTER_DIALECT_QNAME);
+            if (dialectAttr != null && dialectAttr.getAttributeValue() != 
null) {
+                if 
(SynapseEventingConstants.TOPIC_FILTER_DIALECT.equals(dialectAttr.getAttributeValue()))
 {
+                    XPathBasedEventFilter filter = new XPathBasedEventFilter();
+                    OMAttribute sourceAttr = 
elmFilter.getAttribute(FILTER_SOURCE_QNAME);
+                    if (sourceAttr != null) {
+                        filter.setResultValue(sourceAttr.getAttributeValue());
+                    } else {
+                        handleException("Error in creating static 
subscription. Filter source not defined");
+                    }
+                    synapseSubscription.setFilter(filter);
+                }
+            } else {
+                handleException("Error in creating static subscription. Filter 
dialect not defined");
+            }
+            OMElement elmEndpoint = 
elmSubscription.getFirstChildWithName(ENDPOINT_QNAME);
+            if (elmEndpoint != null) {
+                OMElement elmAddress = 
elmEndpoint.getFirstChildWithName(ADDRESS_QNAME);
+                if(elmAddress!=null){
+                AddressEndpoint endpoint = new AddressEndpoint();
+                EndpointDefinition def = new EndpointDefinition();
+                OMAttribute uriAttr = elmAddress.getAttribute(EP_URI_QNAME);
+                if(uriAttr!=null){
+                def.setAddress(uriAttr.getAttributeValue());
+                endpoint.setDefinition(def);
+                synapseSubscription.setEndpoint(endpoint);
+                }else{
+                    handleException("Error in creating static subscription. 
URI not defined");
+                }
+                }else{
+                    handleException("Error in creating static subscription. 
Address not defined");
+                }
+
+            } else {
+                handleException("Error in creating static subscription. 
Endpoint not defined");
+            }
+            OMElement elmExpires = 
elmSubscription.getFirstChildWithName(EXPIRES_QNAME);
+            if(elmExpires!=null){
+               try{
+                    
synapseSubscription.setExpires(ConverterUtil.convertToDateTime(elmExpires.getText()));
+               }catch(Exception e){
+                    handleException("Error in creating static subscription. 
invalid date format",e);
+               }
+            }else{
+                synapseSubscription.setExpires(null);
+            }
+            
synapseEventSource.getSubscriptionManager().addSubscription(synapseSubscription);
+        }
+    }
 }

Modified: 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java?rev=26913&r1=26912&r2=26913&view=diff
==============================================================================
--- 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
    (original)
+++ 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
    Sat Dec 13 01:56:06 2008
@@ -19,6 +19,9 @@
 */
 import org.apache.synapse.mediators.AbstractMediator;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
 import org.apache.synapse.util.MessageHelper;
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.eventing.SynapseSubscriptionManager;
@@ -26,6 +29,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
 
 import java.util.List;
 
@@ -36,6 +41,7 @@
         if (log.isDebugEnabled()) {
              log.debug("Mediation for Event Publisher started");
          }
+        //sendResponce(synCtx); TODO need to investigate this further 
         SynapseEventSource eventSource = 
synCtx.getConfiguration().getEventSource(eventSourceName);
         SynapseSubscriptionManager subscriptionManager = 
eventSource.getSubscriptionManager();
             List<SynapseSubscription> subscribers = 
subscriptionManager.getMatchingSubscribers(synCtx);
@@ -62,4 +68,22 @@
     public void setEventSourceName(String eventSourceName) {
         this.eventSourceName = eventSourceName;
     }
+    private void sendResponce(MessageContext synCtx){
+        MessageContext mc = null;
+        try {
+            mc = MessageHelper.cloneMessageContext(synCtx);
+        String replyAddress = mc.getReplyTo().getAddress();
+        AddressEndpoint endpoint = new AddressEndpoint();
+        EndpointDefinition def = new EndpointDefinition();
+        def.setAddress(replyAddress.trim());
+        def.setAddressingOn(true);
+        endpoint.setDefinition(def);
+        //mc.setEnvelope(null);
+        mc.setTo(new EndpointReference(replyAddress));
+        mc.setResponse(true);
+        endpoint.send(mc);
+            } catch (AxisFault axisFault) {
+            axisFault.printStackTrace();  //To change body of catch statement 
use File | Settings | File Templates.
+        }
+    }
 }

Modified: 
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java?rev=26913&r1=26912&r2=26913&view=diff
==============================================================================
--- 
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
        (original)
+++ 
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
        Sat Dec 13 01:56:06 2008
@@ -101,7 +101,7 @@
 
         System.out.println("Sending Event : \n" + payload.toString());
         try {
-            serviceClient.sendRobust(payload);
+            serviceClient.sendRobust(payload);            
             System.out.println("Event sent to topic " + topic);
             if (configContext != null) {
                 configContext.terminate();

Copied: 
branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml (from 
r26773, 
/branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_500.xml)
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml?rev=26913&r1=26773&r2=26913&view=diff
==============================================================================
--- /branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_500.xml  
(original)
+++ branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml   
Sat Dec 13 01:56:06 2008
@@ -17,7 +17,7 @@
   ~  under the License.
   -->
 
-<!-- Simple Eventing configuration -->
+<!-- Eventing configuration with static subscriptions-->
 <definitions xmlns="http://ws.apache.org/ns/synapse">
      <eventSource name="SampleEventSource">
            <subscriptionManager 
class="org.apache.synapse.eventing.managers.DefaultInMemorySubscriptionManager">
@@ -27,6 +27,15 @@
                <property name="topicHeaderName" value="Topic"/>
                <property name="topicHeaderNS" value="http://apache.org/aip"/>
            </subscriptionManager>
+           <subscription id="mysub1">
+                <filter source ="synapse/event/test" 
dialect="http://synapse.apache.org/eventing/dialect/topicFilter"/>
+                <endpoint><address 
uri="http://localhost:9000/services/SimpleStockQuoteService"/></endpoint>
+           </subscription>
+           <subscription id="mysub2">
+                <filter source ="synapse/event/test" 
dialect="http://synapse.apache.org/eventing/dialect/topicFilter"/>
+                <endpoint><address 
uri="http://localhost:9000/services/SimpleStockQuoteService"/></endpoint>
+                <expires>2020-06-27T21:07:00.000-08:00</expires>
+           </subscription>
      </eventSource>
 
     <sequence name="PublicEventSource" >

_______________________________________________
Esb-java-dev mailing list
[email protected]
https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev

Reply via email to