Author: jawi
Date: Wed Oct 17 15:09:29 2012
New Revision: 1399299
URL: http://svn.apache.org/viewvc?rev=1399299&view=rev
Log:
ACE-294: enable streaming mode for all HTTP-POSTs to the server.
Modified:
ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java
ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java
ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java
ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java
ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
Modified:
ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java
(original)
+++
ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java
Wed Oct 17 15:09:29 2012
@@ -38,21 +38,22 @@ import org.apache.ace.client.repository.
import org.apache.ace.connectionfactory.ConnectionFactory;
/**
- * This class can be used as a base class for artifact preprocessors. It comes
with its
- * own upload() method, which will be used by all artifact preprocessors
anyway.
+ * This class can be used as a base class for artifact preprocessors. It comes
with its own upload() method, which will
+ * be used by all artifact preprocessors anyway.
*/
public abstract class ArtifactPreprocessorBase implements ArtifactPreprocessor
{
- /** 64k buffers should be enough for everybody... */
+ /** 64k buffers should be enough for everybody... */
protected static final int BUFFER_SIZE = 64 * 1024;
-
+
protected final ConnectionFactory m_connectionFactory;
private final ExecutorService m_executor;
/**
* Creates a new {@link ArtifactPreprocessorBase} instance.
*
- * @param connectionFactory the connection factory to use, cannot be
<code>null</code>.
+ * @param connectionFactory
+ * the connection factory to use, cannot be <code>null</code>.
*/
protected ArtifactPreprocessorBase(ConnectionFactory connectionFactory) {
m_connectionFactory = connectionFactory;
@@ -62,10 +63,13 @@ public abstract class ArtifactPreprocess
/**
* Creates a new URL for given (file) name and OBR base URL.
*
- * @param name the name of the file to create the URL for;
- * @param obrBase the OBR base URL to use.
+ * @param name
+ * the name of the file to create the URL for;
+ * @param obrBase
+ * the OBR base URL to use.
* @return a new URL for the file, never <code>null</code>.
- * @throws MalformedURLException in case of invalid characters in the
given name.
+ * @throws MalformedURLException
+ * in case of invalid characters in the given name.
*/
protected URL determineNewUrl(String name, URL obrBase) throws
MalformedURLException {
return new URL(obrBase, name);
@@ -74,7 +78,8 @@ public abstract class ArtifactPreprocess
/**
* Silently closes the given {@link Closeable} instance.
*
- * @param closable the closeable to close, may be <code>null</code>.
+ * @param closable
+ * the closeable to close, may be <code>null</code>.
*/
protected final void silentlyClose(Closeable closable) {
if (closable != null) {
@@ -90,44 +95,52 @@ public abstract class ArtifactPreprocess
/**
* Gets a stream to write an artifact to, which will be uploaded
asynchronously to the OBR.
*
- * @param name The name of the artifact.
- * @param obrBase The base URL of the obr to which this artifact should be
written.
- * @param inputStream the input stream with data to upload.
+ * @param name
+ * The name of the artifact.
+ * @param obrBase
+ * The base URL of the obr to which this artifact should be
written.
+ * @param inputStream
+ * the input stream with data to upload.
*/
protected final Future<URL> uploadAsynchronously(final String name, final
URL obrBase, final InputStream inputStream) {
return m_executor.submit(new Callable<URL>() {
public URL call() throws IOException {
- return upload(inputStream, name, obrBase);
+ return upload(inputStream, name, obrBase);
}
});
}
- /**
- * Converts a given URL to a {@link File} object.
- *
- * @param url the URL to convert, cannot be <code>null</code>.
- * @return a {@link File} object, never <code>null</code>.
- */
- protected final File urlToFile(URL url) {
- File file;
+ /**
+ * Converts a given URL to a {@link File} object.
+ *
+ * @param url
+ * the URL to convert, cannot be <code>null</code>.
+ * @return a {@link File} object, never <code>null</code>.
+ */
+ protected final File urlToFile(URL url) {
+ File file;
try {
file = new File(url.toURI());
}
catch (URISyntaxException e) {
file = new File(url.getPath());
}
- return file;
- }
+ return file;
+ }
/**
* Uploads an artifact synchronously to an OBR.
*
- * @param input A inputstream from which the artifact can be read.
- * @param name The name of the artifact. If the name is not unique, an
IOException will be thrown.
- * @param obrBase The base URL of the obr to which this artifact should be
written.
+ * @param input
+ * A inputstream from which the artifact can be read.
+ * @param name
+ * The name of the artifact. If the name is not unique, an
IOException will be thrown.
+ * @param obrBase
+ * The base URL of the obr to which this artifact should be
written.
* @return A URL to the uploaded artifact; this is identical to calling
<code>determineNewUrl(name, obrBase)</code>
- * @throws IOException If there was an error reading from
<code>input</code>, or if there was a problem communicating
- * with the OBR.
+ * @throws IOException
+ * If there was an error reading from <code>input</code>, or
if there was a problem communicating with
+ * the OBR.
*/
private URL upload(InputStream input, String name, URL obrBase) throws
IOException {
if (obrBase == null) {
@@ -142,12 +155,12 @@ public abstract class ArtifactPreprocess
url = determineNewUrl(name, obrBase);
if (!urlPointsToExistingFile(url)) {
- if ("file".equals(url.getProtocol())) {
- uploadToFile(input, url);
- }
- else {
- uploadToRemote(input, url);
- }
+ if ("file".equals(url.getProtocol())) {
+ uploadToFile(input, url);
+ }
+ else {
+ uploadToRemote(input, url);
+ }
}
}
catch (IOException ioe) {
@@ -163,9 +176,12 @@ public abstract class ArtifactPreprocess
/**
* Uploads an artifact to a local file location.
*
- * @param input the input stream of the (local) artifact to upload.
- * @param url the URL of the (file) artifact to upload to.
- * @throws IOException in case of I/O problems.
+ * @param input
+ * the input stream of the (local) artifact to upload.
+ * @param url
+ * the URL of the (file) artifact to upload to.
+ * @throws IOException
+ * in case of I/O problems.
*/
private void uploadToFile(InputStream input, URL url) throws IOException {
File file = urlToFile(url);
@@ -188,17 +204,26 @@ public abstract class ArtifactPreprocess
/**
* Uploads an artifact to a remote location.
*
- * @param input the input stream of the (local) artifact to upload.
- * @param url the URL of the (remote) artifact to upload to.
- * @throws IOException in case of I/O problems, or when the upload was
refused by the remote.
+ * @param input
+ * the input stream of the (local) artifact to upload.
+ * @param url
+ * the URL of the (remote) artifact to upload to.
+ * @throws IOException
+ * in case of I/O problems, or when the upload was refused by
the remote.
*/
private void uploadToRemote(InputStream input, URL url) throws IOException
{
OutputStream output = null;
try {
URLConnection connection =
m_connectionFactory.createConnection(url);
+ if (connection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts
of memory to be
+ // used for this commit. Otherwise, the entire input stream is
cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection) connection).setChunkedStreamingMode(8192);
+ }
connection.setDoOutput(true);
-
+
output = connection.getOutputStream();
byte[] buffer = new byte[BUFFER_SIZE];
@@ -226,48 +251,54 @@ public abstract class ArtifactPreprocess
silentlyClose(output);
}
}
-
+
/**
* Determines whether the given URL points to an existing file.
*
- * @param url the URL to test, cannot be <code>null</code>.
+ * @param url
+ * the URL to test, cannot be <code>null</code>.
* @return <code>true</code> if the given URL points to an existing file,
<code>false</code> otherwise.
*/
private boolean urlPointsToExistingFile(URL url) {
- boolean result = false;
+ boolean result = false;
+
+ if ("file".equals(url.getProtocol())) {
+ result = urlToFile(url).exists();
+ }
+ else {
+ try {
+ URLConnection connection =
m_connectionFactory.createConnection(url);
+
+ if (connection instanceof HttpURLConnection) {
+ HttpURLConnection hc = (HttpURLConnection) connection;
+
+ // Perform a HEAD on the file, to see whether it exists...
+ hc.setRequestMethod("HEAD");
+ try {
+ int responseCode = hc.getResponseCode();
+ result = (responseCode == HttpURLConnection.HTTP_OK);
+ }
+ finally {
+ hc.disconnect();
+ }
+ }
+ else {
+ // In all other scenario's: try to read a single byte from
the input
+ // stream, if this succeeds, we can assume the file
exists...
+ InputStream is = connection.getInputStream();
+ try {
+ is.read();
+ }
+ finally {
+ silentlyClose(is);
+ }
+ }
+ }
+ catch (IOException e) {
+ // Ignore; assume file does not exist...
+ }
+ }
- if ("file".equals(url.getProtocol())) {
- result = urlToFile(url).exists();
- } else {
- try {
- URLConnection connection =
m_connectionFactory.createConnection(url);
-
- if (connection instanceof HttpURLConnection) {
- HttpURLConnection hc =
(HttpURLConnection) connection;
-
- // Perform a HEAD on the file, to see
whether it exists...
- hc.setRequestMethod("HEAD");
- try {
- int responseCode =
hc.getResponseCode();
- result = (responseCode ==
HttpURLConnection.HTTP_OK);
- } finally {
- hc.disconnect();
- }
- } else {
- // In all other scenario's: try to read
a single byte from the input
- // stream, if this succeeds, we can
assume the file exists...
- InputStream is =
connection.getInputStream();
- try {
- is.read();
- } finally {
- silentlyClose(is);
- }
- }
- } catch (IOException e) {
- // Ignore; assume file does not exist...
- }
- }
-
- return result;
+ return result;
}
}
Modified:
ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java
(original)
+++
ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java
Wed Oct 17 15:09:29 2012
@@ -52,23 +52,23 @@ import org.osgi.service.log.LogService;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
/**
- * Implementation class for the ArtifactRepository. For 'what it does', see
ArtifactRepository,
- * for 'how it works', see ObjectRepositoryImpl.<br>
+ * Implementation class for the ArtifactRepository. For 'what it does', see
ArtifactRepository, for 'how it works', see
+ * ObjectRepositoryImpl.<br>
* <br>
* This class has some extended functionality when compared to
<code>ObjectRepositoryImpl</code>,
* <ul>
- * <li> it keeps track of all <code>ArtifactHelper</code>s, and serves them to
its inhabitants.
- * <li> it handles importing of artifacts.
+ * <li>it keeps track of all <code>ArtifactHelper</code>s, and serves them to
its inhabitants.
+ * <li>it handles importing of artifacts.
* </ul>
*/
public class ArtifactRepositoryImpl extends
ObjectRepositoryImpl<ArtifactObjectImpl, ArtifactObject> implements
ArtifactRepository {
private final static String XML_NODE = "artifacts";
-
+
// Injected by Dependency Manager
private volatile BundleContext m_context;
private volatile LogService m_log;
private volatile ConnectionFactory m_connectionFactory;
-
+
private final Map<String, ArtifactHelper> m_helpers = new HashMap<String,
ArtifactHelper>();
private URL m_obrBase;
@@ -121,10 +121,11 @@ public class ArtifactRepositoryImpl exte
ArtifactObjectImpl createNewInhabitant(Map<String, String> attributes,
Map<String, String> tags) {
ArtifactHelper helper =
getHelper(attributes.get(ArtifactObject.KEY_MIMETYPE));
ArtifactObjectImpl ao = new
ArtifactObjectImpl(helper.checkAttributes(attributes),
helper.getMandatoryAttributes(), tags, this, this);
- if ((ao.getAttribute("upload") != null) && (m_obrBase != null)){
+ if ((ao.getAttribute("upload") != null) && (m_obrBase != null)) {
try {
ao.addAttribute(ArtifactObject.KEY_URL, new URL(m_obrBase,
ao.getDefinition() + ao.getAttribute("upload")).toString());
- } catch (MalformedURLException e) {
+ }
+ catch (MalformedURLException e) {
throw new IllegalStateException(e);
}
}
@@ -138,12 +139,15 @@ public class ArtifactRepositoryImpl exte
/**
* Helper method for this repository's inhabitants, which finds the
necessary helpers.
- * @param mimetype The mimetype for which a helper should be found.
+ *
+ * @param mimetype
+ * The mimetype for which a helper should be found.
* @return An artifact helper for the given mimetype.
- * @throws IllegalArgumentException when the mimetype is invalid, or no
helpers are available.
+ * @throws IllegalArgumentException
+ * when the mimetype is invalid, or no helpers are available.
*/
ArtifactHelper getHelper(String mimetype) {
- synchronized(m_helpers) {
+ synchronized (m_helpers) {
if ((mimetype == null) || (mimetype.length() == 0)) {
throw new IllegalArgumentException("Without a mimetype, we
cannot find a helper.");
}
@@ -162,7 +166,7 @@ public class ArtifactRepositoryImpl exte
* Method intended for adding artifact helpers by the bundle's activator.
*/
void addHelper(String mimetype, ArtifactHelper helper) {
- synchronized(m_helpers) {
+ synchronized (m_helpers) {
if ((mimetype == null) || (mimetype.length() == 0)) {
m_log.log(LogService.LOG_WARNING, "An ArtifactHelper has been
published without a proper mimetype.");
}
@@ -176,7 +180,7 @@ public class ArtifactRepositoryImpl exte
* Method intended for removing artifact helpers by the bundle's activator.
*/
void removeHelper(String mimetype, ArtifactHelper helper) {
- synchronized(m_helpers) {
+ synchronized (m_helpers) {
if ((mimetype == null) || (mimetype.length() == 0)) {
m_log.log(LogService.LOG_WARNING, "An ArtifactHelper is being
removed without a proper mimetype.");
}
@@ -187,13 +191,14 @@ public class ArtifactRepositoryImpl exte
}
/**
- * Utility function that takes either a URL or a String representing a
mimetype,
- * and returns the corresponding <code>ArtifactHelper</code>,
<code>ArtifactRecognizer</code>
- * and, if not specified, the mimetype.
- * @param input Either a <code>URL</code> pointing to a physical artifact,
or a <code>String</code>
- * representing a mime type.
+ * Utility function that takes either a URL or a String representing a
mimetype, and returns the corresponding
+ * <code>ArtifactHelper</code>, <code>ArtifactRecognizer</code> and, if
not specified, the mimetype.
+ *
+ * @param input
+ * Either a <code>URL</code> pointing to a physical artifact,
or a <code>String</code> representing a
+ * mime type.
* @return A mapping from a class (<code>ArtifactRecognizer</code>,
<code>ArtifactHelper</code> or
- * <code>String</code> to an instance of that class as a result.
+ * <code>String</code> to an instance of that class as a result.
*/
protected Map<Class<?>, Object> findRecognizerAndHelper(Object input)
throws IllegalArgumentException {
// check input.
@@ -248,7 +253,8 @@ public class ArtifactRepositoryImpl exte
break;
}
}
- } finally {
+ }
+ finally {
m_context.ungetService(ref);
}
}
@@ -278,7 +284,7 @@ public class ArtifactRepositoryImpl exte
return mimetype != null;
}
catch (Exception e) {
- //too bad... Nothing to do now.
+ // too bad... Nothing to do now.
return false;
}
}
@@ -336,7 +342,7 @@ public class ArtifactRepositoryImpl exte
private ArtifactObject importArtifact(URL artifact, ArtifactRecognizer
recognizer, ArtifactHelper helper, String mimetype, boolean overwrite, boolean
upload) throws IOException {
ArtifactResource resource = convertToArtifactResource(artifact);
-
+
Map<String, String> attributes = recognizer.extractMetaData(resource);
Map<String, String> tags = new HashMap<String, String>();
@@ -347,39 +353,44 @@ public class ArtifactRepositoryImpl exte
}
String artifactURL = artifact.toString();
-
+
attributes.put(ArtifactObject.KEY_URL, artifactURL);
-
+
if (upload) {
attributes.put("upload", recognizer.getExtension(resource));
}
ArtifactObject result = create(attributes, tags);
-
+
if (upload) {
try {
upload(artifact, result.getDefinition() +
attributes.get("upload"), mimetype);
- } catch (IOException ex) {
+ }
+ catch (IOException ex) {
remove(result);
throw ex;
}
finally {
try {
attributes.remove("upload");
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
// Not much we can do
}
}
}
return result;
-
+
}
/**
- * Helper method which checks a given URL for 'validity', that is, does
this URL point
- * to something that can be read.
- * @param artifact A URL pointing to an artifact.
- * @throws IllegalArgumentException when the URL does not point to a valid
file.
+ * Helper method which checks a given URL for 'validity', that is, does
this URL point to something that can be
+ * read.
+ *
+ * @param artifact
+ * A URL pointing to an artifact.
+ * @throws IllegalArgumentException
+ * when the URL does not point to a valid file.
*/
private void checkURL(URL artifact) throws IllegalArgumentException {
@@ -406,17 +417,21 @@ public class ArtifactRepositoryImpl exte
String artifactName = artifact.toString();
for (byte b : artifactName.substring(artifactName.lastIndexOf('/') +
1).getBytes()) {
if (!(((b >= 'A') && (b <= 'Z')) || ((b >= 'a') && (b <= 'z')) ||
((b >= '0') && (b <= '9')) || (b == '.') || (b == '-') || (b == '_'))) {
- throw new IllegalArgumentException("Artifact " + artifactName
+ "'s name contains an illegal character '" + new String(new byte[] {b}) + "'");
+ throw new IllegalArgumentException("Artifact " + artifactName
+ "'s name contains an illegal character '" + new String(new byte[] { b }) +
"'");
}
}
}
/**
* Uploads an artifact to the OBR.
- * @param artifact URL pointing to the local artifact.
- * @param mimetype The mimetype of this artifact.
+ *
+ * @param artifact
+ * URL pointing to the local artifact.
+ * @param mimetype
+ * The mimetype of this artifact.
* @return The persistent URL of this artifact.
- * @throws IOException for any problem uploading the artifact.
+ * @throws IOException
+ * for any problem uploading the artifact.
*/
private URL upload(URL artifact, String definition, String mimetype)
throws IOException {
if (m_obrBase == null) {
@@ -432,11 +447,18 @@ public class ArtifactRepositoryImpl exte
url = new URL(m_obrBase, definition);
URLConnection connection =
m_connectionFactory.createConnection(url);
-
+
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
+
connection.setRequestProperty("Content-Type", mimetype);
+ if (connection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts
of memory to be
+ // used for this commit. Otherwise, the entire input stream is
cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection) connection).setChunkedStreamingMode(8192);
+ }
output = connection.getOutputStream();
@@ -446,11 +468,11 @@ public class ArtifactRepositoryImpl exte
}
output.close();
-
+
if (connection instanceof HttpURLConnection) {
int responseCode = ((HttpURLConnection)
connection).getResponseCode();
switch (responseCode) {
- case HttpURLConnection.HTTP_OK :
+ case HttpURLConnection.HTTP_OK:
break;
case HttpURLConnection.HTTP_CONFLICT:
throw new IOException("Artifact already exists in
storage.");
@@ -517,7 +539,13 @@ public class ArtifactRepositoryImpl exte
/**
* Custom comparator which sorts service references by service rank,
highest rank first.
*/
- private static Comparator<ServiceReference> SERVICE_RANK_COMPARATOR = new
Comparator<ServiceReference>() { // TODO ServiceReferences are comparable by
default now
+ private static Comparator<ServiceReference> SERVICE_RANK_COMPARATOR = new
Comparator<ServiceReference>() { // TODO
+
// ServiceReferences
+
// are
+
// comparable
+
// by
+
// default
+
// now
public int compare(ServiceReference o1, ServiceReference o2) {
int rank1 = 0;
int rank2 = 0;
@@ -539,18 +567,19 @@ public class ArtifactRepositoryImpl exte
return rank1 - rank2;
}
};
-
+
private InputStream openInputStream(URL artifactURL) throws IOException {
URLConnection connection =
m_connectionFactory.createConnection(artifactURL);
return connection.getInputStream();
}
/**
- * Converts a given URL to a {@link ArtifactResource} that abstracts the
way we access the contents of
- * the URL away from the URL itself. This way, we can avoid having to pass
authentication credentials,
- * or a {@link ConnectionFactory} to the artifact recognizers.
- *
- * @param url the URL to convert, can be <code>null</code> in which case
<code>null</code> is returned.
+ * Converts a given URL to a {@link ArtifactResource} that abstracts the
way we access the contents of the URL away
+ * from the URL itself. This way, we can avoid having to pass
authentication credentials, or a
+ * {@link ConnectionFactory} to the artifact recognizers.
+ *
+ * @param url
+ * the URL to convert, can be <code>null</code> in which case
<code>null</code> is returned.
* @return an {@link ArtifactResource}, or <code>null</code> if the given
URL was <code>null</code>.
*/
private ArtifactResource convertToArtifactResource(final URL url) {
@@ -562,7 +591,7 @@ public class ArtifactRepositoryImpl exte
public URL getURL() {
return url;
}
-
+
public InputStream openStream() throws IOException {
// Take care of the fact that an URL could need credentials to
be accessible!!!
URLConnection conn =
m_connectionFactory.createConnection(getURL());
Modified:
ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java
(original)
+++
ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java
Wed Oct 17 15:09:29 2012
@@ -62,8 +62,7 @@ public class LogSyncTask implements Runn
}
/**
- * Synchronize the log events available remote with the events available
- * locally.
+ * Synchronize the log events available remote with the events available
locally.
*/
public void run() {
URL host = m_discovery.discover();
@@ -74,28 +73,34 @@ public class LogSyncTask implements Runn
m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with
remote (endpoint=" + m_endpoint + ") - none available");
return;
}
-
- if ("file".equals(host.getProtocol())) {
- // if the discovery URL is a file, we cannot sync, so we
silently return here
- return;
- }
- String targetId = m_identification.getID();
+ if ("file".equals(host.getProtocol())) {
+ // if the discovery URL is a file, we cannot sync, so we silently
return here
+ return;
+ }
+
+ String targetId = m_identification.getID();
URLConnection sendConnection = null;
try {
sendConnection = m_connectionFactory.createConnection(new
URL(host, m_endpoint + "/" + COMMAND_SEND));
sendConnection.setDoOutput(true);
+ if (sendConnection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts
of memory to be
+ // used for this commit. Otherwise, the entire input stream is
cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection)
sendConnection).setChunkedStreamingMode(8192);
+ }
long[] logIDs = m_LogStore.getLogIDs();
for (int i = 0; i < logIDs.length; i++) {
URL url = new URL(host, m_endpoint + "/" + COMMAND_QUERY + "?"
+ PARAMETER_TARGETID + "=" + targetId + "&" + PARAMETER_LOGID + "=" +
logIDs[i]);
-
+
URLConnection queryConnection =
m_connectionFactory.createConnection(url);
// TODO: make sure no actual call is made using sendConnection
// when there's nothing to sync
synchronizeLog(logIDs[i], queryConnection.getInputStream(),
sendConnection);
}
-
+
// Make sure to send the actual POST request...
sendConnection.getContent();
}
@@ -113,20 +118,16 @@ public class LogSyncTask implements Runn
}
/**
- * Synchronizes a single log (there can be multiple log/logid's per
- * target).
+ * Synchronizes a single log (there can be multiple log/logid's per
target).
*
* @param logID
* ID of the log to synchronize.
* @param queryInput
- * Stream pointing to a query result for the events available
- * remotely for this log id
+ * Stream pointing to a query result for the events available
remotely for this log id
* @param sendConnection
- * .getOutputStream() Stream to write the events to that are
- * missing on the remote side.
+ * .getOutputStream() Stream to write the events to that are
missing on the remote side.
* @throws java.io.IOException
- * If synchronization could not be completed due to an I/O
- * failure.
+ * If synchronization could not be completed due to an I/O
failure.
*/
protected void synchronizeLog(long logID, InputStream queryInput,
URLConnection sendConnection) throws IOException {
long highestLocal = m_LogStore.getHighestID(logID);
@@ -134,14 +135,14 @@ public class LogSyncTask implements Runn
// No events, no need to synchronize
return;
}
-
+
SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
SortedRangeSet remoteRange = getDescriptor(queryInput).getRangeSet();
SortedRangeSet delta = remoteRange.diffDest(localRange);
RangeIterator rangeIterator = delta.iterator();
BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(sendConnection.getOutputStream()));
-
+
if (rangeIterator.hasNext()) {
long lowest = rangeIterator.next();
long highest = delta.getHigh();
@@ -171,8 +172,7 @@ public class LogSyncTask implements Runn
*
* @param queryInput
* Stream containing a LogDescriptor object.
- * @return LogDescriptor object reflecting the range contained in the
- * stream.
+ * @return LogDescriptor object reflecting the range contained in the
stream.
* @throws java.io.IOException
* If no range could be determined due to an I/O failure.
*/
@@ -184,7 +184,7 @@ public class LogSyncTask implements Runn
if (rangeString != null) {
try {
return new LogDescriptor(rangeString);
- }
+ }
catch (IllegalArgumentException iae) {
throw new IOException("Could not determine highest remote
event id, received malformed event range (" + rangeString + ")");
}
@@ -192,16 +192,16 @@ public class LogSyncTask implements Runn
else {
throw new IOException("Could not construct LogDescriptor from
stream because stream is empty");
}
- }
+ }
finally {
if (queryReader != null) {
try {
queryReader.close();
- }
+ }
catch (Exception ex) {
// not much we can do
}
}
}
}
-}
\ No newline at end of file
+}
Modified:
ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java
(original)
+++
ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java
Wed Oct 17 15:09:29 2012
@@ -119,8 +119,15 @@ public class LogSyncTask implements Runn
OutputStream sendOutput = null;
try {
URLConnection sendConnection =
m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" +
COMMAND_SEND));
- sendConnection.setDoOutput(true);
+ if (sendConnection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts
of memory to be
+ // used for this commit. Otherwise, the entire input stream is
cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection)
sendConnection).setChunkedStreamingMode(8192);
+ }
+ sendConnection.setDoOutput(true);
+
sendOutput = sendConnection.getOutputStream();
BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(sendOutput));
Modified:
ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
(original)
+++
ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
Wed Oct 17 15:09:29 2012
@@ -94,8 +94,12 @@ public class RemoteRepository implements
URL url = buildCommand(m_url, COMMAND_COMMIT, fromVersion);
HttpURLConnection connection = (HttpURLConnection)
m_connectionFactory.createConnection(url);
- connection.setDoOutput(true);
+ // ACE-294: enable streaming mode causing only small amounts of memory
to be
+ // used for this commit. Otherwise, the entire input stream is cached
into
+ // memory prior to sending it to the server...
+ connection.setChunkedStreamingMode(8192);
connection.setRequestProperty("Content-Type",
MIME_APPLICATION_OCTET_STREAM);
+ connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
try {
Modified:
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
(original)
+++
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
Wed Oct 17 15:09:29 2012
@@ -60,6 +60,10 @@ final class Utils {
URL url = new URL(host, endpoint + "?customer=" + customer + "&name="
+ name + "&version=" + version);
HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
connection.setDoOutput(true);
+ // ACE-294: enable streaming mode causing only small amounts of memory
to be
+ // used for this commit. Otherwise, the entire input stream is cached
into
+ // memory prior to sending it to the server...
+ connection.setChunkedStreamingMode(8192);
connection.setRequestProperty("Content-Type",
MIME_APPLICATION_OCTET_STREAM);
OutputStream out = connection.getOutputStream();
copy(in, out);