Added:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java?rev=1160748&view=auto
==============================================================================
---
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
(added)
+++
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
Tue Aug 23 15:41:21 2011
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.Date;
+import java.util.logging.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.management.InstrumentationManager;
+import org.apache.cxf.testutil.common.AbstractClientServerTestBase;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ManagedEndpointsTest extends AbstractClientServerTestBase {
+ public static final String PORT = allocatePort(Server.class);
+// public static final String DECOUPLE_PORT =
allocatePort("decoupled.port");
+
+ private static final String SERVER_CFG =
"/org/apache/cxf/systest/ws/rm/managed-server.xml";
+ private static final String CLIENT_CFG =
"/org/apache/cxf/systest/ws/rm/managed-client.xml";
+
+ private static final Logger LOG =
LogUtils.getLogger(ManagedEndpointsTest.class);
+ private static Bus clientBus;
+ private static Bus serverBus;
+ private static InProcessServer server;
+
+    /**
+     * Runnable that creates the server bus from {@code SERVER_CFG}, publishes the greeter
+     * endpoint on {@code PORT} and then flags itself as ready. It is executed on its own
+     * thread while the test thread polls {@link #isReady()}.
+     */
+    static class InProcessServer implements Runnable {
+        // Written by the server thread and polled by the test thread: it must be volatile so
+        // the update is guaranteed to become visible. Because serverBus is assigned before
+        // this flag is set, a reader that observes ready == true also observes serverBus.
+        private volatile boolean ready;
+
+        public void run() {
+            SpringBusFactory bf = new SpringBusFactory();
+            serverBus = bf.createBus(SERVER_CFG);
+            BusFactory.setDefaultBus(serverBus);
+
+            GreeterImpl implementor = new GreeterImpl();
+            String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+
+            Endpoint ep = Endpoint.create(implementor);
+            ep.publish(address);
+
+            LOG.info("Published greeter endpoint.");
+            // Must stay the last statement: it publishes everything done above.
+            ready = true;
+        }
+
+        /**
+         * @return {@code true} once {@link #run()} has published the endpoint
+         */
+        public boolean isReady() {
+            return ready;
+        }
+    }
+
+    /**
+     * Creates the in-process server and starts it on a separate thread. This method does
+     * not wait for the server; the tests do that through {@code checkServerReady}.
+     */
+    @BeforeClass
+    public static void startServer() throws Exception {
+        server = new InProcessServer();
+        new Thread(server).start();
+    }
+
+    /**
+     * Releases the buses after all tests have run. The client bus is shut down first so
+     * that it no longer talks to a server that is going away; only the most recently
+     * created client bus is still referenced at this point.
+     */
+    @AfterClass
+    public static void stopServer() throws Exception {
+        if (null != clientBus) {
+            clientBus.shutdown(false);
+            clientBus = null;
+        }
+        if (null != serverBus) {
+            serverBus.shutdown(false);
+        }
+    }
+
+    /**
+     * Exercises the managed RM endpoint MBeans for a one-way exchange. A first message is
+     * sent and the endpoint identifiers, sequence identifiers, queue counters and
+     * acknowledged ranges reported through JMX are checked on the client and the server
+     * side. Two more messages are then sent, the first of which is expected to get lost,
+     * and the test verifies that the queued/unacknowledged state shows up and is cleared
+     * again after the retransmission interval has passed.
+     */
+    @Test
+    public void testManagedEndpointsOneway() throws Exception {
+        checkServerReady(30000);
+
+        SpringBusFactory bf = new SpringBusFactory();
+        clientBus = bf.createBus(CLIENT_CFG);
+        // NOTE(review): presumably this interceptor is what drops message "two" below;
+        // its implementation is not part of this file.
+        MessageLossSimulator mls = new MessageLossSimulator();
+        clientBus.getOutInterceptors().add(mls);
+
+        BusFactory.setDefaultBus(clientBus);
+
+        RMManager clientManager = clientBus.getExtension(RMManager.class);
+        RMManager serverManager = serverBus.getExtension(RMManager.class);
+
+        // The same MBeanServer is queried for the client and the server MBeans.
+        InstrumentationManager serverIM = serverBus.getExtension(InstrumentationManager.class);
+        MBeanServer mbs = serverIM.getMBeanServer();
+        assertNotNull("MBeanServer must be available.", mbs);
+
+        ObjectName clientManagerName = RMUtils.getManagedObjectName(clientManager);
+        ObjectName serverManagerName = RMUtils.getManagedObjectName(serverManager);
+
+        Object o;
+        GreeterService gs = new GreeterService();
+        final Greeter greeter = gs.getGreeterPort();
+        updateAddressPort(greeter, ManagedEndpointsTest.PORT);
+        LOG.fine("Created greeter client.");
+
+        org.apache.cxf.endpoint.Endpoint ep = ClientProxy.getClient(greeter).getEndpoint();
+        String epId = RMUtils.getEndpointIdentifier(ep);
+
+        greeter.greetMeOneWay("one"); // sent
+
+        // --- manager level: both sides must know the endpoint ---
+        o = mbs.invoke(clientManagerName, "getEndpointIdentifiers", null, null);
+        verifyArray("Expected endpoint identifier", o, new String[]{epId});
+
+        o = mbs.invoke(serverManagerName, "getEndpointIdentifiers", null, null);
+        verifyArray("Expected endpoint identifier", o, new String[]{epId});
+
+        ObjectName clientEndpointName = RMUtils.getManagedObjectName(clientManager, ep);
+        ObjectName serverEndpointName = RMUtils.getManagedObjectName(serverManager, ep);
+
+        // --- sequence identifiers: client source <-> server destination and vice versa ---
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds",
+                       new Object[]{true}, new String[]{"boolean"});
+        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+        String sseqId = ((String[])o)[0];
+
+        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
+        assertTrue("Expected sequence identifier", o instanceof String && sseqId.equals(o));
+
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceIds", null, null);
+        verifyArray("Expected sequence identifier", o, new String[]{sseqId});
+
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", null, null);
+        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+        String dseqId = ((String[])o)[0];
+
+        o = mbs.invoke(serverEndpointName, "getSourceSequenceIds",
+                       new Object[]{true}, new String[]{"boolean"});
+        verifyArray("Expected sequence identifier", o, new String[]{dseqId});
+
+        // --- state after the first, successfully delivered message ---
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequence", null, null);
+        verifySourceSequence(o, sseqId, 1, 0);
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequences",
+                       new Object[]{true}, new String[]{"boolean"});
+        assertTrue("One sequence message",
+                   o instanceof CompositeData[] && 1 == ((CompositeData[])o).length);
+        verifySourceSequence(((CompositeData[])o)[0], sseqId, 1, 0);
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        verifyArray("Expected range", o, new Long[]{1L, 1L});
+
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+
+        greeter.greetMeOneWay("two"); // getting lost
+        greeter.greetMeOneWay("three"); // sent
+
+        // --- state while message 2 is still awaiting retransmission ---
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L});
+
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        assertTrue("One unacknowledged message", o instanceof Long[] && 1 == ((Long[])o).length);
+
+        // The message number is passed as a Long so that the argument type matches the
+        // declared "long" signature instead of relying on reflective int-to-long widening.
+        o = mbs.invoke(clientEndpointName, "getRetransmissionStatus",
+                       new Object[]{sseqId, 2L}, new String[]{"java.lang.String", "long"});
+        verifyRetransmissionStatus(o, 2L, 0);
+
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L});
+
+        // 7 sec retry interval + 3 sec
+        LOG.info("waiting for 10 secs for the retry to complete ...");
+        Thread.sleep(10000);
+
+        // --- state after the retransmission: everything delivered and acknowledged ---
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        verifyArray("Expected range", o, new Long[]{1L, 3L});
+
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        verifyArray("Expected range", o, new Long[]{1L, 3L});
+
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+    }
+
+    /**
+     * Verifies that a source queue can be suspended and resumed through JMX. After one
+     * message is left queued for retransmission, the source queue is suspended and the
+     * queued count must stay at one across a retry interval; once the queue is resumed the
+     * count must drop back to zero.
+     */
+    @Test
+    public void testSuspendAndResumeSourceSequence() throws Exception {
+        checkServerReady(30000);
+
+        SpringBusFactory bf = new SpringBusFactory();
+        clientBus = bf.createBus(CLIENT_CFG);
+        MessageLossSimulator mls = new MessageLossSimulator();
+        clientBus.getOutInterceptors().add(mls);
+
+        BusFactory.setDefaultBus(clientBus);
+
+        RMManager clientManager = clientBus.getExtension(RMManager.class);
+
+        InstrumentationManager serverIM = serverBus.getExtension(InstrumentationManager.class);
+        MBeanServer mbs = serverIM.getMBeanServer();
+        assertNotNull("MBeanServer must be available.", mbs);
+
+        Object o;
+        GreeterService gs = new GreeterService();
+        final Greeter greeter = gs.getGreeterPort();
+        updateAddressPort(greeter, ManagedEndpointsTest.PORT);
+        LOG.fine("Created greeter client.");
+
+        org.apache.cxf.endpoint.Endpoint ep = ClientProxy.getClient(greeter).getEndpoint();
+
+        ObjectName clientEndpointName = RMUtils.getManagedObjectName(clientManager, ep);
+
+        greeter.greetMeOneWay("one"); // sent
+
+        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
+        assertTrue(o instanceof String);
+        String sseqId = (String)o;
+
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+
+        // Both messages are sent before the queue is suspended. The count check below
+        // expects exactly one of them to remain queued for retransmission.
+        // NOTE(review): presumably "two" is the one dropped by the MessageLossSimulator,
+        // as in testManagedEndpointsOneway - confirm.
+        greeter.greetMeOneWay("two");
+        greeter.greetMeOneWay("three");
+
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
+
+        mbs.invoke(clientEndpointName, "suspendSourceQueue",
+                   new Object[]{sseqId}, new String[]{"java.lang.String"});
+        LOG.info("suspended the source queue: " + sseqId);
+
+        // 7 sec retry interval + 3 sec
+        LOG.info("waiting for 10 secs for the retry (suspended)...");
+        Thread.sleep(10000);
+
+        // While suspended, the message must not have been retransmitted.
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
+
+        mbs.invoke(clientEndpointName, "resumeSourceQueue",
+                   new Object[]{sseqId}, new String[]{"java.lang.String"});
+        LOG.info("resumed the source queue: " + sseqId);
+
+        // The message now states the time that is actually slept (it used to say 15 secs).
+        LOG.info("waiting for 10 secs for the retry (resumed)...");
+        Thread.sleep(10000);
+
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue("No queued messages", o instanceof Integer && 0 == ((Integer)o).intValue());
+    }
+
+    /**
+     * Blocks until the in-process server reports that it is ready, polling once a second,
+     * and fails the test if that does not happen in time.
+     *
+     * @param max maximum time to wait in milliseconds, accounted for in 1000 ms steps
+     */
+    private void checkServerReady(long max) {
+        long waited = 0;
+        // Readiness is tested before every sleep and once more after the last one, so a
+        // server that becomes ready during the final second is still accepted.
+        while (!server.isReady()) {
+            if (waited >= max) {
+                fail("server not ready");
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // Do not swallow the interruption: restore the flag and stop waiting.
+                Thread.currentThread().interrupt();
+                fail("interrupted while waiting for the server to become ready");
+            }
+            waited += 1000;
+        }
+    }
+
+    /**
+     * Asserts that {@code value} is an array of the same type as {@code target} and that
+     * it starts with the elements of {@code target}. Additional trailing elements in
+     * {@code value} are tolerated.
+     *
+     * @param desc   message used for the assertion failures
+     * @param value  the object returned by the MBean operation
+     * @param target the expected leading elements
+     */
+    private <T> void verifyArray(String desc, Object value, T[] target) {
+        assertTrue(desc, target.getClass().isInstance(value));
+        @SuppressWarnings("unchecked")
+        T[] values = (T[])value;
+        // Report a result that is too short as a proper assertion failure rather than
+        // running into an ArrayIndexOutOfBoundsException in the loop below.
+        assertTrue(desc + ": expected at least " + target.length + " element(s) but got "
+                   + values.length, target.length <= values.length);
+        for (int i = 0; i < target.length; i++) {
+            assertEquals(desc, target[i], values[i]);
+        }
+    }
+
+ private void verifySourceSequence(Object value, String sid, long num, int
qsize) {
+ assertTrue(value instanceof CompositeData);
+ CompositeData cd = (CompositeData)value;
+ verifyValue(cd, "sequenceId", sid);
+ verifyValue(cd, "currentMessageNumber", num);
+ verifyValue(cd, "queuedMessageCount", qsize);
+ }
+
+ private void verifyRetransmissionStatus(Object value, long num, int count)
{
+ assertTrue(value instanceof CompositeData);
+ CompositeData cd = (CompositeData)value;
+ verifyValue(cd, "messageNumber", num);
+ verifyValue(cd, "resends", count);
+ Date now = new Date();
+ if (count > 0) {
+ assertTrue(now.after((Date)getValue(cd, "previous")));
+ }
+ assertTrue(now.before((Date)getValue(cd, "next")));
+ }
+
+    /**
+     * Asserts that the item stored under {@code key} in {@code cd} equals {@code value}.
+     */
+    private void verifyValue(CompositeData cd, String key, Object value) {
+        assertEquals("Expected value", value, getValue(cd, key));
+    }
+
+    /**
+     * Reads the item stored under {@code key} from {@code cd}, failing the test if the
+     * lookup throws.
+     *
+     * @param cd  the composite data to read from
+     * @param key the item name
+     * @return the item value
+     */
+    private Object getValue(CompositeData cd, String key) {
+        try {
+            return cd.get(key);
+        } catch (Exception e) {
+            // Keep the cause in the message so that the failure can be diagnosed.
+            fail("Unable to retrieve the value for " + key + ": " + e);
+            return null; // not reached, fail() always throws
+        }
+    }
+}
Propchange:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
------------------------------------------------------------------------------
svn:executable = *
Added:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml?rev=1160748&view=auto
==============================================================================
---
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml
(added)
+++
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml
Tue Aug 23 15:41:21 2011
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:cxf="http://cxf.apache.org/core"
+ xmlns:wsa="http://cxf.apache.org/ws/addressing"
+ xmlns:http="http://cxf.apache.org/transports/http/configuration"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xsi:schemaLocation="
+ http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
+ http://cxf.apache.org/transports/http/configuration
http://cxf.apache.org/schemas/configuration/http-conf.xsd
+ http://schemas.xmlsoap.org/ws/2005/02/rm/policy
http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+ http://cxf.apache.org/ws/rm/manager
http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <cxf:bus>
+ <cxf:features>
+ <cxf:logging/>
+ <wsa:addressing/>
+ <wsrm-mgr:reliableMessaging>
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval
Milliseconds="7000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="0"/>
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:reliableMessaging>
+ </cxf:features>
+ </cxf:bus>
+
+ <bean id="org.apache.cxf.management.InstrumentationManager"
+ class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
+ <property name="bus" ref="cxf" />
+ <property name="enabled" value="true" />
+ <property name="createMBServerConnectorFactory" value="false"/>
+ <property name="JMXServiceURL"
value="service:jmx:rmi:///jndi/rmi://localhost:9914/jmxrmi" />
+ </bean>
+
+</beans>
Propchange:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml
------------------------------------------------------------------------------
svn:executable = *
Added:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml?rev=1160748&view=auto
==============================================================================
---
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml
(added)
+++
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml
Tue Aug 23 15:41:21 2011
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:cxf="http://cxf.apache.org/core"
+ xmlns:wsa="http://cxf.apache.org/ws/addressing"
+ xmlns:http="http://cxf.apache.org/transports/http/configuration"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xsi:schemaLocation="
+ http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
+ http://cxf.apache.org/transports/http/configuration
http://cxf.apache.org/schemas/configuration/http-conf.xsd
+ http://schemas.xmlsoap.org/ws/2005/02/rm/policy
http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+ http://cxf.apache.org/ws/rm/manager
http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <cxf:bus>
+ <cxf:features>
+ <cxf:logging/>
+ <wsa:addressing/>
+ <wsrm-mgr:reliableMessaging>
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval
Milliseconds="4000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="0"/>
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:destinationPolicy >
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:reliableMessaging>
+ </cxf:features>
+ </cxf:bus>
+
+ <bean id="org.apache.cxf.management.InstrumentationManager"
+ class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
+ <property name="bus" ref="cxf" />
+ <property name="enabled" value="true" />
+ <property name="JMXServiceURL"
value="service:jmx:rmi:///jndi/rmi://localhost:9914/jmxrmi" />
+ </bean>
+
+</beans>
Propchange:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml
------------------------------------------------------------------------------
svn:executable = *