Author: asankaa
Date: Sat Dec 13 01:56:06 2008
New Revision: 26913
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=26913
Log:
Implement the static subscriptions, and a sample 501 to demonstrate it
Added:
branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml
- copied, changed from r26773,
/branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_500.xml
Modified:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
Modified:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL:
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?rev=26913&r1=26912&r2=26913&view=diff
==============================================================================
---
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
(original)
+++
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
Sat Dec 13 01:56:06 2008
@@ -26,22 +26,28 @@
import org.apache.synapse.SynapseException;
import org.apache.synapse.config.xml.PropertyHelper;
import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
import org.apache.synapse.eventing.SynapseEventSource;
+import org.apache.synapse.eventing.SynapseEventingConstants;
+import org.apache.synapse.eventing.SynapseSubscription;
import org.apache.synapse.eventing.SynapseSubscriptionManager;
+import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
+import org.apache.axis2.databinding.utils.ConverterUtil;
import javax.xml.namespace.QName;
import java.util.Iterator;
/**
* <eventSource name="blah">
- * <subscriptionManager
class="org.apache.synapse.events.DefaultInMemorySubscriptionManager">
- * <property name="other" value="some text property"/>
- * </subscriptionManager>
- * <subscription id="static1">
- * <filter....>
- * <sequence...>
- * <endpoint..>
- * </subscription>*
+ * <subscriptionManager
class="org.apache.synapse.events.DefaultInMemorySubscriptionManager">
+ * <property name="other" value="some text property"/>
+ * </subscriptionManager>
+ * <subscription id="static1">
+ * <filter....>
+ * <sequence...>
+ * <endpoint..>
+ * </subscription>*
* </eventSource>
*/
public class EventSourceFactory {
@@ -52,6 +58,15 @@
private static final QName PROPERTIES_QNAME = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "property");
private static final QName WS_EVENTING_QNAME
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "wsEventing");
+ private static final QName SUBSCRIPTION_QNAME = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "subscription");
+ private static final QName FILTER_QNAME = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "filter");
+ private static final QName ENDPOINT_QNAME = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "endpoint");
+ private static final QName ADDRESS_QNAME = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "address");
+ private static final QName EXPIRES_QNAME = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "expires");
+ private static final QName FILTER_SOURCE_QNAME = new QName("source");
+ private static final QName FILTER_DIALECT_QNAME = new QName("dialect");
+ private static final QName ID_QNAME = new QName("id");
+ private static final QName EP_URI_QNAME = new QName("uri");
public static SynapseEventSource createEventSource(OMElement elem) {
@@ -95,7 +110,7 @@
} else {
handleException("SynapseSubscription Manager has not been
specified for the event source");
}
-
+ createStaticSubscriptions(elem, eventSource);
return eventSource;
}
@@ -108,4 +123,67 @@
log.error(msg, e);
throw new SynapseException(msg, e);
}
+
+ /**
+ * Generate the static subscriptions declared as <subscription> children of the
+ * given element and add each one to the event source's subscription manager.
+ * For every subscription the id attribute, the <filter> (dialect and source),
+ * the <endpoint>/<address> uri and the optional <expires> value are read.
+ * A filter is only set when the dialect equals
+ * SynapseEventingConstants.TOPIC_FILTER_DIALECT; a missing dialect, source,
+ * endpoint, address or uri, or an unparsable expires value, is passed to
+ * handleException.
+ * NOTE(review): the id attribute and the <filter> element are dereferenced
+ * without a null check - confirm the configuration is validated beforehand.
+ *
+ * @param elem element whose <subscription> children are processed
+ * @param synapseEventSource event source whose subscription manager receives
+ * the created subscriptions
+ */
+ private static void createStaticSubscriptions(OMElement elem,
SynapseEventSource synapseEventSource) {
+ for (Iterator iterator = elem.getChildrenWithName(SUBSCRIPTION_QNAME);
iterator.hasNext();) {
+ SynapseSubscription synapseSubscription = new
SynapseSubscription();
+ OMElement elmSubscription = (OMElement) iterator.next();
+
synapseSubscription.setId(elmSubscription.getAttribute(ID_QNAME).getAttributeValue());
+ // process the filter: the dialect attribute is required, and the source attribute is read only for the topic filter dialect
+ OMElement elmFilter =
elmSubscription.getFirstChildWithName(FILTER_QNAME);
+ OMAttribute dialectAttr =
elmFilter.getAttribute(FILTER_DIALECT_QNAME);
+ if (dialectAttr != null && dialectAttr.getAttributeValue() !=
null) {
+ if
(SynapseEventingConstants.TOPIC_FILTER_DIALECT.equals(dialectAttr.getAttributeValue()))
{
+ XPathBasedEventFilter filter = new XPathBasedEventFilter();
+ OMAttribute sourceAttr =
elmFilter.getAttribute(FILTER_SOURCE_QNAME);
+ if (sourceAttr != null) {
+ filter.setResultValue(sourceAttr.getAttributeValue());
+ } else {
+ handleException("Error in creating static
subscription. Filter source not defined");
+ }
+ synapseSubscription.setFilter(filter);
+ }
+ } else {
+ handleException("Error in creating static subscription. Filter
dialect not defined");
+ }
+ OMElement elmEndpoint =
elmSubscription.getFirstChildWithName(ENDPOINT_QNAME);
+ if (elmEndpoint != null) {
+ OMElement elmAddress =
elmEndpoint.getFirstChildWithName(ADDRESS_QNAME);
+ if(elmAddress!=null){
+ AddressEndpoint endpoint = new AddressEndpoint();
+ EndpointDefinition def = new EndpointDefinition();
+ OMAttribute uriAttr = elmAddress.getAttribute(EP_URI_QNAME);
+ if(uriAttr!=null){
+ def.setAddress(uriAttr.getAttributeValue());
+ endpoint.setDefinition(def);
+ synapseSubscription.setEndpoint(endpoint);
+ }else{
+ handleException("Error in creating static subscription.
URI not defined");
+ }
+ }else{
+ handleException("Error in creating static subscription.
Address not defined");
+ }
+
+ } else {
+ handleException("Error in creating static subscription.
Endpoint not defined");
+ }
+ OMElement elmExpires =
elmSubscription.getFirstChildWithName(EXPIRES_QNAME);
+ if(elmExpires!=null){
+ try{
+
synapseSubscription.setExpires(ConverterUtil.convertToDateTime(elmExpires.getText()));
+ }catch(Exception e){
+ handleException("Error in creating static subscription.
invalid date format",e);
+ }
+ }else{ // no <expires> child: the expiry is explicitly set to null
+ synapseSubscription.setExpires(null);
+ }
+
synapseEventSource.getSubscriptionManager().addSubscription(synapseSubscription);
+ }
+ }
}
Modified:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
URL:
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java?rev=26913&r1=26912&r2=26913&view=diff
==============================================================================
---
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
(original)
+++
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
Sat Dec 13 01:56:06 2008
@@ -19,6 +19,9 @@
*/
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
import org.apache.synapse.util.MessageHelper;
import org.apache.synapse.eventing.SynapseEventSource;
import org.apache.synapse.eventing.SynapseSubscriptionManager;
@@ -26,6 +29,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
import java.util.List;
@@ -36,6 +41,7 @@
if (log.isDebugEnabled()) {
log.debug("Mediation for Event Publisher started");
}
+ //sendResponce(synCtx); TODO need to investigate this further
SynapseEventSource eventSource =
synCtx.getConfiguration().getEventSource(eventSourceName);
SynapseSubscriptionManager subscriptionManager =
eventSource.getSubscriptionManager();
List<SynapseSubscription> subscribers =
subscriptionManager.getMatchingSubscribers(synCtx);
@@ -62,4 +68,22 @@
public void setEventSourceName(String eventSourceName) { // stores the name that mediate passes to getEventSource(...)
this.eventSourceName = eventSourceName;
}
+ private void sendResponce(MessageContext synCtx){ // sends a clone of synCtx, flagged as a response, to its reply-to address; the call visible in mediate is commented out
+ MessageContext mc = null;
+ try {
+ mc = MessageHelper.cloneMessageContext(synCtx); // operate on a clone rather than on synCtx itself
+ String replyAddress = mc.getReplyTo().getAddress(); // NOTE(review): getReplyTo() is not null-checked - confirm it is always set
+ AddressEndpoint endpoint = new AddressEndpoint();
+ EndpointDefinition def = new EndpointDefinition();
+ def.setAddress(replyAddress.trim()); // endpoint definition gets the trimmed reply-to address
+ def.setAddressingOn(true);
+ endpoint.setDefinition(def);
+ //mc.setEnvelope(null);
+ mc.setTo(new EndpointReference(replyAddress)); // the To reference uses the untrimmed address
+ mc.setResponse(true);
+ endpoint.send(mc);
+ } catch (AxisFault axisFault) { // the fault is only printed below, not logged or rethrown
+ axisFault.printStackTrace(); //To change body of catch statement
use File | Settings | File Templates.
+ }
+ }
}
Modified:
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
URL:
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java?rev=26913&r1=26912&r2=26913&view=diff
==============================================================================
---
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
(original)
+++
branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSender.java
Sat Dec 13 01:56:06 2008
@@ -101,7 +101,7 @@
System.out.println("Sending Event : \n" + payload.toString());
try {
- serviceClient.sendRobust(payload);
+ serviceClient.sendRobust(payload);
System.out.println("Event sent to topic " + topic);
if (configContext != null) {
configContext.terminate();
Copied:
branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml (from
r26773,
/branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_500.xml)
URL:
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml?rev=26913&r1=26773&r2=26913&view=diff
==============================================================================
--- /branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_500.xml
(original)
+++ branches/synapse/1.2.wso2v1/repository/conf/sample/synapse_sample_501.xml
Sat Dec 13 01:56:06 2008
@@ -17,7 +17,7 @@
~ under the License.
-->
-<!-- Simple Eventing configuration -->
+<!-- Eventing configuration with static subscriptions-->
<definitions xmlns="http://ws.apache.org/ns/synapse">
<eventSource name="SampleEventSource">
<subscriptionManager
class="org.apache.synapse.eventing.managers.DefaultInMemorySubscriptionManager">
@@ -27,6 +27,15 @@
<property name="topicHeaderName" value="Topic"/>
<property name="topicHeaderNS" value="http://apache.org/aip"/>
</subscriptionManager>
+ <subscription id="mysub1">
+ <filter source ="synapse/event/test"
dialect="http://synapse.apache.org/eventing/dialect/topicFilter"/>
+ <endpoint><address
uri="http://localhost:9000/services/SimpleStockQuoteService"/></endpoint>
+ </subscription>
+ <subscription id="mysub2">
+ <filter source ="synapse/event/test"
dialect="http://synapse.apache.org/eventing/dialect/topicFilter"/>
+ <endpoint><address
uri="http://localhost:9000/services/SimpleStockQuoteService"/></endpoint>
+ <expires>2020-06-27T21:07:00.000-08:00</expires>
+ </subscription>
</eventSource>
<sequence name="PublicEventSource" >
_______________________________________________
Esb-java-dev mailing list
[email protected]
https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev