Repository: cxf
Updated Branches:
  refs/heads/3.1.x-fixes 4537bb27c -> 2e2bfe121
[CXF-7109] ClientCallback may be invoked twice when Async HTTP Transport is used (cherry picked from commit 4eb81d3044c0f663d580cdbd3b611d5e3b1b4ac5) Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2e2bfe12 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2e2bfe12 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2e2bfe12 Branch: refs/heads/3.1.x-fixes Commit: 2e2bfe1211b521765458401eb49101a2857036ff Parents: 4537bb2 Author: Tadayoshi Sato <[email protected]> Authored: Tue Oct 25 18:45:42 2016 +0900 Committer: Freeman Fang <[email protected]> Committed: Wed Oct 26 11:35:06 2016 +0800 ---------------------------------------------------------------------- .../org/apache/cxf/endpoint/ClientImpl.java | 14 ++++++++++--- .../cxf/interceptor/ClientOutFaultObserver.java | 3 ++- .../cxf/message/AbstractWrappedMessage.java | 3 +++ .../org/apache/cxf/message/ExchangeImpl.java | 4 ++++ .../java/org/apache/cxf/message/StringMap.java | 7 +++++++ .../org/apache/cxf/message/StringMapImpl.java | 4 ++++ .../http/asyncclient/AsyncHTTPConduitTest.java | 21 ++++++++++++++++++++ .../apache/cxf/transport/http/HTTPConduit.java | 11 +++++----- 8 files changed, 58 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java index 428f794..848d3bc 100644 --- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java +++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java @@ -791,8 +791,11 @@ public class ClientImpl if (resCtx != null) { responseContext.put(Thread.currentThread(), resCtx); } - callback.handleException(resCtx, error); - + // remove 
callback so that it won't be invoked twice + callback = message.getExchange().remove(ClientCallback.class); + if (callback != null) { + callback.handleException(resCtx, error); + } } } else { chain.doIntercept(message); @@ -801,8 +804,13 @@ public class ClientImpl } callback = message.getExchange().get(ClientCallback.class); + if (callback == null || isPartialResponse(message)) { + return; + } - if (callback != null && !isPartialResponse(message)) { + // remove callback so that it won't be invoked twice + callback = message.getExchange().remove(ClientCallback.class); + if (callback != null) { message.getExchange().setInMessage(message); Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message .getExchange() http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java b/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java index 0c879c5..f139da9 100644 --- a/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java +++ b/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java @@ -50,7 +50,8 @@ public class ClientOutFaultObserver extends AbstractFaultChainInitiatorObserver return; } Exception ex = m.getContent(Exception.class); - ClientCallback callback = m.getExchange().get(ClientCallback.class); + // remove callback so that it won't be invoked twice + ClientCallback callback = m.getExchange().remove(ClientCallback.class); if (callback != null) { Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>) m.getExchange().getOutMessage().get( http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java b/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java index 496b97b..094e432 100644 --- a/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java +++ b/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java @@ -157,6 +157,9 @@ public abstract class AbstractWrappedMessage implements Message { public <T> void put(Class<T> key, T value) { message.put(key, value); } + public <T> T remove(Class<T> key) { + return message.remove(key); + } public Object getContextualProperty(String key) { return message.getContextualProperty(key); http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java b/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java index d2abd7a..4798827 100644 --- a/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java +++ b/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java @@ -159,6 +159,10 @@ public class ExchangeImpl extends ConcurrentHashMap<String, Object> implements return super.put(key, value); } + public <T> T remove(Class<T> key) { + return key.cast(super.remove(key.getName())); + } + private void setMessageContextProperty(Message m, String key, Object value) { if (m == null) { return; http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/StringMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/message/StringMap.java b/core/src/main/java/org/apache/cxf/message/StringMap.java index 4d802b8..87115fe 100644 --- a/core/src/main/java/org/apache/cxf/message/StringMap.java +++ b/core/src/main/java/org/apache/cxf/message/StringMap.java @@ -38,4 +38,11 @@ public interface StringMap extends Map<String, 
Object> { * @param value the value */ <T> void put(Class<T> key, T value); + + /** + * Convenience method for removing typed objects from the map. + * equivalent to: (T)remove(key.getName()); + * @param key the key + */ + <T> T remove(Class<T> key); } http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/StringMapImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/message/StringMapImpl.java b/core/src/main/java/org/apache/cxf/message/StringMapImpl.java index c1428e4..f3acdc1 100644 --- a/core/src/main/java/org/apache/cxf/message/StringMapImpl.java +++ b/core/src/main/java/org/apache/cxf/message/StringMapImpl.java @@ -46,4 +46,8 @@ public class StringMapImpl public <T> void put(Class<T> key, T value) { put(key.getName(), value); } + + public <T> T remove(Class<T> key) { + return key.cast(remove(key.getName())); + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java index 3341e3e..d4a25f6 100644 --- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java +++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.xml.ws.AsyncHandler; import javax.xml.ws.Endpoint; @@ -197,6 +198,26 @@ public class AsyncHTTPConduitTest 
extends AbstractBusClientServerTestBase { } }).get(); } + + @Test + public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { + // This test is especially targeted for RHEL 6.8 + updateAddressPort(g, PORT_INV); + int repeat = 100; + final AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < repeat; i++) { + try { + g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() { + public void handleResponse(Response<GreetMeResponse> res) { + count.incrementAndGet(); + } + }).get(); + } catch (Exception e) { + } + } + Thread.sleep(1000); + assertEquals("Callback should be invoked only once per request", repeat, count.intValue()); + } @Test @Ignore("peformance test") http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java index adebe09..e5aa543 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java @@ -1626,12 +1626,13 @@ public abstract class HTTPConduit if (isOneway(exchange) && responseCode > 300) { throw new HTTPException(responseCode, getResponseMessage(), url.toURL()); } - ClientCallback cc = exchange.get(ClientCallback.class); - if (null != cc) { - //REVISIT move the decoupled destination property name into api - Endpoint ep = exchange.getEndpoint(); - if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo(). + //REVISIT move the decoupled destination property name into api + Endpoint ep = exchange.getEndpoint(); + if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo(). 
getProperty("org.apache.cxf.ws.addressing.MAPAggregator.decoupledDestination")) { + // remove callback so that it won't be invoked twice + ClientCallback cc = exchange.remove(ClientCallback.class); + if (null != cc) { cc.handleResponse(null, null); } }
