Author: jawi
Date: Fri Nov 8 11:27:35 2013
New Revision: 1539985
URL: http://svn.apache.org/r1539985
Log:
ACE-413 - DP download could not be resumed if download was complete:
- when the DP was already completely downloaded, the download would be
restarted from scratch because the agent did not handle the server's
416 (Requested Range Not Satisfiable) response correctly;
- added several tests to verify that handling 416 responses is done
correctly.
Modified:
ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java
Modified:
ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java?rev=1539985&r1=1539984&r2=1539985&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
(original)
+++
ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
Fri Nov 8 11:27:35 2013
@@ -18,6 +18,7 @@
*/
package org.apache.ace.agent.itest;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -41,9 +42,11 @@ import org.apache.ace.agent.DeploymentHa
import org.apache.ace.agent.EventListener;
import org.apache.ace.agent.LoggingHandler.Levels;
import org.apache.ace.test.constants.TestConstants;
+import org.apache.ace.test.utils.FileUtils;
import org.apache.felix.dm.Component;
import org.osgi.framework.Bundle;
import org.osgi.framework.Constants;
+import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.Version;
import org.osgi.service.http.HttpService;
@@ -172,8 +175,14 @@ public class AgentDeploymentTest extends
end = fileLength / 2;
}
- resp.addHeader("Content-Range", String.format("bytes
%d-%d/%d", start, end, fileLength));
-
+ if (start == end) {
+ // Invalid...
+ resp.addHeader("Content-Range", String.format("bytes
*/%d", fileLength));
+ resp.setStatus(416); // content range not satisfiable...
+ return;
+ }
+
+ resp.addHeader("Content-Range", String.format("bytes
%d-%d/%d", start, end - 1, fileLength));
resp.setStatus(206); // partial
}
@@ -365,20 +374,6 @@ public class AgentDeploymentTest extends
/**
* Tests that we can install upgrades for an earlier installed DP.
*/
- public void testInstallUpgradeDeploymentPackage() throws Exception {
- setupAgentForNonStreamingDeployment();
-
- // Try to install a DP that fails at bundle-starting due to a
non-existing class, but this does not revert the
- // installation of the DP itself...
- expectSuccessfulDeployment(m_package5, null);
-
- // If we install a newer version, it should succeed...
- expectSuccessfulDeployment(m_package6, null);
- }
-
- /**
- * Tests that we can install upgrades for an earlier installed DP.
- */
public void testGetSizeEstimateForDeploymentPackage() throws Exception {
AgentControl control = getService(AgentControl.class);
@@ -386,7 +381,7 @@ public class AgentDeploymentTest extends
ConfigurationHandler configurationHandler =
control.getConfigurationHandler();
configurationHandler.putAll(props);
-
+
// Allow configuration to propagate...
Thread.sleep(100L);
@@ -410,6 +405,20 @@ public class AgentDeploymentTest extends
}
/**
+ * Tests that we can install upgrades for an earlier installed DP.
+ */
+ public void testInstallUpgradeDeploymentPackage() throws Exception {
+ setupAgentForNonStreamingDeployment();
+
+ // Try to install a DP that fails at bundle-starting due to a
non-existing class, but this does not revert the
+ // installation of the DP itself...
+ expectSuccessfulDeployment(m_package5, null);
+
+ // If we install a newer version, it should succeed...
+ expectSuccessfulDeployment(m_package6, null);
+ }
+
+ /**
* Tests the deployment of "non-streamed" deployment packages in various
situations.
*/
public void testNonStreamingDeployment() throws Exception {
@@ -430,6 +439,30 @@ public class AgentDeploymentTest extends
/**
* Tests the deployment of "non-streamed" deployment packages in various
situations.
*/
+ public void testNonStreamingDeployment_ChunkedContentRange() throws
Exception {
+ setupAgentForNonStreamingDeployment();
+
+ expectSuccessfulDeployment(m_package6, Failure.CONTENT_RANGE);
+ }
+
+ /**
+ * Tests the deployment of "non-streamed" deployment packages in various
situations.
+ * <p>
+ * This test simulates a DP that is already downloaded, but not yet
installed as reported in ACE-413.
+ * </p>
+ */
+ public void
testNonStreamingDeployment_ChunkedContentAlreadyCompletelyDownloaded() throws
Exception {
+ setupAgentForNonStreamingDeployment();
+
+ // Simulate that the DP is already downloaded...
+ simulateDPDownloadComplete(m_package6);
+
+ expectSuccessfulDeployment(m_package6, Failure.CONTENT_RANGE);
+ }
+
+ /**
+ * Tests the deployment of "non-streamed" deployment packages in various
situations.
+ */
public void testNonStreamingDeployment_CorruptStream() throws Exception {
setupAgentForNonStreamingDeployment();
@@ -464,30 +497,30 @@ public class AgentDeploymentTest extends
}
/**
- * Tests the deployment of "non-streamed" deployment packages in various
situations.
+ * Tests the deployment of "streamed" deployment packages in various
situations.
*/
- public void testNonStreamingDeployment_ChunkedContentRange() throws
Exception {
- setupAgentForNonStreamingDeployment();
+ public void testStreamingDeployment() throws Exception {
+ setupAgentForStreamingDeployment();
- expectSuccessfulDeployment(m_package6, Failure.CONTENT_RANGE);
+ expectSuccessfulDeployment(m_package6, null);
}
/**
* Tests the deployment of "streamed" deployment packages in various
situations.
*/
- public void testStreamingDeployment() throws Exception {
+ public void testStreamingDeployment_AbortStream() throws Exception {
setupAgentForStreamingDeployment();
- expectSuccessfulDeployment(m_package6, null);
+ expectFailedDeployment(m_package5, Failure.ABORT_STREAM);
}
/**
* Tests the deployment of "streamed" deployment packages in various
situations.
*/
- public void testStreamingDeployment_AbortStream() throws Exception {
+ public void testStreamingDeployment_ChunkedContentRange() throws Exception
{
setupAgentForStreamingDeployment();
- expectFailedDeployment(m_package5, Failure.ABORT_STREAM);
+ expectSuccessfulDeployment(m_package1, Failure.CONTENT_RANGE);
}
/**
@@ -527,15 +560,6 @@ public class AgentDeploymentTest extends
}
/**
- * Tests the deployment of "streamed" deployment packages in various
situations.
- */
- public void testStreamingDeployment_ChunkedContentRange() throws Exception
{
- setupAgentForStreamingDeployment();
-
- expectSuccessfulDeployment(m_package1, Failure.CONTENT_RANGE);
- }
-
- /**
* Tests the deployment of "streamed" deployment packages simulating an
"unstable" connection.
*/
public void testStreamingDeploymentWithUnstableConnection() throws
Exception {
@@ -582,19 +606,6 @@ public class AgentDeploymentTest extends
}
@Override
- protected Component[] getDependencies() {
- m_listener = new TestEventListener();
- return new Component[] {
- createComponent()
- .setImplementation(this)
-
.add(createServiceDependency().setService(HttpService.class).setRequired(true)),
- createComponent()
- .setInterface(EventListener.class.getName(), null)
- .setImplementation(m_listener)
- };
- }
-
- @Override
protected void doTearDown() throws Exception {
// Remove all provisioned components...
m_dependencyManager.clear();
@@ -614,6 +625,19 @@ public class AgentDeploymentTest extends
resetAgentBundleState();
}
+ @Override
+ protected Component[] getDependencies() {
+ m_listener = new TestEventListener();
+ return new Component[] {
+ createComponent()
+ .setImplementation(this)
+
.add(createServiceDependency().setService(HttpService.class).setRequired(true)),
+ createComponent()
+ .setInterface(EventListener.class.getName(), null)
+ .setImplementation(m_listener)
+ };
+ }
+
private Map<String, String> createAgentConfiguration(boolean useStreaming,
int syncInterval) {
Map<String, String> props = new HashMap<String, String>();
props.put(AgentConstants.CONFIG_DISCOVERY_SERVERURLS,
String.format("http://localhost:%d/", TestConstants.PORT));
@@ -675,6 +699,26 @@ public class AgentDeploymentTest extends
waitForInstalledVersion(Version.emptyVersion);
}
+ /**
+ * Simulates a DP that is already completely downloaded.
+ *
+ * @param _package
+ * the test package to simulate a download for, cannot be
<code>null</code>.
+ * @throws IOException
+ * in case of I/O problems.
+ */
+ private void simulateDPDownloadComplete(TestPackage _package) throws
IOException {
+ Bundle agentBundle = FrameworkUtil.getBundle(AgentConstants.class);
+ assertNotNull(agentBundle);
+ assertFalse(agentBundle.getBundleId() ==
m_bundleContext.getBundle().getBundleId());
+
+ // The filename used for DP is the encoded URL...
+ String dpFilename =
String.format("http%%3A%%2F%%2Flocalhost%%3A%d%%2Fdeployment%%2F%s%%2Fversions%%2F%s",
TestConstants.PORT, AGENT_ID, _package.getVersion());
+ File dpFile = new File(agentBundle.getBundleContext().getDataFile(""),
dpFilename);
+
+ FileUtils.copy(_package.getFile(), dpFile);
+ }
+
private void waitForEventReceived(String topic) throws Exception {
int timeout = 10000;
while (!m_listener.containsTopic(topic)) {
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java?rev=1539985&r1=1539984&r2=1539985&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
Fri Nov 8 11:27:35 2013
@@ -44,6 +44,7 @@ class ContentRangeInputStream extends In
private static final int SC_OK = 200;
private static final int SC_PARTIAL_CONTENT = 206;
+ private static final int SC_RANGE_NOT_SATISFIABLE = 416;
private static final int SC_SERVICE_UNAVAILABLE = 503;
private static final int ST_EOF = -1;
@@ -320,29 +321,22 @@ class ContentRangeInputStream extends In
return new long[] { totalBytes, totalBytes };
}
else if (rc == SC_PARTIAL_CONTENT) {
+ // Chunked response, see how many bytes we've got to read for it...
String contentRange = conn.getHeaderField(HDR_CONTENT_RANGE);
if (contentRange == null) {
throw new IOException("Server returned no Content-Range for
partial content");
}
- if (!contentRange.startsWith(BYTES_)) {
- throw new IOException("Server returned non-byte Content-Range
" + contentRange);
- }
-
- String[] parts = contentRange.substring(6).split("/");
- String[] rangeDef = parts[0].split("-");
- long start = Long.parseLong(rangeDef[0]);
- long end = Long.parseLong(rangeDef[1]);
-
- long totalBytes;
- if ("*".equals(parts[1])) {
- totalBytes = -1L;
- }
- else {
- totalBytes = Long.parseLong(parts[1]);
+ return parseContentRangeHeader(contentRange);
+ }
+ else if (rc == SC_RANGE_NOT_SATISFIABLE) {
+ // Range not satisfiable, we might already have completed the
download?
+ String contentRange = conn.getHeaderField(HDR_CONTENT_RANGE);
+ if (contentRange != null) {
+ return parseContentRangeHeader(contentRange);
}
-
- return new long[] { (end - start), totalBytes };
+
+ // fall through, we cannot handle this...
}
else if (rc == SC_SERVICE_UNAVAILABLE) {
// Service is unavailable, throw an exception to try it again
later...
@@ -350,9 +344,8 @@ class ContentRangeInputStream extends In
throw new RetryAfterException(retry);
}
- else {
- throw new IOException("Unknown/unexpected status code: " + rc);
- }
+
+ throw new IOException("Unknown/unexpected status code: " + rc);
}
/**
@@ -372,6 +365,44 @@ class ContentRangeInputStream extends In
}
/**
+ * Parses a Content-Range header, which should be a byte-range
specification of the content to expect.
+ *
+ * @param value
+ * the Content-Range header value to parse, cannot be
<code>null</code>.
+ * @return an array with two elements, the first indicating the length of
the current chunk, and the second the
+ * total length of the content.
+ * @throws IOException
+ * in case a non-byte Content-Range header value was given.
+ */
+ private long[] parseContentRangeHeader(String value) throws IOException {
+ if (!value.startsWith(BYTES_)) {
+ throw new IOException("Server returned non-byte Content-Range " +
value);
+ }
+
+ String[] parts = value.substring(6).split("/");
+
+ long chunkSize = 0L;
+ if (!"*".equals(parts[0])) {
+ String[] rangeDef = parts[0].split("-");
+
+ long start = Long.parseLong(rangeDef[0]);
+ long end = Long.parseLong(rangeDef[1]);
+
+ chunkSize = end - start;
+ }
+
+ long totalBytes;
+ if ("*".equals(parts[1])) {
+ totalBytes = -1L;
+ }
+ else {
+ totalBytes = Long.parseLong(parts[1]);
+ }
+
+ return new long[] { chunkSize, totalBytes };
+ }
+
+ /**
* Prepares the connection for the next chunk, if needed.
*
* @return <code>true</code> if the prepare was successful (there was a
next chunk to be read), <code>false</code>
@@ -397,7 +428,7 @@ class ContentRangeInputStream extends In
m_contentInfo = contentInfo;
}
-
- return (m_conn != null);
+ // Make sure there's still content remaining to be read...
+ return (m_conn != null) && contentRemaining();
}
}
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java?rev=1539985&r1=1539984&r2=1539985&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java
Fri Nov 8 11:27:35 2013
@@ -63,19 +63,25 @@ final class DownloadCallableImpl impleme
os = new BufferedOutputStream(new FileOutputStream(m_target,
appendTarget));
byte buffer[] = new byte[READBUFFER_SIZE];
- long bytesRead = targetLength, totalBytes = -1L;
+ long bytesRead = targetLength;
+ long totalBytes = -1L;
int read = 0;
try {
while (!Thread.currentThread().isInterrupted() && (read >= 0))
{
read = is.read(buffer);
- if (read < 0) {
+ // this can only fail when the IS is closed, but in that
case, the read() above should have failed
+ // already...
+ totalBytes = is.getContentSize();
+
+ if (read >= 0) {
+ os.write(buffer, 0, read);
+ // update local administration...
+ bytesRead += read;
+ }
+ else {
break; // EOF...
}
- os.write(buffer, 0, read);
- // update local administration...
- bytesRead += read;
- totalBytes = is.getContentSize();
if (m_listener != null) {
m_listener.progress(bytesRead, totalBytes);
@@ -91,7 +97,9 @@ final class DownloadCallableImpl impleme
os.flush();
}
- boolean stoppedEarly = Thread.currentThread().isInterrupted() ||
(totalBytes > 0L && bytesRead < totalBytes);
+ boolean downloadComplete = (bytesRead == totalBytes);
+ boolean stoppedEarly = Thread.currentThread().isInterrupted() ||
!downloadComplete;
+
if (stoppedEarly) {
m_handle.logDebug("Download stopped early: %d of %d bytes
downloaded... (%d)", bytesRead, totalBytes, targetLength);
Modified:
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java?rev=1539985&r1=1539984&r2=1539985&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java
(original)
+++
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java
Fri Nov 8 11:27:35 2013
@@ -204,7 +204,12 @@ public class ContentRangeInputStreamTest
@Override
public int getResponseCode() throws IOException {
- return 206;
+ int[] connInfo = determineNextContent();
+ if (connInfo == null) {
+ return 500;
+ }
+ int len = connInfo[1] - connInfo[0];
+ return len > 0 ? 206 : 416;
}
private int[] determineNextContent() {
Modified:
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java?rev=1539985&r1=1539984&r2=1539985&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java
(original)
+++
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java
Fri Nov 8 11:27:35 2013
@@ -189,6 +189,26 @@ public class DownloadHandlerTest extends
assertSuccessful(handle.start(null), m_200digest);
}
+ @Test
+ public void testSuccessfulResumeAfterCompleteDownload() throws Exception {
+ DownloadHandler downloadHandler =
m_agentContext.getHandler(DownloadHandler.class);
+
+ final DownloadHandle handle = downloadHandler.getHandle(m_200url);
+ handle.discard();
+
+ Future<DownloadResult> future = handle.start(null);
+ DownloadResult result = future.get();
+ assertNotNull(result);
+ assertTrue(result.isComplete());
+ assertEquals(m_200digest, getDigest(result.getInputStream()));
+
+ Future<DownloadResult> future2 = handle.start(null);
+ DownloadResult result2 = future2.get();
+ assertNotNull(result2);
+ assertTrue(result2.isComplete());
+ assertEquals(m_200digest, getDigest(result2.getInputStream()));
+ }
+
private void assertIOException(Future<DownloadResult> future) throws
Exception {
try {
future.get(5, TimeUnit.SECONDS);