Author: jawi
Date: Tue Oct 8 09:04:32 2013
New Revision: 1530190
URL: http://svn.apache.org/r1530190
Log:
Yet another attempt to make the itests more robust:
- it appears that somehow a resource starvation appears with respect to
the number of available sockets, make sure that we properly handle
exceptions during the use of HttpURLConnections.
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
Tue Oct 8 09:04:32 2013
@@ -98,6 +98,25 @@ class ConnectionUtil {
return null;
}
+ /**
+ * @see
http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+ */
+ public static int handleIOException(URLConnection conn) {
+ int respCode = -1;
+ if (!(conn instanceof HttpURLConnection)) {
+ return respCode;
+ }
+
+ try {
+ respCode = ((HttpURLConnection) conn).getResponseCode();
+ flushStream(((HttpURLConnection) conn).getErrorStream());
+ }
+ catch (IOException ex) {
+ // deal with the exception
+ }
+ return respCode;
+ }
+
public static void copy(InputStream is, OutputStream os) throws
IOException {
copy(is, os, DEFAULT_BUFFER_SIZE);
}
@@ -129,14 +148,32 @@ class ConnectionUtil {
* @param connection
* the URL connection to get the response code for, can be
<code>null</code>.
* @return the response code for the given connection, or <code>-1</code>
if it could not be determined.
- * @throws IOException
- * if retrieving the response code failed.
*/
- private static int getResponseCode(URLConnection connection) throws
IOException {
- if (connection instanceof HttpURLConnection) {
- return ((HttpURLConnection) connection).getResponseCode();
+ private static int getResponseCode(URLConnection connection) {
+ try {
+ if (connection instanceof HttpURLConnection) {
+ return ((HttpURLConnection) connection).getResponseCode();
+ }
+ return -1;
+ }
+ catch (IOException exception) {
+ return handleIOException(connection);
+ }
+ }
+
+ static void flushStream(InputStream is) {
+ byte[] buf = new byte[4096];
+ try {
+ while (is.read(buf) > 0) {
+ // Ignore...
+ }
+ }
+ catch (IOException ex) {
+ // deal with the exception
+ }
+ finally {
+ closeSilently(is);
}
- return -1;
}
private ConnectionUtil() {
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
Tue Oct 8 09:04:32 2013
@@ -19,7 +19,8 @@
package org.apache.ace.agent.impl;
import static org.apache.ace.agent.impl.ConnectionUtil.DEFAULT_RETRY_TIME;
-import static org.apache.ace.agent.impl.ConnectionUtil.*;
+import static org.apache.ace.agent.impl.ConnectionUtil.HTTP_RETRY_AFTER;
+import static org.apache.ace.agent.impl.ConnectionUtil.handleIOException;
import java.io.IOException;
import java.io.InputStream;
@@ -165,7 +166,7 @@ class ContentRangeInputStream extends In
return -1;
}
- InputStream is = m_conn.getInputStream();
+ InputStream is = getInputStream();
int result = is.read();
if (result > 0) {
@@ -194,7 +195,7 @@ class ContentRangeInputStream extends In
return -1;
}
- InputStream is = m_conn.getInputStream();
+ InputStream is = getInputStream();
int read = is.read(b, off, len);
if (read >= 0) {
@@ -225,7 +226,8 @@ class ContentRangeInputStream extends In
rangeHeader = String.format("%s=%d-", BYTES, m_readTotal);
}
conn.setRequestProperty(HDR_RANGE, rangeHeader);
- } else {
+ }
+ else {
// Non-HTTP connection, skip the first few bytes when calling
this method for the first time...
if (m_contentInfo == null) {
long skip = m_readTotal;
@@ -297,7 +299,14 @@ class ContentRangeInputStream extends In
}
private long[] getHttpContentRangeInfo(HttpURLConnection conn) throws
IOException {
- int rc = conn.getResponseCode();
+ int rc;
+ try {
+ rc = conn.getResponseCode();
+ }
+ catch (IOException exception) {
+ rc = handleIOException(conn);
+ }
+
if (rc == SC_OK) {
// Non-chunked response...
if (m_readTotal > 0) {
@@ -347,6 +356,22 @@ class ContentRangeInputStream extends In
}
/**
+ * @return the current input stream, never <code>null</code>.
+ * @throws IOException
+ * in case of I/O problems accessing the input stream.
+ */
+ private InputStream getInputStream() throws IOException {
+ try {
+ return m_conn.getInputStream();
+ }
+ catch (IOException exception) {
+ handleIOException(m_conn);
+ closeChunk();
+ throw exception;
+ }
+ }
+
+ /**
* Prepares the connection for the next chunk, if needed.
*
* @return <code>true</code> if the prepare was successful (there was a
next chunk to be read), <code>false</code>
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
Tue Oct 8 09:04:32 2013
@@ -19,7 +19,7 @@
package org.apache.ace.agent.impl;
import static org.apache.ace.agent.impl.ConnectionUtil.close;
-import static org.apache.ace.agent.impl.ConnectionUtil.closeSilently;
+import static org.apache.ace.agent.impl.ConnectionUtil.*;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -104,14 +104,16 @@ public class FeedbackChannelImpl impleme
try {
synchronizeStore(storeID,
queryConnection.getInputStream(), writer);
}
+ catch (IOException e) {
+ handleIOException(queryConnection);
+ }
finally {
close(queryConnection);
}
}
writer.flush();
- ConnectionUtil.checkConnectionResponse(sendConnection);
- sendConnection.getContent();
+ checkConnectionResponse(sendConnection);
}
finally {
closeSilently(writer);
Modified:
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
(original)
+++
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
Tue Oct 8 09:04:32 2013
@@ -43,6 +43,12 @@ final class Utils {
}
}
+ static void closeSilently(HttpURLConnection resource) {
+ if (resource != null) {
+ resource.disconnect();
+ }
+ }
+
/* copy in to out */
static void copy(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[COPY_BUFFER_SIZE];
@@ -53,59 +59,93 @@ final class Utils {
}
}
+ static void flushStream(InputStream is) {
+ byte[] buf = new byte[COPY_BUFFER_SIZE];
+ try {
+ while (is.read(buf) > 0) {
+ // Ignore...
+ }
+ }
+ catch (IOException ex) {
+ // deal with the exception
+ }
+ finally {
+ closeSilently(is);
+ }
+ }
+
static int get(URL host, String endpoint, String customer, String name,
String version, OutputStream out) throws IOException {
+ int responseCode;
+
URL url = new URL(host, endpoint + "?customer=" + customer + "&name="
+ name + "&version=" + version);
+
+ InputStream input = null;
HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
try {
- int responseCode = connection.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- InputStream input = connection.getInputStream();
- try {
- copy(input, out);
- out.flush();
- }
- finally {
- closeSilently(input);
- }
- }
- return responseCode;
+ responseCode = connection.getResponseCode();
+ input = connection.getInputStream();
+
+ copy(input, out);
+ out.flush();
+ }
+ catch (IOException e) {
+ responseCode = handleIOException(connection);
}
finally {
- connection.disconnect();
+ closeSilently(input);
+ closeSilently(connection);
+ }
+
+ return responseCode;
+ }
+
+ /**
+ * @see
http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+ */
+ static int handleIOException(HttpURLConnection conn) {
+ int respCode = -1;
+ try {
+ respCode = conn.getResponseCode();
+ flushStream(conn.getErrorStream());
+ }
+ catch (IOException ex) {
+ // deal with the exception
}
+ return respCode;
}
static int put(URL host, String endpoint, String customer, String name,
String version, InputStream in) throws IOException {
URL url = new URL(host, endpoint + "?customer=" + customer + "&name="
+ name + "&version=" + version);
- HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
- connection.setDoOutput(true);
- // ACE-294: enable streaming mode causing only small amounts of memory
to be
- // used for this commit. Otherwise, the entire input stream is cached
into
- // memory prior to sending it to the server...
- connection.setChunkedStreamingMode(8192);
- connection.setRequestProperty("Content-Type",
MIME_APPLICATION_OCTET_STREAM);
- OutputStream out = connection.getOutputStream();
+
+ int responseCode;
+ HttpURLConnection connection = null;
+ OutputStream out = null;
+
try {
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(true);
+ // ACE-294: enable streaming mode causing only small amounts of
memory to be
+ // used for this commit. Otherwise, the entire input stream is
cached into
+ // memory prior to sending it to the server...
+ connection.setChunkedStreamingMode(8192);
+ connection.setRequestProperty("Content-Type",
MIME_APPLICATION_OCTET_STREAM);
+ out = connection.getOutputStream();
+
copy(in, out);
out.flush();
+
+ responseCode = connection.getResponseCode();
+ flushStream(connection.getInputStream());
+ }
+ catch (IOException e) {
+ responseCode = handleIOException(connection);
}
finally {
closeSilently(in);
closeSilently(out);
+ closeSilently(connection);
}
- int responseCode = connection.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- InputStream is = (InputStream) connection.getContent();
- try {
- while (is.read() > 0) {
- // ignore...
- }
- }
- finally {
- closeSilently(is);
- }
- }
return responseCode;
}
@@ -114,30 +154,40 @@ final class Utils {
String f2 = (name == null) ? null : "name=" + name;
String filter = ((f1 == null) ? "?" : "?" + f1 + "&") + ((f2 == null)
? "" : f2);
URL url = new URL(host, endpoint + filter);
- HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
- int responseCode = connection.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- InputStream input = connection.getInputStream();
- try {
- copy(input, out);
- out.flush();
- }
- finally {
- closeSilently(input);
- }
+
+ int responseCode;
+ HttpURLConnection connection = null;
+ InputStream input = null;
+
+ try {
+ connection = (HttpURLConnection) url.openConnection();
+ responseCode = connection.getResponseCode();
+ input = connection.getInputStream();
+
+ copy(input, out);
+ out.flush();
}
+ catch (IOException e) {
+ responseCode = handleIOException(connection);
+ }
+ finally {
+ closeSilently(input);
+ closeSilently(out);
+ closeSilently(connection);
+ }
+
return responseCode;
}
static void waitForWebserver(URL host) throws IOException {
- int retries = 1;
+ int retries = 1, rc = -1;
IOException ioe = null;
HttpURLConnection conn = null;
while (retries++ < 10) {
try {
conn = (HttpURLConnection) host.openConnection();
- int rc = conn.getResponseCode();
+ rc = conn.getResponseCode();
if (rc >= 0) {
return;
}
@@ -152,6 +202,9 @@ final class Utils {
return;
}
}
+ catch (IOException e) {
+ rc = handleIOException(conn);
+ }
finally {
if (conn != null) {
conn.disconnect();
Modified:
ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
(original)
+++
ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
Tue Oct 8 09:04:32 2013
@@ -34,16 +34,15 @@ import org.apache.ace.range.SortedRangeS
import org.apache.ace.repository.Repository;
/**
- * This class works as a local interface for a remote repository by handling
the network
- * communication.
+ * This class works as a local interface for a remote repository by handling
the network communication.
*/
public class RemoteRepository implements Repository {
private static final String COMMAND_QUERY = "/query";
private static final String COMMAND_CHECKOUT = "/checkout";
private static final String COMMAND_COMMIT = "/commit";
-
+
private static final String MIME_APPLICATION_OCTET_STREAM =
"application/octet-stream";
-
+
private static final int COPY_BUFFER_SIZE = 64 * 1024;
private final URL m_url;
@@ -55,9 +54,12 @@ public class RemoteRepository implements
/**
* Creates a remote repository that connects to a given location with a
given customer- and repository name.
*
- * @param url The location of the repository.
- * @param customer The customer name to use.
- * @param name The repository name to use.
+ * @param url
+ * The location of the repository.
+ * @param customer
+ * The customer name to use.
+ * @param name
+ * The repository name to use.
*/
public RemoteRepository(URL url, String customer, String name) {
if (url == null || customer == null || name == null) {
@@ -75,27 +77,29 @@ public class RemoteRepository implements
}
URL url = buildCommand(m_url, COMMAND_CHECKOUT, version);
+
HttpURLConnection connection = (HttpURLConnection)
m_connectionFactory.createConnection(url);
- if (connection.getResponseCode() == HttpServletResponse.SC_NOT_FOUND) {
- connection.disconnect();
- throw new IllegalArgumentException("Requested version not found
in remote repository. (" + connection.getResponseMessage() + ")");
+ int rc = connection.getResponseCode();
+ if (rc == HttpServletResponse.SC_NOT_FOUND) {
+ connection.disconnect();
+ throw new IllegalArgumentException("Requested version not found in
remote repository. (" + connection.getResponseMessage() + ")");
}
- if (connection.getResponseCode() != HttpServletResponse.SC_OK) {
- connection.disconnect();
+ else if (rc != HttpServletResponse.SC_OK) {
+ connection.disconnect();
throw new IOException("Connection error: " +
connection.getResponseMessage());
}
return connection.getInputStream();
-
+
}
public boolean commit(InputStream data, long fromVersion) throws
IOException, IllegalArgumentException {
URL url = buildCommand(m_url, COMMAND_COMMIT, fromVersion);
HttpURLConnection connection = (HttpURLConnection)
m_connectionFactory.createConnection(url);
-
+
// ACE-294: enable streaming mode causing only small amounts of memory
to be
- // used for this commit. Otherwise, the entire input stream is cached
into
+ // used for this commit. Otherwise, the entire input stream is cached
into
// memory prior to sending it to the server...
connection.setChunkedStreamingMode(8192);
connection.setRequestProperty("Content-Type",
MIME_APPLICATION_OCTET_STREAM);
@@ -103,51 +107,56 @@ public class RemoteRepository implements
OutputStream out = connection.getOutputStream();
try {
- copy(data, out);
- } finally {
- out.flush();
- out.close();
+ copy(data, out);
+
+ // causes the stream the be flushed and the server response to be
obtained...
+ return connection.getResponseCode() == HttpServletResponse.SC_OK;
+ }
+ finally {
+ out.flush();
+ out.close();
+ connection.disconnect();
}
-
- try {
- return connection.getResponseCode() ==
HttpServletResponse.SC_OK;
- } finally {
- connection.disconnect();
- }
}
public SortedRangeSet getRange() throws IOException {
URL url = buildCommand(m_url, COMMAND_QUERY, 0);
-
+
HttpURLConnection connection = (HttpURLConnection)
m_connectionFactory.createConnection(url);
-
+
try {
- if (connection.getResponseCode() == HttpServletResponse.SC_OK) {
- BufferedReader reader = new BufferedReader(new
InputStreamReader(connection.getInputStream()));
- try {
- String line = reader.readLine();
- if (line == null) {
- throw new IOException("Repository not found:
customer=" + m_customer + ", name=" + m_name);
- }
-
- String representation =
line.substring(line.lastIndexOf(','));
- return new SortedRangeSet(representation);
- } finally {
- reader.close();
- }
- }
-
- throw new IOException("Connection error: " +
connection.getResponseMessage());
- } finally {
- connection.disconnect();
+ if (connection.getResponseCode() == HttpServletResponse.SC_OK) {
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(connection.getInputStream()));
+ try {
+ String line = reader.readLine();
+ if (line == null) {
+ throw new IOException("Repository not found:
customer=" + m_customer + ", name=" + m_name);
+ }
+
+ String representation =
line.substring(line.lastIndexOf(','));
+ return new SortedRangeSet(representation);
+ }
+ finally {
+ reader.close();
+ }
+ }
+
+ throw new IOException("Connection error: " +
connection.getResponseMessage());
+ }
+ finally {
+ connection.disconnect();
}
}
/**
* Helper method which copies the contents of an input stream to an output
stream.
- * @param in The input stream.
- * @param out The output stream.
- * @throws java.io.IOException Thrown when one of the streams is closed
unexpectedly.
+ *
+ * @param in
+ * The input stream.
+ * @param out
+ * The output stream.
+ * @throws java.io.IOException
+ * Thrown when one of the streams is closed unexpectedly.
*/
private static void copy(InputStream in, OutputStream out) throws
IOException {
byte[] buffer = new byte[COPY_BUFFER_SIZE];
@@ -159,12 +168,13 @@ public class RemoteRepository implements
}
/**
- * Builds a command string to use in the request to the server, based on
the parameters
- * this object was created with. The version is only mandatory for
<code>CHECKOUT</code>
- * and <code>COMMIT</code>.
+ * Builds a command string to use in the request to the server, based on
the parameters this object was created
+ * with. The version is only mandatory for <code>CHECKOUT</code> and
<code>COMMIT</code>.
*
- * @param command A command string, use the <code>COMMAND_</code>
constants in this file.
- * @param version A version statement.
+ * @param command
+ * A command string, use the <code>COMMAND_</code> constants in
this file.
+ * @param version
+ * A version statement.
* @return The command string.
*/
private URL buildCommand(URL url, String command, long version) {
@@ -188,12 +198,12 @@ public class RemoteRepository implements
}
params.append("version=").append(version);
}
-
+
StringBuilder newURL = new StringBuilder();
newURL.append(url.toExternalForm());
newURL.append(command);
if (params.length() > 0) {
- newURL.append("?").append(params);
+ newURL.append("?").append(params);
}
try {
@@ -208,4 +218,4 @@ public class RemoteRepository implements
public String toString() {
return "RemoteRepository[" + m_url + "," + m_customer + "," + m_name +
"]";
}
-}
\ No newline at end of file
+}