Repository: cxf Updated Branches: refs/heads/master b87fdb64d -> 4466cf3b9
[CXF-7144]support to specify pullpoint queue name of WS-N Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/4466cf3b Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/4466cf3b Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/4466cf3b Branch: refs/heads/master Commit: 4466cf3b9f1f5e75344b73c532b9da1e74ced345 Parents: b87fdb6 Author: Freeman Fang <[email protected]> Authored: Mon Nov 21 15:47:51 2016 +0800 Committer: Freeman Fang <[email protected]> Committed: Mon Nov 21 15:47:51 2016 +0800 ---------------------------------------------------------------------- .../apache/cxf/wsn/client/CreatePullPoint.java | 8 +++++++ .../cxf/wsn/client/NotificationBroker.java | 2 ++ .../apache/cxf/wsn/jms/JmsCreatePullPoint.java | 14 +++++++++++- .../java/org/apache/cxf/wsn/WsnBrokerTest.java | 23 ++++++++++++++++++++ 4 files changed, 46 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/4466cf3b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java index 9b23077..40c8bb5 100644 --- a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java +++ b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java @@ -54,4 +54,12 @@ public class CreatePullPoint implements Referencable { CreatePullPointResponse response = createPullPoint.createPullPoint(request); return new PullPoint(response.getPullPoint()); } + + public PullPoint create(String queueName) throws UnableToCreatePullPointFault { + org.oasis_open.docs.wsn.b_2.CreatePullPoint request + = new org.oasis_open.docs.wsn.b_2.CreatePullPoint(); + request.getOtherAttributes().put(NotificationBroker.QNAME_PULLPOINT_QUEUE_NAME, queueName); + CreatePullPointResponse response = createPullPoint.createPullPoint(request); + return new PullPoint(response.getPullPoint()); + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/4466cf3b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java index a532f3a..d267196 100644 --- a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java +++ b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java @@ -70,6 +70,8 @@ public class NotificationBroker implements Referencable { public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent"); public static final QName QNAME_INITIAL_TERMINATION_TIME = new QName(WSN_URI, "InitialTerminationTime"); + + public static final QName QNAME_PULLPOINT_QUEUE_NAME = new QName(WSN_URI, "pullPointQueueName"); private org.oasis_open.docs.wsn.brw_2.NotificationBroker broker; http://git-wip-us.apache.org/repos/asf/cxf/blob/4466cf3b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java index 80a2162..3fe3780 100644 --- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java +++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java @@ -23,6 +23,7 @@ import javax.jms.ConnectionFactory; import org.apache.cxf.wsn.AbstractCreatePullPoint; import org.apache.cxf.wsn.AbstractPullPoint; +import org.apache.cxf.wsn.client.NotificationBroker; import org.oasis_open.docs.wsn.b_2.CreatePullPoint; public class JmsCreatePullPoint extends AbstractCreatePullPoint { @@ -57,9 +58,20 @@ public class JmsCreatePullPoint extends AbstractCreatePullPoint { @Override protected String createPullPointName(CreatePullPoint createPullPointRequest) { + String name = null; + if (createPullPointRequest.getOtherAttributes().get( + NotificationBroker.QNAME_PULLPOINT_QUEUE_NAME) != null) { + //try use the sepcified pullpoint queue instead a generated one + //so that we can reuse this durable pullpoint queue between the + //broker restarts + name = createPullPointRequest.getOtherAttributes().get( + NotificationBroker.QNAME_PULLPOINT_QUEUE_NAME); + } else { + name = super.createPullPointName(createPullPointRequest); + } // For JMS, avoid using dashes in the pullpoint name (which is also the queue name, // as it will lead to problems with some JMS providers - String name = super.createPullPointName(createPullPointRequest); + name = name.replace("-", ""); return name; } http://git-wip-us.apache.org/repos/asf/cxf/blob/4466cf3b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java index b81e70c..40b1c44 100644 --- a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java +++ b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java @@ -201,6 +201,29 @@ public abstract class WsnBrokerTest extends Assert { subscription.unsubscribe(); pullPoint.destroy(); } + + @Test + public void testPullPointWithQueueName() throws Exception { + PullPoint pullPoint = createPullPoint.create("testQueue"); + Subscription subscription = notificationBroker.subscribe(pullPoint, "myTopic"); + notificationBroker.notify("myTopic", + new JAXBElement<String>(new QName("urn:test:org", "foo"), + String.class, "bar")); + + boolean received = false; + for (int i = 0; i < 50; i++) { + List<NotificationMessageHolderType> messages = pullPoint.getMessages(10); + if (!messages.isEmpty()) { + received = true; + break; + } + Thread.sleep(100); + } + assertTrue(received); + + subscription.unsubscribe(); + pullPoint.destroy(); + } @Test public void testPublisher() throws Exception {
