Author: ay
Date: Wed Mar 21 08:09:28 2012
New Revision: 1303303
URL: http://svn.apache.org/viewvc?rev=1303303&view=rev
Log:
[CXF-4188] Robust-InOnly processing with WS-RM to perform AtMostOnce check
Added:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
(with props)
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
(original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed
Mar 21 08:09:28 2012
@@ -113,6 +113,9 @@ public class Destination extends Abstrac
if (null != seq) {
if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(),
message)) {
+ if
(MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY)))
{
+ return;
+ }
seq.acknowledge(message);
if (null != rmps.getCloseSequence()) {
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
(original)
+++
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
Wed Mar 21 08:09:28 2012
@@ -21,8 +21,10 @@ package org.apache.cxf.ws.rm;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,6 +34,7 @@ import org.apache.cxf.continuations.Cont
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
@@ -60,6 +63,7 @@ public class DestinationSequence extends
private long inProcessNumber;
private long highNumberCompleted;
private List<Continuation> continuations = new LinkedList<Continuation>();
+ private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
public DestinationSequence(Identifier i, EndpointReferenceType a,
Destination d, ProtocolVariation pv) {
this(i, a, 0, null, pv);
@@ -238,10 +242,26 @@ public class DestinationSequence extends
Continuation cont = getContinuation(message);
DeliveryAssuranceType da =
destination.getManager().getDeliveryAssurance();
boolean canSkip = !da.isSetAtLeastOnce() && !da.isSetExactlyOnce();
+ boolean robust = false;
+ boolean robustDelivering = false;
+ if (message != null) {
+ robust =
MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
+ if (robust) {
+ robustDelivering =
+
MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY));
+ }
+ }
+ if (robust && !robustDelivering) {
+ // no check performed if in robust and not in delivering
+ deliveringMessageNumbers.remove(mn);
+ return true;
+ }
if (cont != null && da.isSetInOrder() && !cont.isNew()) {
return waitInQueue(mn, canSkip, message, cont);
}
- if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) &&
isAcknowledged(mn)) {
+ if ((da.isSetExactlyOnce() || da.isSetAtMostOnce())
+ && (isAcknowledged(mn)
+ || (robustDelivering &&
deliveringMessageNumbers.contains(mn)))) {
// acknowledge at first opportunity following duplicate message
scheduleImmediateAcknowledgement();
@@ -251,6 +271,9 @@ public class DestinationSequence extends
throw new RMException(msg);
}
+ if (robustDelivering) {
+ deliveringMessageNumbers.add(mn);
+ }
if (da.isSetInOrder()) {
return waitInQueue(mn, canSkip, message, cont);
}
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
(original)
+++
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
Wed Mar 21 08:09:28 2012
@@ -47,6 +47,7 @@ public class RMDeliveryInterceptor exten
final boolean robust =
MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
if (robust) {
+ message.remove(RMMessageConstants.DELIVERING_ROBUST_ONEWAY);
dest.acknowledge(message);
}
dest.processingComplete(message);
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
(original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
Wed Mar 21 08:09:28 2012
@@ -153,9 +153,11 @@ public class RMInInterceptor extends Abs
throws SequenceFault, RMException {
final boolean robust =
MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
- if (!robust) {
- destination.acknowledge(message);
- }
+ if (robust) {
+ // set this property to change the acknlowledging behavior
+ message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY,
Boolean.TRUE);
+ }
+ destination.acknowledge(message);
}
void processDeliveryAssurance(RMProperties rmps) {
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
(original)
+++
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
Wed Mar 21 08:09:28 2012
@@ -42,6 +42,10 @@ public final class RMMessageConstants {
public static final String SAVED_CONTENT =
"org.apache.cxf.ws.rm.content";
+ // keep this constant in the ws-rm package until it finds a general use
outside of ws-rm
+ static final String DELIVERING_ROBUST_ONEWAY =
+ "org.apache.cxf.oneway.robust.delivering";
+
/**
* Prevents instantiation.
*/
Added:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java?rev=1303303&view=auto
==============================================================================
---
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
(added)
+++
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
Wed Mar 21 08:09:28 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Logger;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the addition of WS-RM properties to application messages and the
+ * exchange of WS-RM protocol messages.
+ */
+public class RobustServiceAtMostOnceTest extends
AbstractBusClientServerTestBase {
+ public static final String PORT = allocatePort(Server.class);
+ public static final String GREETMEONEWAY_ACTION
+ = "http://cxf.apache.org/greeter_control/Greeter/greetMeOneWayRequest";
+ private static final Logger LOG =
LogUtils.getLogger(RobustServiceAtMostOnceTest.class);
+
+ private static GreeterCounterImpl serverGreeter;
+ private Greeter greeter;
+
+
+ public static class Server extends AbstractBusTestServerBase {
+
+ protected void run() {
+ SpringBusFactory bf = new SpringBusFactory();
+ // use a at-most-once server with sync ack processing
+ Bus bus =
bf.createBus("/org/apache/cxf/systest/ws/rm/atmostonce.xml");
+ BusFactory.setDefaultBus(bus);
+
bus.getExtension(RMManager.class).getRMAssertion().getAcknowledgementInterval()
+ .setMilliseconds(0L);
+
+ // add some intentional processing delay at inbound
+ SlowProcessingSimulator sps = new SlowProcessingSimulator();
+
sps.setAction("http://cxf.apache.org/greeter_control/Greeter/greetMeOneWayRequest");
+ sps.setDelay(10000L);
+ bus.getInInterceptors().add(sps);
+ serverGreeter = new GreeterCounterImpl();
+ String address = "http://localhost:" + PORT +
"/SoapContext/GreeterPort";
+
+ // publish this robust oneway endpoint
+ Endpoint ep = Endpoint.create(serverGreeter);
+ ep.getProperties().put(Message.ROBUST_ONEWAY, Boolean.TRUE);
+ ep.publish(address);
+ LOG.info("Published greeter endpoint.");
+ }
+
+ public static void main(String[] args) {
+ try {
+ Server s = new Server();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1);
+ } finally {
+ System.out.println("done!");
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ assertTrue("server did not launch correctly",
launchServer(Server.class, true));
+ }
+
+ @Test
+ public void testRobustAtMostOnceWithSlowProcessing() throws Exception {
+ LOG.fine("Creating greeter client");
+ SpringBusFactory bf = new SpringBusFactory();
+ bus = bf.createBus("/org/apache/cxf/systest/ws/rm/seqlength1.xml");
+ // set the client retry interval much shorter than the slow processing
delay
+ RMManager manager = bus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval()
+ .setMilliseconds(3000L);
+
+ BusFactory.setDefaultBus(bus);
+ GreeterService gs = new GreeterService();
+ greeter = gs.getGreeterPort();
+ updateAddressPort(greeter, PORT);
+
+ LOG.fine("Invoking greeter");
+ greeter.greetMeOneWay("one");
+ Thread.sleep(10000);
+
+ assertEquals("invoked too many times", 1, serverGreeter.getCount());
+ assertTrue("still in retransmission",
manager.getRetransmissionQueue().isEmpty());
+ }
+
+ private static class GreeterCounterImpl extends GreeterImpl {
+ private int count;
+
+ public void greetMeOneWay(String arg0) {
+ super.greetMeOneWay(arg0);
+ count++;
+ }
+
+ public int getCount() {
+ return count;
+ }
+ }
+}
Added:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java?rev=1303303&view=auto
==============================================================================
---
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
(added)
+++
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
Wed Mar 21 08:09:28 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.binding.soap.SoapBindingConstants;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.ContextUtils;
+
+public class SlowProcessingSimulator extends AbstractPhaseInterceptor<Message>
{
+ private static final Logger LOG =
LogUtils.getLogger(SlowProcessingSimulator.class);
+
+ private long delay = 10000L;
+ private String action;
+
+ public SlowProcessingSimulator() {
+ this(Phase.USER_PROTOCOL);
+ }
+
+ public SlowProcessingSimulator(String p) {
+ super(p);
+ }
+
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+
+ public String getAction() {
+ return action;
+ }
+
+ public void setAction(String action) {
+ this.action = action;
+ }
+
+ public void handleMessage(Message message) throws Fault {
+ try {
+ // sleep delay msec for the specified action or any action if
unspecified.
+ String a = getAction(message);
+ LOG.log(Level.INFO, "action=" + a);
+ if (null == action || action.equals(a)) {
+ LOG.log(Level.INFO, "sleeping " + delay + " msec ...");
+ Thread.sleep(delay);
+ }
+ } catch (InterruptedException e) {
+ LOG.log(Level.INFO, "interrupted");
+ }
+ LOG.log(Level.INFO, "continuing");
+ }
+
+ private String getAction(Message message) {
+ final AddressingProperties ap = ContextUtils.retrieveMAPs(message,
false, false);
+ if (ap != null && ap.getAction() != null) {
+ return ap.getAction().getValue();
+ }
+ return (String)message.get(SoapBindingConstants.SOAP_ACTION);
+ }
+
+}
Propchange:
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
------------------------------------------------------------------------------
svn:executable = *