2012/3/6 Dennis Sosnoski <[email protected]>:
> Hi Aki,
>
> On reviewing the code, I think you'll also need to make a change to
> Destination.acknowledge() for this to work correctly. Right now
> Destination.acknowledge() is what persists the received message to the
> store, so if it's not called until processing is complete messages will
> never be persisted. This should be moved out to a separate method which
> can be called by RMInInterceptor.

Hi Dennis,

In robust mode, the message does not need to be persisted; only its
sequence needs to be updated after the call completes. I am using this
robust mode, which was introduced for normal oneway robust-inonly calls,
as a workaround for WS-RM's missing retry mechanism for failed inbound
service invocations. This is why RMInInterceptor skips persisting the
inbound message in robust-oneway mode. If we persisted the message and
updated the sequence in that phase and the service invocation later
failed, we would need to roll back the sequence both in memory and in
the persistent store. To avoid this, my change delays updating the
sequence (i.e., calling dest.acknowledge) until the call succeeds, which
is when RMDeliveryInterceptor reports that the call is completed. So I
think it is correct in this context.

>
> Separately, it looks like we need to change the code to handle passing
> persisted messages on to the application when recovering from the store.
> It looks to me like at present messages will be acknowledged by the RM
> layer but never delivered to the application if there's a crash or
> shutdown while they're waiting to be processed. What do you think?

Yes. There are a few things we need to do. Currently, the original wire
message as transported is persisted, which is similar to the restriction
we have on the outbound side. For the inbound side, I think we can
change this so that we capture the message after WS-RM handling (which
automatically implies after WS-Security handling), together with the
endpoint info, so that we can restart the message from that step on.
Here, we should introduce an automatic retry mechanism. To ensure
at-least-once invocation of the service, we can go without a transaction
and let the service check for duplicates in its own domain. For
exactly-once invocation, we will need a transaction. I would like to
work on this part if that is okay with you.
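
To make the ordering argument above concrete, here is a minimal toy
model of it. This is only a sketch and not CXF code: the class
RobustAckSketch, the ToySequence type, and the receive() method are all
hypothetical names made up for illustration. It only shows that when the
acknowledgement is deferred until the invocation succeeds, a failed
invocation leaves nothing to roll back, while the default mode has
already acknowledged the message by the time the service fails.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of acknowledgement ordering; not CXF code. All names are hypothetical. */
final class RobustAckSketch {

    /** Stand-in for the per-sequence state that an RM destination keeps. */
    static final class ToySequence {
        private final Set<Long> acknowledged = new HashSet<>();

        void acknowledge(long messageNumber) {
            acknowledged.add(messageNumber);
        }

        boolean isAcknowledged(long messageNumber) {
            return acknowledged.contains(messageNumber);
        }
    }

    /**
     * Handles one inbound message and returns true if the service call succeeded.
     * Default mode acknowledges on arrival; robust mode acknowledges only after
     * the service call has returned normally.
     */
    static boolean receive(ToySequence seq, long messageNumber,
                           boolean robust, Runnable service) {
        if (!robust) {
            // Default mode: the sequence is updated before the service runs.
            seq.acknowledge(messageNumber);
        }
        try {
            service.run();
        } catch (RuntimeException e) {
            // Robust mode: nothing was recorded yet, so there is nothing to
            // roll back; the unacknowledged message can simply be resent.
            return false;
        }
        if (robust) {
            // Robust mode: update the sequence only now that the call succeeded.
            seq.acknowledge(messageNumber);
        }
        return true;
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("service failed"); };
        Runnable working = () -> { };

        ToySequence robustSeq = new ToySequence();
        receive(robustSeq, 1L, true, failing);
        System.out.println("robust, failed call acked: " + robustSeq.isAcknowledged(1L));
        receive(robustSeq, 1L, true, working); // simulated retransmission
        System.out.println("robust, after retransmission acked: " + robustSeq.isAcknowledged(1L));

        ToySequence defaultSeq = new ToySequence();
        receive(defaultSeq, 1L, false, failing);
        System.out.println("default, failed call acked: " + defaultSeq.isAcknowledged(1L));
    }
}
```

In this model the second receive() call plays the role of the sender's
retransmission; in the real stack that is driven by the client's
retransmission queue, which the sketch does not attempt to model.
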
We can discuss this topic further. But a bigger remaining item is the
desired change to the outbound persistence/retry mechanism, so that
messages can be persisted even when a sequence is not yet available and
can be retransmitted with the updated information. I have only thought
about it a little. Are you working on it, or do you plan to?

Thanks.

Regards, aki

>
> - Dennis
>
>
> On 03/06/2012 08:51 PM, [email protected] wrote:
>> Author: ay
>> Date: Tue Mar 6 07:51:02 2012
>> New Revision: 1297370
>>
>> URL: http://svn.apache.org/viewvc?rev=1297370&view=rev
>> Log:
>> Merged revisions 1297296 via svnmerge from
>> https://svn.apache.org/repos/asf/cxf/trunk
>>
>> ........
>> r1297296 | ay | 2012-03-06 00:57:14 +0100 (Tue, 06 Mar 2012) | 1 line
>>
>> [CXF-4164] Robust-InOnly processing with WS-RM must delay updating the
>> sequence until message delivery
>> ........
>>
>> Added:
>>
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>> - copied unchanged from r1297296,
>> cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>>
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>> - copied unchanged from r1297296,
>> cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>>
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>> - copied unchanged from r1297296,
>> cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>> Modified:
>> cxf/branches/2.5.x-fixes/ (props changed)
>>
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>>
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>>
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>>
>> Propchange: cxf/branches/2.5.x-fixes/
>> ------------------------------------------------------------------------------
>> svn:mergeinfo = /cxf/trunk:1297296
>>
>> Propchange: cxf/branches/2.5.x-fixes/
>> ------------------------------------------------------------------------------
>> Binary property 'svnmerge-integrated' - no diff available.
>>
>> Modified:
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>> URL:
>> http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
>> ==============================================================================
>> ---
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>> (original)
>> +++
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>> Tue Mar 6 07:51:02 2012
>> @@ -23,6 +23,7 @@ import java.util.logging.Logger;
>>
>>  import org.apache.cxf.common.logging.LogUtils;
>>  import org.apache.cxf.message.Message;
>> +import org.apache.cxf.message.MessageUtils;
>>  import org.apache.cxf.phase.Phase;
>>
>>  /**
>> @@ -42,6 +43,12 @@ public class RMDeliveryInterceptor exten
>>
>>      public void handle(Message message) throws SequenceFault, RMException {
>>          LOG.entering(getClass().getName(), "handleMessage");
>> -        getManager().getDestination(message).processingComplete(message);
>> +        Destination dest = getManager().getDestination(message);
>> +        final boolean robust =
>> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
>> +        if (robust) {
>> +            dest.acknowledge(message);
>> +        }
>> +        dest.processingComplete(message);
>>      }
>>  }
>>
>> Modified:
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>> URL:
>> http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
>> ==============================================================================
>> ---
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>> (original)
>> +++
>> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>> Tue Mar 6 07:51:02 2012
>> @@ -25,6 +25,7 @@ import java.util.logging.Logger;
>>
>>  import org.apache.cxf.common.logging.LogUtils;
>>  import org.apache.cxf.message.Message;
>> +import org.apache.cxf.message.MessageUtils;
>>  import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
>>  import org.apache.cxf.ws.addressing.ContextUtils;
>>  import org.apache.cxf.ws.addressing.MAPAggregator;
>> @@ -150,7 +151,11 @@ public class RMInInterceptor extends Abs
>>
>>      void processSequence(Destination destination, Message message)
>>          throws SequenceFault, RMException {
>> -        destination.acknowledge(message);
>> +        final boolean robust =
>> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
>> +        if (!robust) {
>> +            destination.acknowledge(message);
>> +        }
>>      }
>>
>>      void processDeliveryAssurance(RMProperties rmps) {
>>
>> Modified:
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>> URL:
>> http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java?rev=1297370&r1=1297369&r2=1297370&view=diff
>> ==============================================================================
>> ---
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>> (original)
>> +++
>> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>> Tue Mar 6 07:51:02 2012
>> @@ -18,186 +18,12 @@
>>   */
>>  package org.apache.cxf.systest.ws.rm;
>>
>> -import java.net.MalformedURLException;
>> -import java.util.logging.Logger;
>> -
>> -import javax.xml.ws.Endpoint;
>> -
>> -import org.apache.cxf.Bus;
>> -import org.apache.cxf.BusFactory;
>> -import org.apache.cxf.bus.spring.SpringBusFactory;
>> -import org.apache.cxf.common.logging.LogUtils;
>> -import org.apache.cxf.greeter_control.Control;
>> -import org.apache.cxf.greeter_control.ControlService;
>> -import org.apache.cxf.greeter_control.Greeter;
>> -import org.apache.cxf.greeter_control.GreeterService;
>> -import org.apache.cxf.greeter_control.types.FaultLocation;
>> -import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
>> -import org.apache.cxf.phase.Phase;
>> -import org.apache.cxf.test.TestUtilities;
>> -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
>> -import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
>> -import org.apache.cxf.ws.rm.RMManager;
>> -
>> -import org.junit.After;
>> -import org.junit.AfterClass;
>> -import org.junit.BeforeClass;
>> -import org.junit.Test;
>> -
>>  /**
>>   * Tests the acknowledgement delivery back to the non-decoupled port when
>> there is some
>>   * error at the provider side and how its behavior is affected by the
>> robust in-only mode setting.
>>   */
>> -public class ServiceInvocationAckTest extends
>> AbstractBusClientServerTestBase {
>> -    public static final String PORT = allocatePort(Server.class);
>> -
>> -    private static final Logger LOG =
>> LogUtils.getLogger(ServiceInvocationAckTest.class);
>> -
>> -    private static final String CONTROL_PORT_ADDRESS =
>> -        "http://localhost:" + PORT + "/SoapContext/ControlPort";
>> -
>> -    public static class Server extends AbstractBusTestServerBase {
>> -
>> -        protected void run() {
>> -            SpringBusFactory factory = new SpringBusFactory();
>> -            Bus bus = factory.createBus();
>> -            BusFactory.setDefaultBus(bus);
>> -            setBus(bus);
>> -
>> -            ControlImpl implementor = new ControlImpl();
>> -            implementor.setAddress("http://localhost:" + PORT +
>> "/SoapContext/GreeterPort");
>> -            GreeterImpl greeterImplementor = new GreeterImpl();
>> -            implementor.setImplementor(greeterImplementor);
>> -            Endpoint.publish(CONTROL_PORT_ADDRESS, implementor);
>> -            LOG.fine("Published control endpoint.");
>> -        }
>> -
>> -        public static void main(String[] args) {
>> -            try {
>> -                Server s = new Server();
>> -                s.start();
>> -            } catch (Exception ex) {
>> -                ex.printStackTrace();
>> -                System.exit(-1);
>> -            } finally {
>> -                System.out.println("done!");
>> -            }
>> -        }
>> -    }
>> -
>> -    private Bus controlBus;
>> -    private Control control;
>> -    private Bus greeterBus;
>> -    private Greeter greeter;
>> -
>> -
>> -    @BeforeClass
>> -    public static void startServers() throws Exception {
>> -        TestUtilities.setKeepAliveSystemProperty(false);
>> -        assertTrue("server did not launch correctly",
>> launchServer(Server.class, true));
>> -    }
>> -
>> -    @AfterClass
>> -    public static void cleanup() {
>> -        TestUtilities.recoverKeepAliveSystemProperty();
>> -    }
>> -
>> -    @After
>> -    public void tearDown() {
>> -        if (null != greeter) {
>> -            assertTrue("Failed to stop greeter.",
>> control.stopGreeter(null));
>> -            greeterBus.shutdown(true);
>> -            greeterBus = null;
>> -        }
>> -        if (null != control) {
>> -            assertTrue("Failed to stop greeter", control.stopGreeter(null));
>> -            controlBus.shutdown(true);
>> -        }
>> -    }
>> -
>> -    @Test
>> -    public void testDefaultInvocationHandling() throws Exception {
>> +public class ServiceInvocationAckTest extends ServiceInvocationAckBase {
>> +    protected void setupGreeter() throws Exception {
>>          setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
>> -
>> -        control.setRobustInOnlyMode(false);
>> -
>> -        FaultLocation location = new
>> org.apache.cxf.greeter_control.types.ObjectFactory()
>> -            .createFaultLocation();
>> -        location.setPhase(Phase.INVOKE);
>> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
>> -
>> -        RMManager manager = greeterBus.getExtension(RMManager.class);
>> -
>> -        // the message is acked and the invocation takes place
>> -        greeter.greetMeOneWay("one");
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty",
>> manager.getRetransmissionQueue().isEmpty());
>> -
>> -        control.setFaultLocation(location);
>> -
>> -        // the invocation fails but the message is acked because the
>> delivery succeeds
>> -        greeter.greetMeOneWay("two");
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty",
>> manager.getRetransmissionQueue().isEmpty());
>> -    }
>> -
>> -    @Test
>> -    public void testRobustInvocationHandling() throws Exception {
>> -        setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
>> -
>> -        control.setRobustInOnlyMode(true);
>> -
>> -        FaultLocation location = new
>> org.apache.cxf.greeter_control.types.ObjectFactory()
>> -            .createFaultLocation();
>> -        location.setPhase(Phase.INVOKE);
>> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
>> -
>> -        RMManager manager = greeterBus.getExtension(RMManager.class);
>> -
>> -
>> -        // the message is acked and the invocation takes place
>> -        greeter.greetMeOneWay("one");
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty",
>> manager.getRetransmissionQueue().isEmpty());
>> -
>> -        control.setFaultLocation(location);
>> -
>> -        // the invocation fails but the message is acked because the
>> delivery succeeds
>> -        greeter.greetMeOneWay("two");
>> -        Thread.sleep(6000L);
>> -        assertFalse("RetransmissionQueue must not be empty",
>> manager.getRetransmissionQueue().isEmpty());
>> -
>> -        location.setPhase(null);
>> -        control.setFaultLocation(location);
>> -
>> -        // the retransmission succeeds and the invocation succeeds, the
>> message is acked
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty",
>> manager.getRetransmissionQueue().isEmpty());
>> -
>> -    }
>> -
>> -    private void setupGreeter(String cfgResource) throws
>> NumberFormatException, MalformedURLException {
>> -
>> -        SpringBusFactory bf = new SpringBusFactory();
>> -
>> -        controlBus = bf.createBus();
>> -        BusFactory.setDefaultBus(controlBus);
>> -
>> -        ControlService cs = new ControlService();
>> -        control = cs.getControlPort();
>> -        updateAddressPort(control, PORT);
>> -
>> -        assertTrue("Failed to start greeter",
>> control.startGreeter(cfgResource));
>> -
>> -        greeterBus = bf.createBus(cfgResource);
>> -        BusFactory.setDefaultBus(greeterBus);
>> -        LOG.fine("Initialised greeter bus with configuration: " +
>> cfgResource);
>> -
>> -        GreeterService gs = new GreeterService();
>> -
>> -        greeter = gs.getGreeterPort();
>> -        updateAddressPort(greeter, PORT);
>> -        LOG.fine("Created greeter client.");
>> -
>>      }
>>  }
>>
>>
