[
https://issues.apache.org/jira/browse/CXF-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16660431#comment-16660431
]
ASF GitHub Bot commented on CXF-7881:
-------------------------------------
coheigea closed pull request #463: CXF-7881: Ensure proper allowCurrentThread
behavior
URL: https://github.com/apache/cxf/pull/463
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request
(one made from a fork) once it is merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
index e62f59477ca..2bcc8bde272 100644
---
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
+++
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
@@ -1242,10 +1242,10 @@ public void run() {
ex.execute(runnable);
}
} catch (RejectedExecutionException rex) {
- if (allowCurrentThread
- && policy != null
+ if (!allowCurrentThread
+ || (policy != null
&& policy.isSetAsyncExecuteTimeoutRejection()
- && policy.isAsyncExecuteTimeoutRejection()) {
+ && policy.isAsyncExecuteTimeoutRejection())) {
throw rex;
}
if (!hasLoggedAsyncWarning) {
diff --git
a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
index 343fb954c21..848e90a8e2c 100644
---
a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
+++
b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
@@ -26,16 +26,24 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.cxf.Bus;
import org.apache.cxf.bus.extension.ExtensionManagerBus;
import org.apache.cxf.common.util.Base64Utility;
import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit.WrappedOutputStream;
import org.apache.cxf.transport.http.auth.HttpAuthSupplier;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
@@ -58,7 +66,7 @@ public void tearDown() {
/**
* Generates a new message.
*/
- private Message getNewMessage() {
+ private Message getNewMessage() throws Exception {
Message message = new MessageImpl();
Map<String, List<String>> headers = new TreeMap<String,
List<String>>(String.CASE_INSENSITIVE_ORDER);
List<String> contentTypes = new ArrayList<>();
@@ -228,4 +236,49 @@ public void testAuthPolicyPrecedence() throws Exception {
}
+ @Test
+ public void testHandleResponseOnWorkqueueAllowCurrentThread() throws
Exception {
+ Message m = getNewMessage();
+ Exchange exchange = new ExchangeImpl();
+ Bus bus = new ExtensionManagerBus();
+ exchange.put(Bus.class, bus);
+
+ EndpointInfo endpointInfo = new EndpointInfo();
+ Endpoint endpoint = new EndpointImpl(null, null, endpointInfo);
+ exchange.put(Endpoint.class, endpoint);
+
+ m.setExchange(exchange);
+
+ HTTPClientPolicy policy = new HTTPClientPolicy();
+ policy.setAsyncExecuteTimeoutRejection(true);
+ m.put(HTTPClientPolicy.class, policy);
+ exchange.put(Executor.class, new Executor() {
+
+ @Override
+ public void execute(Runnable command) {
+ // simulates a maxxed-out executor
+ // forces us to use current thread
+ throw new RejectedExecutionException("expected");
+ } });
+
+ HTTPConduit conduit = new MockHTTPConduit(bus, endpointInfo, policy);
+ OutputStream os = conduit.createOutputStream(m, false, false, 0);
+ assertTrue(os instanceof WrappedOutputStream);
+ WrappedOutputStream wos = (WrappedOutputStream) os;
+
+ try {
+ wos.handleResponseOnWorkqueue(true, false);
+ assertEquals(Thread.currentThread(), m.get(Thread.class));
+
+ try {
+ wos.handleResponseOnWorkqueue(false, false);
+ fail("Expected RejectedExecutionException not thrown");
+ } catch (RejectedExecutionException ex) {
+ assertEquals("expected", ex.getMessage());
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw ex;
+ }
+ }
}
diff --git
a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
new file mode 100644
index 00000000000..94b5d120f83
--- /dev/null
+++
b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+
+public class MockHTTPConduit extends HTTPConduit {
+
+ public MockHTTPConduit(Bus b, EndpointInfo ei, HTTPClientPolicy policy)
throws IOException {
+ super(b, ei);
+ setClient(policy);
+ }
+
+ @Override
+ protected void setupConnection(Message message, Address address,
HTTPClientPolicy csPolicy)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected OutputStream createOutputStream(Message message, boolean
needToCacheRequest, boolean isChunking,
+ int chunkThreshold)
+ throws IOException {
+ return new MockWrappedOutputStream(message, isChunking, isChunking,
chunkThreshold, "mockConduit", null);
+ }
+
+ class MockWrappedOutputStream extends WrappedOutputStream {
+
+ protected MockWrappedOutputStream(Message outMessage, boolean
possibleRetransmit, boolean isChunking,
+ int chunkThreshold, String
conduitName, URI url) {
+ super(outMessage, possibleRetransmit, isChunking, chunkThreshold,
conduitName, url);
+ }
+
+ @Override
+ protected void setupWrappedStream() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws
IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected void setProtocolHeaders() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void setFixedLengthStreamingMode(int i) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected int getResponseCode() throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ protected String getResponseMessage() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected void updateResponseHeaders(Message inMessage) throws
IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void handleResponseAsync() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void handleResponseInternal() throws IOException {
+ outMessage.put(Thread.class, Thread.currentThread());
+ }
+
+ @Override
+ protected void closeInputStream() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected boolean usingProxy() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ protected InputStream getInputStream() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected InputStream getPartialResponse() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected void setupNewConnection(String newURL) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void retransmitStream() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void updateCookiesBeforeRetransmit() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void thresholdReached() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> HttpConduit.handleResponseOnWorkqueue will always handle response on current
> thread when allowCurrentThread is false and the work queue rejects the
> execution
> -------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: CXF-7881
> URL: https://issues.apache.org/jira/browse/CXF-7881
> Project: CXF
> Issue Type: Bug
> Components: Transports
> Affects Versions: 3.2.6
> Reporter: Jan Hallonsten
> Assignee: Andy McCright
> Priority: Major
>
> Creating this Jira according to the discussion
> [here|http://cxf.547215.n5.nabble.com/Response-always-handled-on-current-thread-when-WorkQueue-rejects-execution-tt5793019.html].
> In the method
> [org.apache.cxf.transport.http.HTTPConduit.handleResponseOnWorkqueue(boolean
> allowCurrentThread, boolean
> forceWQ)|https://github.com/apache/cxf/blob/540bb76f6f3d3d23944c566905f9f395c6f86b79/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java#L1190],
> if the work queue is full, so that a RejectedExecutionException is thrown, and
> allowCurrentThread is false, as it is when called from
> [AsyncHTTPConduit|https://github.com/apache/cxf/blob/6db38f9984b9c0bf6309a3d7e26d5a9ab8055d1f/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java#L600],
> the condition of the if statement below always evaluates to false, so the
> response is handled on the current thread via the call to
> handleResponseInternal. When used from AsyncHTTPConduit, the current thread is
> the IO core thread, and handling the response there is not a good idea.
> {code:java}
> } catch (RejectedExecutionException rex) {
> if (allowCurrentThread
> && policy != null
> && policy.isSetAsyncExecuteTimeoutRejection()
> && policy.isAsyncExecuteTimeoutRejection()) {
> throw rex;
> }
> if (!hasLoggedAsyncWarning) {
> LOG.warning("EXECUTOR_FULL_WARNING");
> hasLoggedAsyncWarning = true;
> }
> LOG.fine("EXECUTOR_FULL");
> handleResponseInternal();
> }
> {code}
> I think that the code above should be changed to:
> {code:java}
> } catch (RejectedExecutionException rex) {
> if (!allowCurrentThread
> || (policy != null
> && policy.isSetAsyncExecuteTimeoutRejection()
> && policy.isAsyncExecuteTimeoutRejection())) {
> throw rex;
> }
> if (!hasLoggedAsyncWarning) {
> LOG.warning("EXECUTOR_FULL_WARNING");
> hasLoggedAsyncWarning = true;
> }
> LOG.fine("EXECUTOR_FULL");
> handleResponseInternal();
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)