Author: andreasmyth Date: Fri May 25 06:56:23 2007 New Revision: 541656 URL: http://svn.apache.org/viewvc?view=rev&rev=541656 Log: * Defer creation of the RMEndpoint (and insertion into a strong hashmap) until a) the first message is processed on an interceptor chain including RM interceptors or b) there actually are messages to recover/resend from previous sessions. * Delete unused sequences that were offered as part of CreateSequence requests, thereby eliminating the need to terminate them explicitly by sending an out-of-band LastMessage and a subsequent TerminateSequence. Should address the failure of the testTerminateOnShutdown test I disabled recently.
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=541656&r1=541655&r2=541656 ============================================================================== --- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original) +++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Fri May 25 06:56:23 2007 @@ -361,29 +361,36 @@ void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) { if (null == store || null == retransmissionQueue) { return; - } - - RMEndpoint rme = createReliableEndpoint(endpoint); - rme.initialise(conduit, null); - reliableEndpoints.put(endpoint, rme); + } String id = RMUtils.getEndpointIdentifier(endpoint); - LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}", - new Object[] {null == conduit ? 
"client" : "server", id}); + Collection<SourceSequence> sss = store.getSourceSequences(id); if (null == sss || 0 == sss.size()) { return; } LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size()); - for (SourceSequence ss : sss) { - rme.getSource().addSequence(ss, false); + + RMEndpoint rme = null; + + for (SourceSequence ss : sss) { Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true); - if (null == ms) { + if (null == ms || 0 == ms.size()) { continue; } LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size()); - for (RMMessage m : ms) { + + if (null == rme) { + LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}", + new Object[] {null == conduit ? "client" : "server", id}); + rme = createReliableEndpoint(endpoint); + rme.initialise(conduit, null); + reliableEndpoints.put(endpoint, rme); + } + rme.getSource().addSequence(ss, false); + + for (RMMessage m : ms) { Message message = new MessageImpl(); Exchange exchange = new ExchangeImpl(); @@ -415,7 +422,7 @@ message.setContent(byte[].class, m.getContent()); retransmissionQueue.addUnacknowledged(message); - } + } } retransmissionQueue.start(); Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=541656&r1=541655&r2=541656 ============================================================================== --- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original) +++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Fri May 25 06:56:23 2007 @@ -19,6 +19,7 @@ package org.apache.cxf.ws.rm; +import java.math.BigInteger; import java.util.Collections; import java.util.List; import java.util.logging.Level; @@ -215,18 +216,28 @@ // the following may be necessary if the last message for this sequence was a oneway // request and hence there was no response to which a 
last message could have been added - for (SourceSequence outboundSeq : reliableEndpoint.getSource().getAllSequences()) { + // REVISIT: A last message for the correlated sequence should have been sent by the time + // the last message for the underlying sequence was received. + + Source source = reliableEndpoint.getSource(); + + for (SourceSequence outboundSeq : source.getAllSequences()) { if (outboundSeq.offeredBy(sid) && !outboundSeq.isLastMessage()) { + if (BigInteger.ZERO.equals(outboundSeq.getCurrentMessageNr())) { + source.removeSequence(outboundSeq); + } // send an out of band message with an empty body and a // sequence header containing a lastMessage element. + /* Proxy proxy = new Proxy(reliableEndpoint); try { proxy.lastMessage(outboundSeq); } catch (RMException ex) { LogUtils.log(LOG, Level.SEVERE, "CORRELATED_SEQ_TERMINATION_EXC", ex); } + */ break; } Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=541656&r1=541655&r2=541656 ============================================================================== --- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original) +++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Fri May 25 06:56:23 2007 @@ -530,9 +530,7 @@ RetransmissionQueue queue = control.createMock(RetransmissionQueue.class); manager.setStore(store); manager.setRetransmissionQueue(queue); - manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>()); - RMEndpoint rme = control.createMock(RMEndpoint.class); - EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme); + Collection<SourceSequence> sss = new ArrayList<SourceSequence>(); if (null != ss) { sss.add(ss); @@ -540,11 +538,7 @@ EasyMock.expect(store.getSourceSequences("{S}s.{P}p")).andReturn(sss); if (null == ss) { return; - 
} - Source source = control.createMock(Source.class); - EasyMock.expect(rme.getSource()).andReturn(source); - source.addSequence(ss, false); - EasyMock.expectLastCall(); + } Collection<RMMessage> ms = new ArrayList<RMMessage>(); if (null != m) { @@ -557,6 +551,15 @@ if (null == m) { return; } + + manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>()); + RMEndpoint rme = control.createMock(RMEndpoint.class); + EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme); + Source source = control.createMock(Source.class); + EasyMock.expect(rme.getSource()).andReturn(source); + source.addSequence(ss, false); + EasyMock.expectLastCall(); + Service service = control.createMock(Service.class); EasyMock.expect(endpoint.getService()).andReturn(service); Binding binding = control.createMock(Binding.class); Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=541656&r1=541655&r2=541656 ============================================================================== --- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original) +++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Fri May 25 06:56:23 2007 @@ -1219,8 +1219,7 @@ mf.verifyLastMessage(new boolean[3], false); mf.verifyAcknowledgements(new boolean[] {false, true, true}, false); } - - @Ignore + @Test public void testTerminateOnShutdown() throws Exception { if (!doTestTerminateOnShutdown) {