Repository: cxf
Updated Branches:
  refs/heads/master a9c8de0d5 -> cb36642a8


[CXF-6784]WS-Notification subscription should support renew


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/cb36642a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/cb36642a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/cb36642a

Branch: refs/heads/master
Commit: cb36642a838a145c993ed7bf9b5bf1d60ee235a8
Parents: a9c8de0
Author: Freeman Fang <[email protected]>
Authored: Wed Mar 2 14:16:49 2016 +0800
Committer: Freeman Fang <[email protected]>
Committed: Wed Mar 2 14:16:49 2016 +0800

----------------------------------------------------------------------
 .../cxf/wsn/client/NotificationBroker.java      | 15 ++++--
 .../apache/cxf/wsn/AbstractSubscription.java    |  8 +--
 .../org/apache/cxf/wsn/jms/JmsSubscription.java | 53 ++++++++++++++++++--
 .../java/org/apache/cxf/wsn/WsnBrokerTest.java  | 33 ++++++++++++
 4 files changed, 97 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
----------------------------------------------------------------------
diff --git 
a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
 
b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
index 1ed7b36..a532f3a 100644
--- 
a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
+++ 
b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
@@ -68,6 +68,8 @@ public class NotificationBroker implements Referencable {
     public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, 
"TopicExpression");
 
     public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, 
"MessageContent");
+    
+    public static final QName QNAME_INITIAL_TERMINATION_TIME = new 
QName(WSN_URI, "InitialTerminationTime");
 
     
     private org.oasis_open.docs.wsn.brw_2.NotificationBroker broker;
