Author: jeagles
Date: Tue May 13 16:40:15 2014
New Revision: 1594273

URL: http://svn.apache.org/r1594273
Log:
HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn Sharp via jeagles)
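The failure mode being fixed: call sites used to pull typed values straight out of the raw JSON map (e.g. "(Boolean)json.get("boolean")"), so an unexpected or malformed server response could surface as a ClassCastException or NullPointerException instead of an IOException. The patch routes all decoding through a runner that rethrows anything the decoder throws as an IOException. A minimal, self-contained model of that contract (DemoResponseRunner is a hypothetical name used only for illustration; the real class is FsPathResponseRunner in the diff below):

    import java.io.IOException;
    import java.util.Map;

    // Model of the decoding contract introduced by the patch: any
    // RuntimeException escaping decodeResponse() is converted into a
    // checked IOException before it reaches the caller.
    abstract class DemoResponseRunner<T> {
      final T run(Map<?, ?> json) throws IOException {
        try {
          if (json == null) {
            // match exception class thrown by a real parser
            throw new IllegalStateException("Missing response");
          }
          return decodeResponse(json);
        } catch (IOException ioe) {
          throw ioe;                    // already the right type
        } catch (Exception e) {         // CCE, NPE, json parser errors
          throw new IOException("Response decoding failure: " + e, e);
        }
      }

      abstract T decodeResponse(Map<?, ?> json) throws IOException;
    }

With this shape, a payload like {"boolean": "true"} (a string where a boolean was expected) now fails with a descriptive IOException rather than a RuntimeException propagating out of the FileSystem API.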
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 13 16:40:15 2014
@@ -461,6 +461,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6367. EnumSetParam$Domain#parse fails for parameter containing more
     than one enum. (Yi Liu via umamahesh)
 
+    HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn
+    Sharp via jeagles)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue May 13 16:40:15 2014
@@ -58,34 +58,8 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
-import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.DelegationParam;
-import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
-import org.apache.hadoop.hdfs.web.resources.DestinationParam;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.GroupParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.OwnerParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
-import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
-import org.apache.hadoop.hdfs.web.resources.RenewerParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
-import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -426,40 +400,23 @@ public class WebHdfsFileSystem extends F
   }
 
   /**
-   * Run a http operation.
-   * Connect to the http server, validate response, and obtain the JSON output.
-   *
-   * @param op http operation
-   * @param fspath file system path
-   * @param parameters parameters for the operation
-   * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
-   * @throws IOException
-   */
-  private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
-      final Param<?,?>... parameters) throws IOException {
-    return new FsPathRunner(op, fspath, parameters).run().json;
-  }
-
-  /**
    * This class is for initializing a HTTP connection, connecting to server,
    * obtaining a response, and also handling retry on failures.
    */
-  abstract class AbstractRunner {
+  abstract class AbstractRunner<T> {
     abstract protected URL getUrl() throws IOException;
 
     protected final HttpOpParam.Op op;
     private final boolean redirected;
 
     private boolean checkRetry;
-    protected HttpURLConnection conn = null;
-    private Map<?, ?> json = null;
 
     protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
       this.op = op;
      this.redirected = redirected;
     }
 
-    AbstractRunner run() throws IOException {
+    T run() throws IOException {
       UserGroupInformation connectUgi = ugi.getRealUser();
       if (connectUgi == null) {
         connectUgi = ugi;
@@ -471,9 +428,9 @@ public class WebHdfsFileSystem extends F
         // the entire lifecycle of the connection must be run inside the
         // doAs to ensure authentication is performed correctly
         return connectUgi.doAs(
-            new PrivilegedExceptionAction<AbstractRunner>() {
+            new PrivilegedExceptionAction<T>() {
              @Override
-              public AbstractRunner run() throws IOException {
+              public T run() throws IOException {
                 return runWithRetry();
               }
            });
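The hunk above is the heart of the refactor: the untyped run(op, fspath, ...) returning a raw Map is gone, and AbstractRunner becomes generic so each operation's run() yields its decoded result directly. A condensed, compilable model of the new shape (retry handling, the doAs security context, and URL construction are stubbed out here; the names follow the patch):

    import java.io.IOException;
    import java.net.HttpURLConnection;

    abstract class Runner<T> {
      protected final boolean doOutput;

      protected Runner(boolean doOutput) {
        this.doOutput = doOutput;
      }

      // Opens the connection; the real code also resolves DN redirects here.
      abstract HttpURLConnection connect() throws IOException;

      // Checks the HTTP status; output streams defer this to close().
      abstract void validateResponse(HttpURLConnection conn) throws IOException;

      // Each subclass decodes its own typed result from the connection.
      abstract T getResponse(HttpURLConnection conn) throws IOException;

      final T run() throws IOException {
        final HttpURLConnection conn = connect();
        if (!doOutput) {
          validateResponse(conn);
        }
        return getResponse(conn);
      }
    }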
@@ -481,18 +438,51 @@ public class WebHdfsFileSystem extends F
         throw new IOException(e);
       }
     }
-
-    private void init() throws IOException {
-      checkRetry = !redirected;
-      URL url = getUrl();
-      conn = (HttpURLConnection) connectionFactory.openConnection(url);
-    }
-
-    private void connect() throws IOException {
-      connect(op.getDoOutput());
+
+    /**
+     * Two-step requests redirected to a DN
+     *
+     * Create/Append:
+     * Step 1) Submit a Http request with neither auto-redirect nor data.
+     * Step 2) Submit another Http request with the URL from the Location
+     * header with data.
+     *
+     * The reason for the two-step create/append is to prevent clients from
+     * sending out the data before the redirect. This issue is addressed by
+     * the "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section
+     * 8.2.3. Unfortunately, there are software library bugs (e.g. Jetty 6
+     * http server and Java 6 http client), which do not correctly implement
+     * "Expect: 100-continue". The two-step create/append is a temporary
+     * workaround for the software library bugs.
+     *
+     * Open/Checksum:
+     * Also implements two-step connects for other operations redirected to
+     * a DN such as open and checksum.
+     */
+    private HttpURLConnection connect(URL url) throws IOException {
+      // resolve redirects for a DN operation unless already resolved
+      if (op.getRedirect() && !redirected) {
+        final HttpOpParam.Op redirectOp =
+            HttpOpParam.TemporaryRedirectOp.valueOf(op);
+        final HttpURLConnection conn = connect(redirectOp, url);
+        // application level proxy like httpfs might not issue a redirect
+        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+          return conn;
+        }
+        try {
+          validateResponse(redirectOp, conn, false);
+          url = new URL(conn.getHeaderField("Location"));
+        } finally {
+          conn.disconnect();
+        }
+      }
+      return connect(op, url);
     }
 
-    private void connect(boolean doOutput) throws IOException {
+    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+        throws IOException {
+      final HttpURLConnection conn =
+          (HttpURLConnection)connectionFactory.openConnection(url);
+      final boolean doOutput = op.getDoOutput();
       conn.setRequestMethod(op.getType().toString());
       conn.setInstanceFollowRedirects(false);
       switch (op.getType()) {
@@ -505,6 +495,10 @@ public class WebHdfsFileSystem extends F
           // explicitly setting content-length to 0 won't do spnego!!
           // opening and closing the stream will send "Content-Length: 0"
           conn.getOutputStream().close();
+        } else {
+          conn.setRequestProperty("Content-Type",
+              MediaType.APPLICATION_OCTET_STREAM);
+          conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
         }
         break;
       }
@@ -514,16 +508,10 @@ public class WebHdfsFileSystem extends F
         }
       }
       conn.connect();
+      return conn;
     }
 
-    private void disconnect() {
-      if (conn != null) {
-        conn.disconnect();
-        conn = null;
-      }
-    }
-
-    private AbstractRunner runWithRetry() throws IOException {
+    private T runWithRetry() throws IOException {
       /**
        * Do the real work.
        *
@@ -541,15 +529,16 @@ public class WebHdfsFileSystem extends F
        * examines the exception and swallows it if it decides to rerun the work.
        */
      for(int retry = 0; ; retry++) {
+        checkRetry = !redirected;
+        final URL url = getUrl();
         try {
-          init();
-          if (op.getDoOutput()) {
-            twoStepWrite();
-          } else {
-            getResponse(op != GetOpParam.Op.OPEN);
+          final HttpURLConnection conn = connect(url);
+          // output streams will validate on close
+          if (!op.getDoOutput()) {
+            validateResponse(op, conn, false);
           }
-          return this;
-        } catch(IOException ioe) {
+          return getResponse(conn);
+        } catch (IOException ioe) {
           Throwable cause = ioe.getCause();
           if (cause != null && cause instanceof AuthenticationException) {
             throw ioe; // no retries for auth failures
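The two-step exchange described in the connect() javadoc above can be shown with plain HttpURLConnection. A hedged sketch (the host, port, and path are hypothetical, authentication is omitted, and the real client goes through connectionFactory and validateResponse()):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class TwoStepWriteSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical WebHDFS CREATE endpoint on a namenode.
        URL nn = new URL(
            "http://namenode:50070/webhdfs/v1/tmp/demo?op=CREATE&overwrite=true");

        // Step 1: PUT with no body and auto-redirects disabled; the NN
        // answers 307 with a Location header naming a datanode.
        HttpURLConnection probe = (HttpURLConnection) nn.openConnection();
        probe.setRequestMethod("PUT");
        probe.setInstanceFollowRedirects(false);
        String location = probe.getHeaderField("Location");
        probe.disconnect();

        // Step 2: PUT the data to the redirect target. Chunked streaming
        // avoids buffering the whole payload to compute Content-Length.
        HttpURLConnection dn =
            (HttpURLConnection) new URL(location).openConnection();
        dn.setRequestMethod("PUT");
        dn.setDoOutput(true);
        dn.setRequestProperty("Content-Type", "application/octet-stream");
        dn.setChunkedStreamingMode(32 << 10); // 32kB chunks, as in the patch
        try (OutputStream out = dn.getOutputStream()) {
          out.write("hello".getBytes("UTF-8"));
        }
        System.out.println("DN response: " + dn.getResponseCode());
        dn.disconnect();
      }
    }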
@@ -591,87 +580,129 @@ public class WebHdfsFileSystem extends F
       throw toIOException(ioe);
     }
 
-    /**
-     * Two-step Create/Append:
-     * Step 1) Submit a Http request with neither auto-redirect nor data.
-     * Step 2) Submit another Http request with the URL from the Location
-     * header with data.
-     *
-     * The reason of having two-step create/append is for preventing clients to
-     * send out the data before the redirect. This issue is addressed by the
-     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
-     * and Java 6 http client), which do not correctly implement "Expect:
-     * 100-continue". The two-step create/append is a temporary workaround for
-     * the software library bugs.
-     */
-    HttpURLConnection twoStepWrite() throws IOException {
-      //Step 1) Submit a Http request with neither auto-redirect nor data.
-      connect(false);
-      validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
-      final String redirect = conn.getHeaderField("Location");
-      disconnect();
-      checkRetry = false;
-
-      //Step 2) Submit another Http request with the URL from the Location header with data.
-      conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
-          redirect));
-      conn.setRequestProperty("Content-Type",
-          MediaType.APPLICATION_OCTET_STREAM);
-      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
-      connect();
-      return conn;
+    abstract T getResponse(HttpURLConnection conn) throws IOException;
   }
+
+  /**
+   * Abstract base class to handle path-based operations with params
+   */
+  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+    private final Path fspath;
+    private final Param<?,?>[] parameters;
+
+    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+
+    @Override
+    protected URL getUrl() throws IOException {
+      return toUrl(op, fspath, parameters);
     }
+  }
 
-    FSDataOutputStream write(final int bufferSize) throws IOException {
-      return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+  /**
+   * Default path-based implementation expects no json response
+   */
+  class FsPathRunner extends AbstractFsPathRunner<Void> {
+    FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
     }
+
+    @Override
+    Void getResponse(HttpURLConnection conn) throws IOException {
+      return null;
+    }
+  }
 
-    void getResponse(boolean getJsonAndDisconnect) throws IOException {
+  /**
+   * Handle path-based operations with a json response
+   */
+  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+
+    @Override
+    final T getResponse(HttpURLConnection conn) throws IOException {
       try {
-        connect();
-        final int code = conn.getResponseCode();
-        if (!redirected && op.getRedirect()
-            && code != op.getExpectedHttpResponseCode()) {
-          final String redirect = conn.getHeaderField("Location");
-          json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
-              conn, false);
-          disconnect();
-
-          checkRetry = false;
-          conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
-              redirect));
-          connect();
+        final Map<?,?> json = jsonParse(conn, false);
+        if (json == null) {
+          // match exception class thrown by parser
+          throw new IllegalStateException("Missing response");
         }
-
-        json = validateResponse(op, conn, false);
-        if (json == null && getJsonAndDisconnect) {
-          json = jsonParse(conn, false);
+        return decodeResponse(json);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) { // catch json parser errors
+        final IOException ioe =
+            new IOException("Response decoding failure: "+e.toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ioe);
         }
+        throw ioe;
       } finally {
-        if (getJsonAndDisconnect) {
-          disconnect();
-        }
+        conn.disconnect();
       }
     }
+
+    abstract T decodeResponse(Map<?,?> json) throws IOException;
   }
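With FsPathResponseRunner in place, adding a typed operation means supplying only decodeResponse(); the base class guarantees the connection is closed and that decoding faults become IOExceptions. A hypothetical subclass, not part of the patch, written in the style of FsPathBooleanRunner (it would sit inside WebHdfsFileSystem next to the classes above):

    /**
     * Hypothetical runner for operations answering {"long": n}.
     */
    class FsPathLongRunner extends FsPathResponseRunner<Long> {
      FsPathLongRunner(Op op, Path fspath, Param<?,?>... parameters) {
        super(op, fspath, parameters);
      }

      @Override
      Long decodeResponse(Map<?,?> json) throws IOException {
        // A non-numeric payload raises ClassCastException here; the base
        // class rethrows it as "Response decoding failure: ...".
        return (Long)json.get("long");
      }
    }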
 
-  final class FsPathRunner extends AbstractRunner {
-    private final Path fspath;
-    private final Param<?, ?>[] parameters;
-
-    FsPathRunner(final HttpOpParam.Op op, final Path fspath,
-        final Param<?,?>... parameters) {
-      super(op, false);
-      this.fspath = fspath;
-      this.parameters = parameters;
+  /**
+   * Handle path-based operations with json boolean response
+   */
+  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+    FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
     }
-
+
     @Override
-    protected URL getUrl() throws IOException {
-      return toUrl(op, fspath, parameters);
+    Boolean decodeResponse(Map<?,?> json) throws IOException {
+      return (Boolean)json.get("boolean");
     }
   }
 
-  final class URLRunner extends AbstractRunner {
+  /**
+   * Handle create/append output streams
+   */
+  class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+    private final int bufferSize;
+
+    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+      this.bufferSize = bufferSize;
+    }
+
+    @Override
+    FSDataOutputStream getResponse(final HttpURLConnection conn)
+        throws IOException {
+      return new FSDataOutputStream(new BufferedOutputStream(
+          conn.getOutputStream(), bufferSize), statistics) {
+        @Override
+        public void close() throws IOException {
+          try {
+            super.close();
+          } finally {
+            try {
+              validateResponse(op, conn, true);
+            } finally {
+              conn.disconnect();
+            }
+          }
+        }
+      };
+    }
+  }
+
+  /**
+   * Used by open() which tracks the resolved url itself
+   */
+  final class URLRunner extends AbstractRunner<HttpURLConnection> {
     private final URL url;
     @Override
     protected URL getUrl() {
@@ -682,6 +713,11 @@ public class WebHdfsFileSystem extends F
       super(op, redirected);
       this.url = url;
     }
+
+    @Override
+    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+      return conn;
+    }
   }
 
   private FsPermission applyUMask(FsPermission permission) {
@@ -693,8 +729,12 @@ public class WebHdfsFileSystem extends F
 
   private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
-    final Map<?, ?> json = run(op, f);
-    final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
+    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+      @Override
+      HdfsFileStatus decodeResponse(Map<?,?> json) {
+        return JsonUtil.toFileStatus(json, true);
+      }
+    }.run();
     if (status == null) {
       throw new FileNotFoundException("File does not exist: " + f);
     }
@@ -718,8 +758,12 @@ public class WebHdfsFileSystem extends F
   @Override
   public AclStatus getAclStatus(Path f) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
-    final Map<?, ?> json = run(op, f);
-    AclStatus status = JsonUtil.toAclStatus(json);
+    AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+      @Override
+      AclStatus decodeResponse(Map<?,?> json) {
+        return JsonUtil.toAclStatus(json);
+      }
+    }.run();
     if (status == null) {
       throw new FileNotFoundException("File does not exist: " + f);
     }
@@ -730,9 +774,9 @@ public class WebHdfsFileSystem extends F
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
-    final Map<?, ?> json = run(op, f,
-        new PermissionParam(applyUMask(permission)));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, f,
+        new PermissionParam(applyUMask(permission))
+    ).run();
   }
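FsPathOutputStreamRunner above relies on a "validate on close" decoration: the server's verdict on an uploaded stream is only known once the body is complete, so the status check is chained into close() rather than performed up front. Distilled into a generic, compilable sketch (not the patch's exact code):

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    abstract class ValidatingOutputStream extends FilterOutputStream {
      ValidatingOutputStream(OutputStream out) {
        super(out);
      }

      // In the patch this corresponds to validateResponse(op, conn, true)
      // followed by conn.disconnect().
      abstract void validate() throws IOException;

      @Override
      public void close() throws IOException {
        try {
          super.close();   // finish sending the request body
        } finally {
          validate();      // surface any server-side failure to the caller
        }
      }
    }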
 
   /**
@@ -743,17 +787,19 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
-    run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
-        new CreateParentParam(createParent));
+    new FsPathRunner(op, f,
+        new DestinationParam(makeQualified(destination).toUri().getPath()),
+        new CreateParentParam(createParent)
+    ).run();
   }
 
   @Override
   public boolean rename(final Path src, final Path dst) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    final Map<?, ?> json = run(op, src,
-        new DestinationParam(makeQualified(dst).toUri().getPath()));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, src,
+        new DestinationParam(makeQualified(dst).toUri().getPath())
+    ).run();
   }
 
   @SuppressWarnings("deprecation")
@@ -762,8 +808,10 @@ public class WebHdfsFileSystem extends F
       final Options.Rename... options) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
-        new RenameOptionSetParam(options));
+    new FsPathRunner(op, src,
+        new DestinationParam(makeQualified(dst).toUri().getPath()),
+        new RenameOptionSetParam(options)
+    ).run();
   }
 
   @Override
@@ -775,7 +823,9 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
-    run(op, p, new OwnerParam(owner), new GroupParam(group));
+    new FsPathRunner(op, p,
+        new OwnerParam(owner), new GroupParam(group)
+    ).run();
   }
 
   @Override
@@ -783,7 +833,7 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
-    run(op, p, new PermissionParam(permission));
+    new FsPathRunner(op, p,new PermissionParam(permission)).run();
   }
 
   @Override
@@ -791,7 +841,7 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
-    run(op, path, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
@@ -799,21 +849,21 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
-    run(op, path, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
   public void removeDefaultAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
-    run(op, path);
+    new FsPathRunner(op, path).run();
   }
 
   @Override
   public void removeAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
-    run(op, path);
+    new FsPathRunner(op, path).run();
   }
 
   @Override
@@ -821,7 +871,7 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETACL;
-    run(op, p, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
@@ -829,8 +879,9 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
-    final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, p,
+        new ReplicationParam(replication)
+    ).run();
   }
 
   @Override
@@ -838,7 +889,10 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
-    run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
+    new FsPathRunner(op, p,
+        new ModificationTimeParam(mtime),
+        new AccessTimeParam(atime)
+    ).run();
   }
 
   @Override
@@ -853,32 +907,11 @@ public class WebHdfsFileSystem extends F
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
   }
 
-  FSDataOutputStream write(final HttpOpParam.Op op,
-      final HttpURLConnection conn, final int bufferSize) throws IOException {
-    return new FSDataOutputStream(new BufferedOutputStream(
-        conn.getOutputStream(), bufferSize), statistics) {
-      @Override
-      public void close() throws IOException {
-        try {
-          super.close();
-        } finally {
-          try {
-            validateResponse(op, conn, true);
-          } finally {
-            conn.disconnect();
-          }
-        }
-      }
-    };
-  }
-
   @Override
   public void concat(final Path trg, final Path [] srcs) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
-
-    ConcatSourcesParam param = new ConcatSourcesParam(srcs);
-    run(op, trg, param);
+    new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
   }
 
   @Override
@@ -888,14 +921,13 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    return new FsPathRunner(op, f,
+    return new FsPathOutputStreamRunner(op, f, bufferSize,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
         new ReplicationParam(replication),
-        new BlockSizeParam(blockSize))
-      .run()
-      .write(bufferSize);
+        new BlockSizeParam(blockSize)
+    ).run();
   }
 
   @Override
@@ -904,16 +936,17 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
-      .run()
-      .write(bufferSize);
+    return new FsPathOutputStreamRunner(op, f, bufferSize,
+        new BufferSizeParam(bufferSize)
+    ).run();
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
-    final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, f,
+        new RecursiveParam(recursive)
+    ).run();
   }
 
   @Override
@@ -945,7 +978,7 @@ public class WebHdfsFileSystem extends F
         final boolean resolved) throws IOException {
       final URL offsetUrl = offset == 0L?
           url : new URL(url + "&" + new OffsetParam(offset));
-      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
     }
   }
 
@@ -1001,25 +1034,36 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
-    final Map<?, ?> json = run(op, f);
-    final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
-    final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
-
-    //convert FileStatus
-    final FileStatus[] statuses = new FileStatus[array.length];
-    for(int i = 0; i < array.length; i++) {
-      final Map<?, ?> m = (Map<?, ?>)array[i];
-      statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
-    }
-    return statuses;
+    return new FsPathResponseRunner<FileStatus[]>(op, f) {
+      @Override
+      FileStatus[] decodeResponse(Map<?,?> json) {
+        final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
+        final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
+
+        //convert FileStatus
+        final FileStatus[] statuses = new FileStatus[array.length];
+        for (int i = 0; i < array.length; i++) {
+          final Map<?, ?> m = (Map<?, ?>)array[i];
+          statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
+        }
+        return statuses;
+      }
+    }.run();
   }
 
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(
       final String renewer) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
-    final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
-    final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+    Token<DelegationTokenIdentifier> token =
+        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+            op, null, new RenewerParam(renewer)) {
+          @Override
+          Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+              throws IOException {
+            return JsonUtil.toDelegationToken(json);
+          }
+        }.run();
     token.setService(tokenServiceName);
     return token;
   }
@@ -1041,19 +1085,22 @@ public class WebHdfsFileSystem extends F
   public synchronized long renewDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
-    TokenArgumentParam dtargParam = new TokenArgumentParam(
-        token.encodeToUrlString());
-    final Map<?, ?> m = run(op, null, dtargParam);
-    return (Long) m.get("long");
+    return new FsPathResponseRunner<Long>(op, null,
+        new TokenArgumentParam(token.encodeToUrlString())) {
+      @Override
+      Long decodeResponse(Map<?,?> json) throws IOException {
+        return (Long) json.get("long");
+      }
+    }.run();
   }
 
   @Override
   public synchronized void cancelDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
-    TokenArgumentParam dtargParam = new TokenArgumentParam(
-        token.encodeToUrlString());
-    run(op, null, dtargParam);
+    new FsPathRunner(op, null,
+        new TokenArgumentParam(token.encodeToUrlString())
+    ).run();
   }
 
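The delegation-token hunks make the benefit concrete: renewDelegationToken() decodes (Long) json.get("long"), a cast that used to throw ClassCastException at the caller if a server or proxy bug mangled the payload. A toy, runnable demonstration of the before/after behavior (pure JDK; the payload is fabricated):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class DecodeFailureDemo {
      public static void main(String[] args) {
        Map<String, Object> json = new HashMap<>();
        json.put("long", "oops");              // malformed response value

        try {
          Long v = (Long) json.get("long");    // old pattern
          System.out.println(v);
        } catch (ClassCastException e) {
          System.out.println("old behavior: " + e);
        }

        try {
          decode(json);                        // new pattern
        } catch (IOException e) {
          System.out.println("new behavior: " + e.getMessage());
        }
      }

      // Same wrapping as FsPathResponseRunner.getResponse().
      static Long decode(Map<?, ?> json) throws IOException {
        try {
          return (Long) json.get("long");
        } catch (Exception e) {
          throw new IOException("Response decoding failure: " + e, e);
        }
      }
    }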
   @Override
@@ -1071,9 +1118,14 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
-    final Map<?, ?> m = run(op, p, new OffsetParam(offset),
-        new LengthParam(length));
-    return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
+    return new FsPathResponseRunner<BlockLocation[]>(op, p,
+        new OffsetParam(offset), new LengthParam(length)) {
+      @Override
+      BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+        return DFSUtil.locatedBlocks2Locations(
+            JsonUtil.toLocatedBlocks(json));
+      }
+    }.run();
   }
 
   @Override
@@ -1081,8 +1133,12 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
-    final Map<?, ?> m = run(op, p);
-    return JsonUtil.toContentSummary(m);
+    return new FsPathResponseRunner<ContentSummary>(op, p) {
+      @Override
+      ContentSummary decodeResponse(Map<?,?> json) {
+        return JsonUtil.toContentSummary(json);
+      }
+    }.run();
   }
 
   @Override
@@ -1091,8 +1147,12 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
-    final Map<?, ?> m = run(op, p);
-    return JsonUtil.toMD5MD5CRC32FileChecksum(m);
+    return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+      @Override
+      MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+        return JsonUtil.toMD5MD5CRC32FileChecksum(json);
+      }
+    }.run();
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Tue May 13 16:40:15 2014
@@ -102,7 +102,7 @@ public abstract class HttpOpParam<E exte
 
     @Override
     public boolean getDoOutput() {
-      return op.getDoOutput();
+      return false;
     }
 
     @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Tue May 13 16:40:15 2014
@@ -94,10 +94,4 @@ public class WebHdfsTestUtil {
     Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
     return WebHdfsFileSystem.jsonParse(conn, false);
   }
-
-  public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
-      final HttpOpParam.Op op, final HttpURLConnection conn,
-      final int bufferSize) throws IOException {
-    return webhdfs.write(op, conn, bufferSize);
-  }
 }
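The one-line HttpOpParam change is easy to miss but load-bearing: TemporaryRedirectOp wraps a real op (e.g. CREATE or APPEND) for the step-1 redirect probe, and it previously delegated getDoOutput() to the wrapped op, marking the probe as carrying a request body it must never send. A reduced model of the corrected semantics (the Op interface here is hypothetical; the real class wraps enum constants and also reports 307 TEMPORARY_REDIRECT as its expected response code):

    // Reduced model; enum plumbing and the other Op accessors are elided.
    interface Op {
      boolean getDoOutput();
    }

    final class TemporaryRedirectOp implements Op {
      private final Op op;   // the wrapped CREATE/APPEND/OPEN/CHECKSUM op

      TemporaryRedirectOp(Op op) {
        this.op = op;
      }

      @Override
      public boolean getDoOutput() {
        // Before the patch: "return op.getDoOutput();" -- true for
        // CREATE/APPEND, which told the redirect probe to open an output
        // stream. The probe exists only to read the Location header, so
        // it must not carry a body.
        return false;
      }
    }

The WebHdfsTestUtil change is the matching cleanup: the removed write() helper existed only to reach the package-private write path that FsPathOutputStreamRunner now owns.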