Author: dkulp
Date: Fri Jul 27 19:18:15 2012
New Revision: 1366507
URL: http://svn.apache.org/viewvc?rev=1366507&view=rev
Log:
More moving stuff around to help with HTTPConduit subclassing
Modified:
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookies.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Headers.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/URLConnectionHTTPConduit.java
Modified:
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookies.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookies.java?rev=1366507&r1=1366506&r2=1366507&view=diff
==============================================================================
---
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookies.java
(original)
+++
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookies.java
Fri Jul 27 19:18:15 2012
@@ -18,7 +18,6 @@
*/
package org.apache.cxf.transport.http;
-import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,12 +36,11 @@ public class Cookies {
return sessionCookies;
}
- public void readFromConnection(HttpURLConnection connection) {
+ public void readFromHeaders(Headers headers) {
if (maintainSession) {
- for (Map.Entry<String, List<String>> h :
connection.getHeaderFields().entrySet()) {
- if ("Set-Cookie".equalsIgnoreCase(h.getKey())) {
- handleSetCookie(h.getValue());
- }
+ List<String> c = headers.headerMap().get("Set-Cookie");
+ if (c != null) {
+ handleSetCookie(c);
}
}
}
Modified:
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=1366507&r1=1366506&r2=1366507&view=diff
==============================================================================
---
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
(original)
+++
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
Fri Jul 27 19:18:15 2012
@@ -33,6 +33,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -60,6 +62,7 @@ import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageContentsList;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.policy.PolicyDataEngine;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
@@ -76,6 +79,8 @@ import org.apache.cxf.transport.https.Ce
import org.apache.cxf.transport.https.CertConstraintsJaxBUtils;
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
/*
@@ -138,7 +143,7 @@ public abstract class HTTPConduit
extends AbstractConduit
implements Configurable, Assertor, PropertyChangeListener {
-
+
/**
* This constant is the Message(Map) key for the HttpURLConnection that
* is used to get the response.
@@ -150,6 +155,7 @@ public abstract class HTTPConduit
*/
protected static final Logger LOG =
LogUtils.getL7dLogger(HTTPConduit.class);
+ private static boolean hasLoggedAsyncWarning;
/**
* This constant holds the suffix ".http-conduit" that is appended to the
@@ -1069,21 +1075,82 @@ public abstract class HTTPConduit
}
}
+ // methods used for the outgoing side
protected abstract void setupWrappedStream() throws IOException;
- protected abstract void handleResponseAsync() throws IOException;
- protected abstract void closeInputStream() throws IOException;
- protected abstract InputStream getInputStream(int rc) throws
IOException;
- protected abstract void updateResponseHeaders(Message inMessage);
- protected abstract boolean usingProxy();
protected abstract HttpsURLConnectionInfo getHttpsURLConnectionInfo()
throws IOException;
- protected abstract int getResponseCode() throws IOException;
- protected abstract String getResponseMessage() throws IOException;
- protected abstract void updateCookies();
- protected abstract InputStream getPartialResponse(int responseCode)
throws IOException;
protected abstract void setProtocolHeaders() throws IOException;
protected abstract void setFixedLengthStreamingMode(int i);
- protected abstract void retransmitStream() throws IOException;
+
+
+ // methods used for the incoming side
+ protected abstract int getResponseCode() throws IOException;
+ protected abstract String getResponseMessage() throws IOException;
+ protected abstract void updateResponseHeaders(Message inMessage);
+ protected abstract void handleResponseAsync() throws IOException;
+ protected abstract void closeInputStream() throws IOException;
+ protected abstract boolean usingProxy();
+ protected abstract InputStream getInputStream() throws IOException;
+ protected abstract InputStream getPartialResponse() throws IOException;
+
+ //methods to support retransmission for auth or redirects
protected abstract void setupNewConnection(String newURL) throws
IOException;
+ protected abstract void retransmitStream() throws IOException;
+ protected abstract void updateCookiesBeforeRetransmit() throws
IOException;
+
+
+ protected void handleResponseOnWorkqueue(boolean allowCurrentThread)
throws IOException {
+ Runnable runnable = new Runnable() {
+ public void run() {
+ try {
+ handleResponseInternal();
+ } catch (Throwable e) {
+
((PhaseInterceptorChain)outMessage.getInterceptorChain()).abort();
+
((PhaseInterceptorChain)outMessage.getInterceptorChain()).unwind(outMessage);
+ outMessage.setContent(Exception.class, e);
+
outMessage.getInterceptorChain().getFaultObserver().onMessage(outMessage);
+ }
+ }
+ };
+ HTTPClientPolicy policy = getClient(outMessage);
+ try {
+ Executor ex = outMessage.getExchange().get(Executor.class);
+ if (ex == null) {
+ WorkQueueManager mgr =
outMessage.getExchange().get(Bus.class)
+ .getExtension(WorkQueueManager.class);
+ AutomaticWorkQueue qu =
mgr.getNamedWorkQueue("http-conduit");
+ if (qu == null) {
+ qu = mgr.getAutomaticWorkQueue();
+ }
+ long timeout = 1000;
+ if (policy != null && policy.isSetAsyncExecuteTimeout()) {
+ timeout = policy.getAsyncExecuteTimeout();
+ }
+ if (timeout > 0) {
+ qu.execute(runnable, timeout);
+ } else {
+ qu.execute(runnable);
+ }
+ } else {
+ outMessage.getExchange().put(Executor.class.getName()
+ + ".USING_SPECIFIED",
Boolean.TRUE);
+ ex.execute(runnable);
+ }
+ } catch (RejectedExecutionException rex) {
+ if (allowCurrentThread
+ && policy != null
+ && policy.isSetAsyncExecuteTimeoutRejection()
+ && policy.isAsyncExecuteTimeoutRejection()) {
+ throw rex;
+ }
+ if (!hasLoggedAsyncWarning) {
+ LOG.warning("EXECUTOR_FULL_WARNING");
+ hasLoggedAsyncWarning = true;
+ }
+ LOG.fine("EXECUTOR_FULL");
+ handleResponseInternal();
+ }
+ }
+
protected void retransmit(String newURL) throws IOException {
setupNewConnection(newURL);
@@ -1262,7 +1329,7 @@ public abstract class HTTPConduit
int maxRetransmits = getMaxRetransmits();
- updateCookies();
+ updateCookiesBeforeRetransmit();
int nretransmits = 0;
while ((maxRetransmits < 0 || nretransmits < maxRetransmits)
&& processRetransmit()) {
nretransmits++;
@@ -1426,7 +1493,7 @@ public abstract class HTTPConduit
// oneway or decoupled twoway calls may expect HTTP 202 with no
content
if (isOneway(exchange)
|| HttpURLConnection.HTTP_ACCEPTED == responseCode) {
- in = getPartialResponse(responseCode);
+ in = getPartialResponse();
if ((in == null) || !doProcessResponse(outMessage)) {
// oneway operation or decoupled MEP without
// partial response
@@ -1472,7 +1539,7 @@ public abstract class HTTPConduit
}
inMessage.put(Message.ENCODING, normalizedEncoding);
if (in == null) {
- in = getInputStream(responseCode);
+ in = getInputStream();
}
if (in == null) {
// Create an empty stream to avoid NullPointerExceptions
Modified:
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Headers.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Headers.java?rev=1366507&r1=1366506&r2=1366507&view=diff
==============================================================================
---
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Headers.java
(original)
+++
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Headers.java
Fri Jul 27 19:18:15 2012
@@ -53,8 +53,8 @@ public class Headers {
*/
public static final String KEY_HTTP_CONNECTION = "http.connection";
public static final String PROTOCOL_HEADERS_CONTENT_TYPE =
Message.CONTENT_TYPE.toLowerCase();
- private static final String HTTP_HEADERS_SETCOOKIE = "Set-Cookie";
- private static final String ADD_HEADERS_PROPERTY =
"org.apache.cxf.http.add-headers";
+ public static final String ADD_HEADERS_PROPERTY =
"org.apache.cxf.http.add-headers";
+ public static final String HTTP_HEADERS_SETCOOKIE = "Set-Cookie";
private static final Logger LOG = LogUtils.getL7dLogger(Headers.class);
private final Message message;
@@ -64,6 +64,15 @@ public class Headers {
this.message = message;
this.headers = getSetProtocolHeaders(message);
}
+ public Headers() {
+ this.headers = new TreeMap<String,
List<String>>(String.CASE_INSENSITIVE_ORDER);
+ this.message = null;
+ }
+
+ public Map<String, List<String>> headerMap() {
+ return headers;
+ }
+
/**
* Write cookie header from given session cookies
@@ -266,7 +275,7 @@ public class Headers {
logProtocolHeaders(Level.FINE);
}
- private String determineContentType() {
+ public String determineContentType() {
String ct = (String)message.get(Message.CONTENT_TYPE);
String enc = (String)message.get(Message.ENCODING);
Modified:
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/URLConnectionHTTPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/URLConnectionHTTPConduit.java?rev=1366507&r1=1366506&r2=1366507&view=diff
==============================================================================
---
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/URLConnectionHTTPConduit.java
(original)
+++
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/URLConnectionHTTPConduit.java
Fri Jul 27 19:18:15 2012
@@ -26,28 +26,22 @@ import java.net.HttpURLConnection;
import java.net.Proxy;
import java.net.URL;
import java.net.URLConnection;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import org.apache.cxf.Bus;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.io.CacheAndWriteOutputStream;
import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.https.HttpsURLConnectionFactory;
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
-import org.apache.cxf.workqueue.AutomaticWorkQueue;
-import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
/**
*
*/
public class URLConnectionHTTPConduit extends HTTPConduit {
- private static boolean hasLoggedAsyncWarning;
/**
* This field holds the connection factory, which primarily is used to
@@ -168,6 +162,7 @@ public class URLConnectionHTTPConduit ex
wrappedStream = connection.getOutputStream();
}
}
+
@Override
public void thresholdReached() {
if (chunking) {
@@ -195,17 +190,24 @@ public class URLConnectionHTTPConduit ex
connection.connect();
return new HttpsURLConnectionInfo(connection);
}
- protected void updateCookies() {
- cookies.readFromConnection(connection);
- }
protected void updateResponseHeaders(Message inMessage) {
- new Headers(inMessage).readFromConnection(connection);
+ Headers h = new Headers(inMessage);
+ h.readFromConnection(connection);
inMessage.put(Message.CONTENT_TYPE, connection.getContentType());
- cookies.readFromConnection(connection);
+ cookies.readFromHeaders(h);
+ }
+ protected void handleResponseAsync() throws IOException {
+ handleResponseOnWorkqueue(true);
+ }
+ protected void updateCookiesBeforeRetransmit() {
+ Headers h = new Headers();
+ h.readFromConnection(connection);
+ cookies.readFromHeaders(h);
}
- protected InputStream getInputStream(int responseCode) throws
IOException {
+
+ protected InputStream getInputStream() throws IOException {
InputStream in = null;
- if (responseCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
+ if (getResponseCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
in = connection.getErrorStream();
if (in == null) {
try {
@@ -233,64 +235,14 @@ public class URLConnectionHTTPConduit ex
ins.close();
}
}
- protected void handleResponseAsync() throws IOException {
- Runnable runnable = new Runnable() {
- public void run() {
- try {
- handleResponseInternal();
- } catch (Exception e) {
-
((PhaseInterceptorChain)outMessage.getInterceptorChain()).abort();
-
((PhaseInterceptorChain)outMessage.getInterceptorChain()).unwind(outMessage);
- outMessage.setContent(Exception.class, e);
-
outMessage.getInterceptorChain().getFaultObserver().onMessage(outMessage);
- }
- }
- };
- HTTPClientPolicy policy = getClient(outMessage);
- try {
- Executor ex = outMessage.getExchange().get(Executor.class);
- if (ex == null) {
- WorkQueueManager mgr =
outMessage.getExchange().get(Bus.class)
- .getExtension(WorkQueueManager.class);
- AutomaticWorkQueue qu =
mgr.getNamedWorkQueue("http-conduit");
- if (qu == null) {
- qu = mgr.getAutomaticWorkQueue();
- }
- long timeout = 5000;
- if (policy != null && policy.isSetAsyncExecuteTimeout()) {
- timeout = policy.getAsyncExecuteTimeout();
- }
- if (timeout > 0) {
- qu.execute(runnable, timeout);
- } else {
- qu.execute(runnable);
- }
- } else {
- outMessage.getExchange().put(Executor.class.getName()
- + ".USING_SPECIFIED",
Boolean.TRUE);
- ex.execute(runnable);
- }
- } catch (RejectedExecutionException rex) {
- if (policy != null &&
policy.isSetAsyncExecuteTimeoutRejection()
- && policy.isAsyncExecuteTimeoutRejection()) {
- throw rex;
- }
- if (!hasLoggedAsyncWarning) {
- LOG.warning("EXECUTOR_FULL_WARNING");
- hasLoggedAsyncWarning = true;
- }
- LOG.fine("EXECUTOR_FULL");
- handleResponseInternal();
- }
- }
protected int getResponseCode() throws IOException {
return connection.getResponseCode();
}
protected String getResponseMessage() throws IOException {
return connection.getResponseMessage();
}
- protected InputStream getPartialResponse(int responseCode) throws
IOException {
- return ChunkedUtil.getPartialResponse(connection, responseCode);
+ protected InputStream getPartialResponse() throws IOException {
+ return ChunkedUtil.getPartialResponse(connection,
connection.getResponseCode());
}
protected boolean usingProxy() {
return connection.usingProxy();