Author: szetszwo
Date: Wed Oct 19 21:40:20 2011
New Revision: 1186509

URL: http://svn.apache.org/viewvc?rev=1186509&view=rev
Log:
HDFS-2453. Fix http response code for partial content in webhdfs, added
getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
and cleared content type in ExceptionHandler.
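The client-visible effect of the response-code fix: an OPEN request that
starts at a non-zero offset is now answered with 206 (Partial Content)
instead of 200. A minimal sketch for checking this by hand; the host, port
and file path are placeholders, not part of the patch:

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CheckPartialContent {
      public static void main(String[] args) throws Exception {
        // Placeholder address and path; point these at a real cluster.
        URL url = new URL(
            "http://datanode:50075/webhdfs/v1/tmp/f?op=OPEN&offset=100");
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setRequestMethod("GET");
        conn.connect();
        // Before this patch: 200 (SC_OK).  After: 206 (SC_PARTIAL_CONTENT).
        System.out.println("Response code: " + conn.getResponseCode());
        conn.disconnect();
      }
    }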
Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Wed Oct 19 21:40:20 2011
@@ -61,6 +61,10 @@ Release 0.20.205.1 - unreleased
     file or creating a file without specifying the replication parameter.
     (szetszwo)
 
+    HDFS-2453. Fix http response code for partial content in webhdfs, added
+    getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
+    and cleared content type in ExceptionHandler.  (szetszwo)
+
 Release 0.20.205.0 - 2011.10.06
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Oct 19 21:40:20 2011
@@ -22,10 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -42,6 +45,8 @@ public class ByteRangeInputStream extend
    */
   static class URLOpener {
     protected URL url;
+    /** The url with offset parameter */
+    private URL offsetUrl;
 
     public URLOpener(URL u) {
       url = u;
@@ -54,12 +59,55 @@ public class ByteRangeInputStream extend
     public URL getURL() {
       return url;
     }
-
-    public HttpURLConnection openConnection() throws IOException {
-      return (HttpURLConnection)url.openConnection();
+
+    HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)offsetUrl.openConnection();
+    }
+
+    private HttpURLConnection openConnection(final long offset) throws IOException {
+      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
+      final HttpURLConnection conn = openConnection();
+      conn.setRequestMethod("GET");
+      if (offset != 0L) {
+        conn.setRequestProperty("Range", "bytes=" + offset + "-");
+      }
+      return conn;
+    }
   }
 
+  static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+  /** Remove offset parameter, if there is any, from the url */
+  static URL removeOffsetParam(final URL url) throws MalformedURLException {
+    String query = url.getQuery();
+    if (query == null) {
+      return url;
+    }
+    final String lower = query.toLowerCase();
+    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+      return url;
+    }
+
+    //rebuild query
+    StringBuilder b = null;
+    for(final StringTokenizer st = new StringTokenizer(query, "&");
+        st.hasMoreTokens();) {
+      final String token = st.nextToken();
+      if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
+        if (b == null) {
+          b = new StringBuilder("?").append(token);
+        } else {
+          b.append('&').append(token);
+        }
+      }
+    }
+    query = b == null? "": b.toString();
+
+    final String urlStr = url.toString();
+    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+  }
+
   enum StreamStatus {
     NORMAL, SEEK
   }
@@ -95,12 +143,8 @@ public class ByteRangeInputStream extend
     final URLOpener opener =
         (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
 
-    final HttpURLConnection connection = opener.openConnection();
+    final HttpURLConnection connection = opener.openConnection(startPos);
     try {
-      connection.setRequestMethod("GET");
-      if (startPos != 0) {
-        connection.setRequestProperty("Range", "bytes="+startPos+"-");
-      }
       connection.connect();
       final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
       filelength = (cl == null) ? -1 : Long.parseLong(cl);
@@ -125,7 +169,7 @@ public class ByteRangeInputStream extend
       throw new IOException("HTTP_OK expected, received " + respCode);
     }
 
-    resolvedURL.setURL(connection.getURL());
+    resolvedURL.setURL(removeOffsetParam(connection.getURL()));
 
     status = StreamStatus.NORMAL;
   }
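Two details in the ByteRangeInputStream change are worth noting.
openConnection(offset) signals the starting position twice: as an offset
query parameter, which the webhdfs servlets consume, and as an HTTP Range
header, which applies when the stream is served over plain HTTP (this class
is also used outside webhdfs). removeOffsetParam() then strips the parameter
from the redirected URL before it is cached, so a later seek appends a fresh
offset instead of accumulating stale ones. A small illustration; the host is
made up, and the expected strings match the unit tests added below
(removeOffsetParam is package-private, so this assumes code living in
org.apache.hadoop.hdfs):

    import java.net.URL;

    URL resolved = new URL("http://dn:50075/webhdfs/v1/f?op=OPEN&offset=100");
    URL cached = ByteRangeInputStream.removeOffsetParam(resolved);
    // cached.toString() -> "http://dn:50075/webhdfs/v1/f?op=OPEN"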
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Oct 19 21:40:20 2011
@@ -100,7 +100,7 @@ public class DatanodeWebHdfsMethods {
           final ReplicationParam replication,
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -156,7 +156,7 @@ public class DatanodeWebHdfsMethods {
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
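The OPEN branch below now derives the status code from the requested offset,
and ByteRangeInputStream enforces the matching expectation on the client
side. A paraphrase of that client-side validation (startPos and respCode are
hypothetical values here, not new patch code):

    import java.io.IOException;
    import java.net.HttpURLConnection;

    long startPos = 100;  // position of the first requested byte
    int respCode = 200;   // code actually returned by the server
    final int expected = startPos == 0
        ? HttpURLConnection.HTTP_OK        // 200 for a read from the start
        : HttpURLConnection.HTTP_PARTIAL;  // 206 for a read from an offset
    if (respCode != expected) {
      throw new IOException((startPos == 0 ? "HTTP_OK" : "HTTP_PARTIAL")
          + " expected, received " + respCode);
    }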
ugi=" + ugi @@ -209,7 +209,7 @@ public class DatanodeWebHdfsMethods { final LengthParam length, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize - ) throws IOException, URISyntaxException, InterruptedException { + ) throws IOException, InterruptedException { if (LOG.isTraceEnabled()) { LOG.trace(op + ": " + path + ", ugi=" + ugi @@ -248,7 +248,11 @@ public class DatanodeWebHdfsMethods { } } }; - return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build(); + + final int status = offset.getValue() == 0? + HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT; + return Response.status(status).entity(streaming).type( + MediaType.APPLICATION_OCTET_STREAM).build(); } case GETFILECHECKSUM: { Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1186509&r1=1186508&r2=1186509&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original) +++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Oct 19 21:40:20 2011 @@ -31,6 +31,8 @@ import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; @@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.ByteRangeI import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -91,6 +94,7 @@ import org.mortbay.util.ajax.JSON; /** A FileSystem for HDFS over the web. 
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java Wed Oct 19 21:40:20 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.web.resou
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -36,12 +38,17 @@ import com.sun.jersey.api.ParamException
 public class ExceptionHandler implements ExceptionMapper<Exception> {
   public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
 
+  private @Context HttpServletResponse response;
+
   @Override
   public Response toResponse(Exception e) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("GOT EXCEPITION", e);
     }
 
+    //clear content type
+    response.setContentType(null);
+
     //Convert exception
     if (e instanceof ParamException) {
       final ParamException paramexception = (ParamException)e;
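Clearing the content type matters because the injected HttpServletResponse
may already carry a type set earlier in request handling; resetting it lets
the type attached to the error Response take effect. A minimal sketch of the
pattern (the status and entity below are placeholders, not the handler's
actual conversion logic):

    // Inside a Jersey ExceptionMapper with an injected HttpServletResponse:
    response.setContentType(null);  // drop any previously set type
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
        .entity(errorJson)                 // errorJson: hypothetical JSON body
        .type(MediaType.APPLICATION_JSON)  // now applied cleanly
        .build();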
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1186509&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Wed Oct 19 21:40:20 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
+import org.junit.Test;
+
+class MockHttpURLConnection extends HttpURLConnection {
+  public MockHttpURLConnection(URL u) {
+    super(u);
+  }
+
+  @Override
+  public boolean usingProxy(){
+    return false;
+  }
+
+  @Override
+  public void disconnect() {
+  }
+
+  @Override
+  public void connect() {
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return new ByteArrayInputStream("asdf".getBytes());
+  }
+
+  @Override
+  public URL getURL() {
+    URL u = null;
+    try {
+      u = new URL("http://resolvedurl/");
+    } catch (Exception e) {
+      System.out.println(e.getMessage());
+    }
+    return u;
+  }
+
+  @Override
+  public int getResponseCode() {
+    if (responseCode != -1) {
+      return responseCode;
+    } else {
+      if (getRequestProperty("Range") == null) {
+        return 200;
+      } else {
+        return 206;
+      }
+    }
+  }
+
+  public void setResponseCode(int resCode) {
+    responseCode = resCode;
+  }
+}
+
+public class TestByteRangeInputStream {
+  @Test
+  public void testRemoveOffset() throws IOException {
+    { //no offset
+      String s = "http://test/Abc?Length=99";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //no parameters
+      String s = "http://test/Abc";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as first parameter
+      String s = "http://test/Abc?offset=10&Length=99";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as second parameter
+      String s = "http://test/Abc?op=read&OFFset=10&Length=99";
+      assertEquals("http://test/Abc?op=read&Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as last parameter
+      String s = "http://test/Abc?Length=99&offset=10";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as the only parameter
+      String s = "http://test/Abc?offset=10";
+      assertEquals("http://test/Abc",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+  }
+
+  @Test
+  public void testByteRange() throws IOException {
+    URLOpener ospy = spy(new URLOpener(new URL("http://test/")));
+    doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+        .openConnection();
+    URLOpener rspy = spy(new URLOpener((URL) null));
+    doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+        .openConnection();
+    ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy);
+
+    assertEquals("getPos wrong", 0, is.getPos());
+
+    is.read();
+
+    assertNull("Initial call made incorrectly (Range Check)", ospy
+        .openConnection().getRequestProperty("Range"));
+
+    assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
+
+    is.read();
+
+    assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
+
+    // No additional connections should have been made (no seek)
+
+    rspy.setURL(new URL("http://resolvedurl/"));
+
+    is.seek(100);
+    is.read();
+
+    assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+        "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
+
+    assertEquals("getPos should be 101 after reading one byte", 101,
+        is.getPos());
+
+    verify(rspy, times(2)).openConnection();
+
+    is.seek(101);
+    is.read();
+
+    verify(rspy, times(2)).openConnection();
+
+    // Seek to 101 should not result in another request
+
+    is.seek(2500);
+    is.read();
+
+    assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+        "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
+
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
+    is.seek(500);
+
+    try {
+      is.read();
+      fail("Exception should be thrown when 200 response is given "
+          + "but 206 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+          "HTTP_PARTIAL expected, received 200", e.getMessage());
+    }
+
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
+    is.seek(0);
+
+    try {
+      is.read();
+      fail("Exception should be thrown when 206 response is given "
+          + "but 200 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+          "HTTP_OK expected, received 206", e.getMessage());
+    }
+  }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Wed Oct 19 21:40:20 2011
@@ -129,6 +129,7 @@ public class TestWebHdfsFileSystemContra
   //For WebHdfsFileSystem,
   //disable testListStatusReturnsNullForNonExistentFile
   //and add testListStatusThrowsExceptionForNonExistentFile below.
+  @Override
   public void testListStatusReturnsNullForNonExistentFile() {}
 
   public void testListStatusThrowsExceptionForNonExistentFile() throws Exception {
@@ -140,6 +141,8 @@ public class TestWebHdfsFileSystemContra
     }
   }
 
+  //the following are new tests (i.e. not over-riding the super class methods)
+
   public void testGetFileBlockLocations() throws IOException {
     final String f = "/test/testGetFileBlockLocations";
     final Path p = path(f);
@@ -192,4 +195,45 @@ public class TestWebHdfsFileSystemContra
       WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
     }
   }
+
+  public void testSeek() throws IOException {
+    final Path p = new Path("/test/testSeek");
+    createFile(p);
+
+    final int one_third = data.length/3;
+    final int two_third = one_third*2;
+
+    { //test seek
+      final int offset = one_third;
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.seek(offset);
+
+      //read all remaining data
+      in.readFully(buf);
+      in.close();
+
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+
+    { //test position read (read the data after the two_third location)
+      final int offset = two_third;
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.readFully(offset, buf);
+      in.close();
+
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+  }
 }
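For reference, the behavior testSeek exercises end to end: a seek followed by
a sequential read (which takes the ranged request and 206 path above), and a
positioned read that leaves the stream's cursor untouched. A client-side
sketch; the URI and path are placeholders:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070/"),
        new Configuration());
    FSDataInputStream in = fs.open(new Path("/test/testSeek"));
    byte[] buf = new byte[64];
    in.seek(100);           // next read issues an offset request; expects 206
    in.readFully(buf);      // sequential read starting at position 100
    in.readFully(300, buf); // positioned read; does not move the seek pointer
    in.close();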