Repository: cxf Updated Branches: refs/heads/3.0.x-fixes de2acc903 -> 0aec1458f
[CXF-6667] Closing a source sequence in WS-RM may lead to inconsistent sequence status Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0aec1458 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0aec1458 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0aec1458 Branch: refs/heads/3.0.x-fixes Commit: 0aec1458f4c69adc47c8613d9f41bef23655710c Parents: de2acc9 Author: Akitoshi Yoshida <[email protected]> Authored: Wed Nov 4 15:32:57 2015 +0100 Committer: Akitoshi Yoshida <[email protected]> Committed: Thu Nov 5 13:51:33 2015 +0100 ---------------------------------------------------------------------- .../cxf/ws/rm/RMCaptureOutInterceptor.java | 6 + .../java/org/apache/cxf/ws/rm/RMEndpoint.java | 1 - .../apache/cxf/systest/ws/rm/SequenceTest.java | 125 +++++++++++++++++++ 3 files changed, 131 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/0aec1458/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java index b52a6e8..8f1df9e 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java @@ -199,6 +199,12 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> { getManager().initializeInterceptorChain(msg); //doneCaptureMessage(msg); captureMessage(msg); + } else if (isLastMessage) { + // got either the rm11 CS or the rm10 empty LM + RMStore store = getManager().getStore(); + if (null != store) { + store.persistOutgoing(rmpsOut.getSourceSequence(), null); + } } } private void captureMessage(Message message) { http://git-wip-us.apache.org/repos/asf/cxf/blob/0aec1458/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java index 3146bac..e393124 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java @@ -816,7 +816,6 @@ public class RMEndpoint { // REVISIT: this may be non-standard // getProxy().ackRequested(seq); } else { - getProxy().lastMessage(seq); } } catch (RMException ex) { http://git-wip-us.apache.org/repos/asf/cxf/blob/0aec1458/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java ---------------------------------------------------------------------- diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java index 1b3b9c3..49414f8 100644 --- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java +++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java @@ -23,9 +23,14 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -77,10 +82,15 @@ import org.apache.cxf.testutil.recorders.OutMessageRecorder; import org.apache.cxf.transport.http.HTTPConduit; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.ws.addressing.VersionTransformer.Names200408; +import org.apache.cxf.ws.rm.DestinationSequence; import org.apache.cxf.ws.rm.RM10Constants; import org.apache.cxf.ws.rm.RMContextUtils; import org.apache.cxf.ws.rm.RMManager; import org.apache.cxf.ws.rm.RMProperties; +import org.apache.cxf.ws.rm.SourceSequence; +import org.apache.cxf.ws.rm.persistence.RMMessage; +import org.apache.cxf.ws.rm.persistence.RMStore; +import org.apache.cxf.ws.rm.v200702.Identifier; import org.junit.After; import org.junit.BeforeClass; @@ -1392,6 +1402,11 @@ public class SequenceTest extends AbstractBusClientServerTestBase { public void testTerminateOnShutdown() throws Exception { init("org/apache/cxf/systest/ws/rm/terminate-on-shutdown.xml", true); + RMManager manager = greeterBus.getExtension(RMManager.class); + // this test also verify the DB is correctly being updated during the shutdown + RMMemoryStore store = new RMMemoryStore(); + manager.setStore(store); + greeter.greetMeOneWay("neutrophil"); greeter.greetMeOneWay("basophil"); greeter.greetMeOneWay("eosinophil"); @@ -1422,6 +1437,10 @@ public class SequenceTest extends AbstractBusClientServerTestBase { mf.verifyActions(expectedActions, false); mf.verifyAcknowledgements(new boolean[] {false, true}, false); + // additional check to verify the operations performed on DB + assertEquals("sequences not released from DB", 0, store.ssmap.size()); + assertEquals("messages not released from DB", 0, store.ommap.size()); + assertEquals("sequence not closed in DB", 1, store.ssclosed.size()); } @Test @@ -1693,4 +1712,110 @@ public class SequenceTest extends AbstractBusClientServerTestBase { } return null; } + + private static class RMMemoryStore implements RMStore { + // during this particular test, the operations are expected to be invoked sequentially so use just HashMap + Map<Identifier, SourceSequence> ssmap = new HashMap<Identifier, SourceSequence>(); + Map<Identifier, DestinationSequence> dsmap = new HashMap<Identifier, DestinationSequence>(); + Map<Identifier, Collection<RMMessage>> ommap = new HashMap<Identifier, Collection<RMMessage>>(); + Map<Identifier, Collection<RMMessage>> immap = new HashMap<Identifier, Collection<RMMessage>>(); + Set<Identifier> ssclosed = new HashSet<Identifier>(); + + @Override + public void createSourceSequence(SourceSequence seq) { + ssmap.put(seq.getIdentifier(), seq); + } + + @Override + public void createDestinationSequence(DestinationSequence seq) { + dsmap.put(seq.getIdentifier(), seq); + } + + @Override + public SourceSequence getSourceSequence(Identifier seq) { + return ssmap.get(seq); + } + + @Override + public DestinationSequence getDestinationSequence(Identifier seq) { + return dsmap.get(seq); + } + + @Override + public void removeSourceSequence(Identifier seq) { + ssmap.remove(seq); + } + + @Override + public void removeDestinationSequence(Identifier seq) { + dsmap.remove(seq); + } + + @Override + public Collection<SourceSequence> getSourceSequences(String endpointIdentifier) { + return ssmap.values(); + } + + @Override + public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier) { + return dsmap.values(); + } + + @Override + public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) { + return outbound ? ommap.get(sid) : immap.get(sid); + } + + @Override + public void persistOutgoing(SourceSequence seq, RMMessage msg) { + Collection<RMMessage> cm = getMessages(seq.getIdentifier(), ommap); + if (msg != null) { + // update the sequence status and add the message + cm.add(msg); + } else { + // update only the sequence status + if (seq.isLastMessage()) { + ssclosed.add(seq.getIdentifier()); + } + } + } + + @Override + public void persistIncoming(DestinationSequence seq, RMMessage msg) { + Collection<RMMessage> cm = getMessages(seq.getIdentifier(), immap); + if (msg != null) { + // update the sequence status and add the message + cm.add(msg); + } else { + // update only the sequence status + } + } + + @Override + public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) { + removeMessages(sid, messageNrs, outbound ? ommap : immap); + } + + private Collection<RMMessage> getMessages(Identifier seq, Map<Identifier, Collection<RMMessage>> map) { + Collection<RMMessage> cm = map.get(seq); + if (cm == null) { + cm = new LinkedList<RMMessage>(); + map.put(seq, cm); + } + return cm; + } + + private void removeMessages(Identifier sid, Collection<Long> messageNrs, + Map<Identifier, Collection<RMMessage>> map) { + for (Iterator<RMMessage> it = map.get(sid).iterator(); it.hasNext();) { + RMMessage m = it.next(); + if (messageNrs.contains(m.getMessageNumber())) { + it.remove(); + } + } + if (map.get(sid).size() == 0) { + map.remove(sid); + } + } + } }
