Author: ay
Date: Thu May 10 08:41:31 2012
New Revision: 1336538
URL: http://svn.apache.org/viewvc?rev=1336538&view=rev
Log:
[CXF-4303] WS-RM fails when GZIP feature is enabled
Added:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionGZIPTest.java
(with props)
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/gzip-enabled.xml
(with props)
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionQueueTest.java
cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/recorders/OutMessageRecorder.java
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java?rev=1336538&r1=1336537&r2=1336538&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
(original)
+++
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
Thu May 10 08:41:31 2012
@@ -21,13 +21,15 @@ package org.apache.cxf.ws.rm;
import java.io.OutputStream;
-import org.apache.cxf.interceptor.AttachmentOutInterceptor;
+//import org.apache.cxf.interceptor.AttachmentOutInterceptor;
import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.StaxOutInterceptor;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
+//import org.apache.cxf.interceptor.StaxOutInterceptor;
import org.apache.cxf.io.WriteOnCloseOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
+import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor;
/**
*
@@ -37,9 +39,12 @@ public class RetransmissionInterceptor e
RMManager manager;
public RetransmissionInterceptor() {
- super(Phase.PRE_STREAM);
- addBefore(StaxOutInterceptor.class.getName());
- addBefore(AttachmentOutInterceptor.class.getName());
+ super(Phase.PREPARE_SEND);
+ addAfter(MessageSenderInterceptor.class.getName());
+ addBefore(GZIPOutInterceptor.class.getName());
+// super(Phase.PRE_STREAM);
+// addBefore(StaxOutInterceptor.class.getName());
+// addBefore(AttachmentOutInterceptor.class.getName());
}
public RMManager getManager() {
Modified:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java?rev=1336538&r1=1336537&r2=1336538&view=diff
==============================================================================
---
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
(original)
+++
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
Thu May 10 08:41:31 2012
@@ -43,14 +43,31 @@ import org.apache.cxf.ws.rm.RMContextUti
*
*/
public class MessageLossSimulator extends AbstractPhaseInterceptor<Message> {
-
private static final Logger LOG =
LogUtils.getLogger(MessageLossSimulator.class);
- private int appMessageCount;
-
+ private int appMessageCount;
+ private boolean throwsException;
+ private int mode;
+
public MessageLossSimulator() {
super(Phase.PREPARE_SEND);
addBefore(MessageSenderInterceptor.class.getName());
}
+
+ public boolean isThrowsException() {
+ return throwsException;
+ }
+
+ public void setThrowsException(boolean throwsException) {
+ this.throwsException = throwsException;
+ }
+
+ public int getMode() {
+ return mode;
+ }
+
+ public void setMode(int mode) {
+ this.mode = mode;
+ }
public void handleMessage(Message message) throws Fault {
AddressingProperties maps = RMContextUtils.retrieveMAPs(message,
false, true);
@@ -64,10 +81,18 @@ public class MessageLossSimulator extend
if (MessageUtils.isPartialResponse(message)) {
return;
}
- synchronized (this) {
- appMessageCount++;
- if (0 != (appMessageCount % 2)) {
- return;
+ if (mode == 1) {
+ // never lose
+ return;
+ } else if (mode == -1) {
+ // always lose
+ } else {
+ // alternatively lose
+ synchronized (this) {
+ appMessageCount++;
+ if (0 != (appMessageCount % 2)) {
+ return;
+ }
}
}
@@ -89,6 +114,9 @@ public class MessageLossSimulator extend
public void handleMessage(Message message) throws Fault {
try {
message.getContent(OutputStream.class).close();
+ if (throwsException) {
+ throw new IOException("simulated transmission
exception");
+ }
} catch (IOException e) {
throw new Fault(e);
}
Added:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionGZIPTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionGZIPTest.java?rev=1336538&view=auto
==============================================================================
---
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionGZIPTest.java
(added)
+++
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionGZIPTest.java
Thu May 10 08:41:31 2012
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Logger;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.interceptor.LoggingInInterceptor;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the gzip feature does not interfere with the ws-rm retransmission.
+ * Note that the current retransmission logic isn't optimal (in some sense,
wrong)
+ * and stores the wire-message and retransmits this wire-message directly.
This
+ * approach is not practical when ws-security is enabled and each message
needs to be
+ * timestamped and signed. Therefore, the current retransmission logic needs
to be
+ * changed one day.
+ *
+ * Independently of this fix, this test verifies the gzip feature does not
interfere
+ * with retransmission.
+ */
+public class RetransmissionGZIPTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = allocatePort(Server.class);
+ public static final String DECOUPLE_PORT = allocatePort("decoupled.port");
+
+ private static final Logger LOG =
LogUtils.getLogger(RetransmissionQueueTest.class);
+ private Bus bus;
+
+ public static class Server extends AbstractBusTestServerBase {
+
+ protected void run() {
+ SpringBusFactory bf = new SpringBusFactory();
+ Bus bus =
bf.createBus("/org/apache/cxf/systest/ws/rm/gzip-enabled.xml");
+ BusFactory.setDefaultBus(bus);
+ LoggingInInterceptor in = new LoggingInInterceptor();
+ bus.getInInterceptors().add(in);
+ bus.getInFaultInterceptors().add(in);
+ LoggingOutInterceptor out = new LoggingOutInterceptor();
+ bus.getOutInterceptors().add(out);
+ bus.getOutFaultInterceptors().add(out);
+
+ GreeterImpl implementor = new GreeterImpl();
+ String address = "http://localhost:" + PORT +
"/SoapContext/GreeterPort";
+
+ Endpoint ep = Endpoint.create(implementor);
+ ep.publish(address);
+
+ LOG.info("Published greeter endpoint.");
+ }
+
+ public static void main(String[] args) {
+ try {
+ Server s = new Server();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1);
+ } finally {
+ System.out.println("done!");
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ assertTrue("server did not launch correctly",
+ launchServer(Server.class, true));
+ }
+
+ @Test
+ public void testDecoupleFaultHandling() throws Exception {
+ SpringBusFactory bf = new SpringBusFactory();
+ bus = bf.createBus("/org/apache/cxf/systest/ws/rm/gzip-enabled.xml");
+ BusFactory.setDefaultBus(bus);
+ LoggingInInterceptor in = new LoggingInInterceptor();
+ bus.getInInterceptors().add(in);
+ bus.getInFaultInterceptors().add(in);
+ LoggingOutInterceptor out = new LoggingOutInterceptor();
+ bus.getOutInterceptors().add(out);
+
bus.getExtension(RMManager.class).getRMAssertion().getBaseRetransmissionInterval()
+ .setMilliseconds(new Long(4000));
+
+ // an interceptor to simulate a transmission error
+ MessageLossSimulator loser = new MessageLossSimulator();
+ bus.getOutInterceptors().add(loser);
+
+ bus.getOutFaultInterceptors().add(out);
+
+ GreeterService gs = new GreeterService();
+ final Greeter greeter = gs.getGreeterPort();
+ updateAddressPort(greeter, PORT);
+ LOG.fine("Created greeter client.");
+
+ ConnectionHelper.setKeepAliveConnection(greeter, true);
+ loser.setMode(-1);
+ loser.setThrowsException(true);
+
+ try {
+ greeter.greetMeOneWay("oneway");
+ } catch (Exception e) {
+ fail("fault thrown after queued for retransmission");
+ }
+ Thread.sleep(2000);
+
+ RMManager manager = bus.getExtension(RMManager.class);
+ boolean empty = manager.getRetransmissionQueue().isEmpty();
+ assertFalse("RetransmissionQueue is empty", empty);
+
+ loser.setMode(1);
+
+ Thread.sleep(6000);
+
+ empty = manager.getRetransmissionQueue().isEmpty();
+ assertTrue("RetransmissionQueue not cleared", empty);
+ }
+
+}
Propchange:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionGZIPTest.java
------------------------------------------------------------------------------
svn:executable = *
Modified:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionQueueTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionQueueTest.java?rev=1336538&r1=1336537&r2=1336538&view=diff
==============================================================================
---
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionQueueTest.java
(original)
+++
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RetransmissionQueueTest.java
Thu May 10 08:41:31 2012
@@ -18,7 +18,6 @@
*/
package org.apache.cxf.systest.ws.rm;
-import java.io.IOException;
import java.util.logging.Logger;
import javax.xml.ws.Endpoint;
@@ -29,18 +28,11 @@ import org.apache.cxf.bus.spring.SpringB
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.greeter_control.Greeter;
import org.apache.cxf.greeter_control.GreeterService;
-import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.AbstractPhaseInterceptor;
-import org.apache.cxf.phase.Phase;
import org.apache.cxf.systest.ws.util.ConnectionHelper;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.ws.addressing.AddressingProperties;
-import org.apache.cxf.ws.rm.RM10Constants;
-import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMManager;
import org.junit.BeforeClass;
@@ -95,7 +87,7 @@ public class RetransmissionQueueTest ext
@BeforeClass
public static void startServers() throws Exception {
assertTrue("server did not launch correctly",
- launchServer(Server.class));
+ launchServer(Server.class, true));
}
@Test
@@ -108,9 +100,12 @@ public class RetransmissionQueueTest ext
bus.getInFaultInterceptors().add(in);
LoggingOutInterceptor out = new LoggingOutInterceptor();
bus.getOutInterceptors().add(out);
+
bus.getExtension(RMManager.class).getRMAssertion().getBaseRetransmissionInterval()
+ .setMilliseconds(new Long(4000));
+
// an interceptor to simulate a transmission error
- TransmissionErrorSimulator tes = new TransmissionErrorSimulator();
- bus.getOutInterceptors().add(tes);
+ MessageLossSimulator loser = new MessageLossSimulator();
+ bus.getOutInterceptors().add(loser);
bus.getOutFaultInterceptors().add(out);
@@ -120,73 +115,26 @@ public class RetransmissionQueueTest ext
LOG.fine("Created greeter client.");
ConnectionHelper.setKeepAliveConnection(greeter, true);
+ loser.setMode(-1);
+ loser.setThrowsException(true);
try {
greeter.greetMeOneWay("oneway");
} catch (Exception e) {
fail("fault thrown after queued for retransmission");
}
-
- tes.setWorking(true);
-
- long wait = 3000;
- while (wait > 0) {
- long start = System.currentTimeMillis();
- try {
- Thread.sleep(wait);
- } catch (InterruptedException ex) {
- // ignore
- }
- wait -= System.currentTimeMillis() - start;
- }
+ Thread.sleep(2000);
RMManager manager = bus.getExtension(RMManager.class);
boolean empty = manager.getRetransmissionQueue().isEmpty();
+ assertFalse("RetransmissionQueue is empty", empty);
- assertTrue("RetransmissionQueue not cleared", empty);
- }
-
-
- static class TransmissionErrorSimulator extends
AbstractPhaseInterceptor<Message> {
- private boolean working;
-
- /**
- * @param phase
- */
- public TransmissionErrorSimulator() {
- super(Phase.WRITE);
- }
-
- /* (non-Javadoc)
- * @see
org.apache.cxf.interceptor.Interceptor#handleMessage(org.apache.cxf.message.Message)
- */
- public void handleMessage(Message message) throws Fault {
- // let the create sequence message to succeed so that a valid
sequence is created
- AddressingProperties maps =
- RMContextUtils.retrieveMAPs(message, false, true);
- if (maps != null
- && maps.getAction() != null
- &&
RM10Constants.CREATE_SEQUENCE_ACTION.equals(maps.getAction().getValue())) {
- // spare the message
- } else if (!working) {
- // triggers a simulated error
- throw new Fault(new IOException("simulated transmission
error"));
- }
- }
+ loser.setMode(1);
- /**
- * @return the working
- */
- public boolean isWorking() {
- return working;
- }
-
- /**
- * @param working the working to set
- */
- public void setWorking(boolean working) {
- this.working = working;
- }
+ Thread.sleep(6000);
+
+ empty = manager.getRetransmissionQueue().isEmpty();
+ assertTrue("RetransmissionQueue not cleared", empty);
}
}
Added:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/gzip-enabled.xml
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/gzip-enabled.xml?rev=1336538&view=auto
==============================================================================
Binary file - no diff available.
Propchange:
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/gzip-enabled.xml
------------------------------------------------------------------------------
svn:mime-type = application/xml
Modified:
cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/recorders/OutMessageRecorder.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/recorders/OutMessageRecorder.java?rev=1336538&r1=1336537&r2=1336538&view=diff
==============================================================================
---
cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/recorders/OutMessageRecorder.java
(original)
+++
cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/recorders/OutMessageRecorder.java
Thu May 10 08:41:31 2012
@@ -27,7 +27,7 @@ import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.StaxOutInterceptor;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.io.WriteOnCloseOutputStream;
@@ -45,10 +45,10 @@ public class OutMessageRecorder extends
private List<byte[]> outbound;
public OutMessageRecorder() {
- super(Phase.PRE_STREAM);
+ super(Phase.PREPARE_SEND);
outbound = new CopyOnWriteArrayList<byte[]>();
+ addAfter(MessageSenderInterceptor.class.getName());
addAfter("org.apache.cxf.ws.rm.RetransmissionInterceptor");
- addBefore(StaxOutInterceptor.class.getName());
}
public void handleMessage(Message message) throws Fault {