@@ -159,8 +161,10 @@ public class NotificationBroker implements Referencable {
         NotifyMessageNotSupportedFault, 
InvalidProducerPropertiesExpressionFault {
         //CHECKSTYLE:ON
         
-        return subscribe(consumer, topic, null, false);
+        return subscribe(consumer, topic, null, false, null);
     }
+    
+
 
     public Subscription subscribe(Referencable consumer, String topic, String 
xpath) 
         //CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults
@@ -170,11 +174,11 @@ public class NotificationBroker implements Referencable {
         UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault, 
NotifyMessageNotSupportedFault, 
         InvalidProducerPropertiesExpressionFault {
         //CHECKSTYLE:ON
-        return subscribe(consumer, topic, xpath, false);
+        return subscribe(consumer, topic, xpath, false, null);
     }
 
     public Subscription subscribe(Referencable consumer, String topic,
-                                  String xpath, boolean raw)
+                                  String xpath, boolean raw, String 
initialTerminationTime)
         //CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults
         throws TopicNotSupportedFault, InvalidFilterFault, 
TopicExpressionDialectUnknownFault, 
         UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault, 
@@ -184,6 +188,11 @@ public class NotificationBroker implements Referencable {
         //CHECKSTYLE:ON
 
         Subscribe subscribeRequest = new Subscribe();
+        if (initialTerminationTime != null) {
+            subscribeRequest.setInitialTerminationTime(
+                  new JAXBElement<String>(QNAME_INITIAL_TERMINATION_TIME,
+                  String.class, initialTerminationTime));
+        }
         subscribeRequest.setConsumerReference(consumer.getEpr());
         subscribeRequest.setFilter(new FilterType());
         if (topic != null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
----------------------------------------------------------------------
diff --git 
a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
 
b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
index 48bf45c..6036c86 100644
--- 
a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
+++ 
b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
@@ -126,6 +126,7 @@ public abstract class AbstractSubscription extends 
AbstractEndpoint implements P
             Renew renewRequest) throws ResourceUnknownFault, 
UnacceptableTerminationTimeFault {
 
         XMLGregorianCalendar time = 
validateTerminationTime(renewRequest.getTerminationTime());
+        this.setTerminationTime(time);
         renew(time);
         RenewResponse response = new RenewResponse();
         response.setTerminationTime(time);
@@ -401,12 +402,7 @@ public abstract class AbstractSubscription extends 
AbstractEndpoint implements P
             throw new InvalidMessageContentExpressionFault("Unsupported 
MessageContent dialect: '"
                     + contentFilter.getDialect() + "'", fault);
         }
-        if (terminationTime != null) {
-            UnacceptableInitialTerminationTimeFaultType fault 
-                = new UnacceptableInitialTerminationTimeFaultType();
-            throw new 
UnacceptableInitialTerminationTimeFault("InitialTerminationTime is not 
supported", 
-                                                              fault);
-        }
+        
     }
 
     public AbstractNotificationBroker getBroker() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
----------------------------------------------------------------------
diff --git 
a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
 
b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
index 6420400..56f5ed2 100644
--- 
a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
+++ 
b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
@@ -34,6 +34,7 @@ import javax.jms.Topic;
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConstants;
 import javax.xml.datatype.XMLGregorianCalendar;
 import javax.xml.stream.XMLStreamReader;
 import javax.xml.xpath.XPath;
@@ -57,7 +58,6 @@ import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
 import org.oasis_open.docs.wsn.b_2.Subscribe;
 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
 import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
 import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
 import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
 import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
@@ -86,6 +86,12 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
     private Topic jmsTopic;
 
     private JAXBContext jaxbContext;
+    
+    private boolean checkTermination = true;
+    
+    private boolean isSessionActive = true;
+    
+    private Thread terminationThread;
 
     public JmsSubscription(String name) {
         super(name);
@@ -102,6 +108,12 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
             session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             MessageConsumer consumer = session.createConsumer(jmsTopic);
             consumer.setMessageListener(this);
+            checkTermination = true;
+            isSessionActive = true;
+            if (getTerminationTime() != null) {
+                terminationThread = new TerminationThread();
+                terminationThread.start();
+            }
         } catch (JMSException e) {
             SubscribeCreationFailedFaultType fault = new 
SubscribeCreationFailedFaultType();
             throw new SubscribeCreationFailedFault("Error starting 
subscription", fault, e);
@@ -134,6 +146,7 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
         } else {
             try {
                 session.close();
+                isSessionActive = false;
             } catch (JMSException e) {
                 PauseFailedFaultType fault = new PauseFailedFaultType();
                 throw new PauseFailedFault("Error pausing subscription", 
fault, e);
@@ -153,6 +166,7 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
                 session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                 MessageConsumer consumer = session.createConsumer(jmsTopic);
                 consumer.setMessageListener(this);
+                isSessionActive = true;
             } catch (JMSException e) {
                 ResumeFailedFaultType fault = new ResumeFailedFaultType();
                 throw new ResumeFailedFault("Error resuming subscription", 
fault, e);
@@ -162,8 +176,15 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
 
     @Override
     protected void renew(XMLGregorianCalendar terminationTime) throws 
UnacceptableTerminationTimeFault {
-        UnacceptableTerminationTimeFaultType fault = new 
UnacceptableTerminationTimeFaultType();
-        throw new UnacceptableTerminationTimeFault("TerminationTime is not 
supported", fault);
+        try {
+            this.resume();
+            if (this.terminationThread == null) {
+                terminationThread = new TerminationThread();
+                terminationThread.start();
+            }
+        } catch (ResumeFailedFault e) {
+            LOGGER.log(Level.WARNING, "renew failed", e);
+        }
     }
 
     @Override
@@ -172,6 +193,7 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
         if (session != null) {
             try {
                 session.close();
+                checkTermination = false;
             } catch (JMSException e) {
                 UnableToDestroySubscriptionFaultType fault = new 
UnableToDestroySubscriptionFaultType();
                 throw new UnableToDestroySubscriptionFault("Unable to 
unsubscribe", fault, e);
@@ -245,5 +267,30 @@ public abstract class JmsSubscription extends 
AbstractSubscription implements Me
     }
 
     protected abstract void doNotify(Notify notify);
+    
+    class TerminationThread extends Thread {
+        public void run() {
+            while (checkTermination) {
+                XMLGregorianCalendar tt = getTerminationTime();
+                if (tt != null && isSessionActive) {
+                    XMLGregorianCalendar ct = getCurrentTime();
+                    int c = tt.compare(ct);
+                    if (c == DatatypeConstants.LESSER || c == 
DatatypeConstants.EQUAL) {
+                        LOGGER.log(Level.INFO, "Need Pause this subscribe");
+                        try {
+                            pause();
+                        } catch (PauseFailedFault e) {
+                            LOGGER.log(Level.WARNING, "Pause failed", e);
+                        }
+                    }
+                }
+                try {
+                    Thread.sleep(10000); // check if should terminate every 10 
sec
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARNING, "TerminationThread sleep 
interrupted", e);
+                }
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java 
b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
index e8f388d..b81e70c 100644
--- a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
+++ b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
@@ -130,6 +130,7 @@ public abstract class WsnBrokerTest extends Assert {
         Consumer consumer = new Consumer(callback, "http://localhost:"; + port2 
+ "/test/consumer");
 
         Subscription subscription = notificationBroker.subscribe(consumer, 
"myTopic");
+        
 
         synchronized (callback.notifications) {
             notificationBroker.notify("myTopic", 
@@ -145,6 +146,38 @@ public abstract class WsnBrokerTest extends Assert {
         subscription.unsubscribe();
         consumer.stop();
     }
+    
+    @Test
+    public void testRenew() throws Exception {
+        TestConsumer callback = new TestConsumer();
+        Consumer consumer = new Consumer(callback, "http://localhost:"; + port2 
+ "/test/consumer");
+
+        //create subscription with InitialTerminationTime 20 sec, so that the 
+        //subscription would be expired after 20 sec
+        Subscription subscription = notificationBroker.subscribe(consumer, 
"myTopic", null, false, "PT20S");
+        Thread.sleep(30000);
+        synchronized (callback.notifications) {
+            notificationBroker.notify("myTopic", 
+                                      new JAXBElement<String>(new 
QName("urn:test:org", "foo"),
+                                          String.class, "bar"));
+            callback.notifications.wait(10000);
+        }
+        assertEquals(0, callback.notifications.size()); //the subscription is 
expired so can't get the notification
+        subscription.renew("PT60S"); //renew another 60 sec to resend the 
notification
+        synchronized (callback.notifications) {
+            notificationBroker.notify("myTopic", 
+                                      new JAXBElement<String>(new 
QName("urn:test:org", "foo"),
+                                          String.class, "bar"));
+            callback.notifications.wait(10000);
+        }
+        assertEquals(1, callback.notifications.size()); //the subscription is 
expired so can't get the notification
+        NotificationMessageHolderType message = callback.notifications.get(0);
+        
assertEquals(WSNHelper.getInstance().getWSAAddress(subscription.getEpr()), 
+                     
WSNHelper.getInstance().getWSAAddress(message.getSubscriptionReference()));
+
+        subscription.unsubscribe();
+        consumer.stop();
+    }
 
     @Test
     public void testPullPoint() throws Exception {

Reply via email to