Repository: hadoop
Updated Branches:
refs/heads/branch-2 029b6fbe7 -> 80e664084
HDFS-11156. Add new op GETFILEBLOCKLOCATIONS to WebHDFS REST API. Contributed
by Weiwei Yang.
(cherry picked from commit 7fcc73fc0d248aae1edbd4e1514c5818f6198928)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/80e66408
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/80e66408
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/80e66408
Branch: refs/heads/branch-2
Commit: 80e664084894bbff9b1cc3102e71b161873fb6ef
Parents: 029b6fb
Author: Andrew Wang <[email protected]>
Authored: Tue Jan 3 09:58:00 2017 -0800
Committer: Andrew Wang <[email protected]>
Committed: Tue Jan 3 09:58:45 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/hdfs/web/JsonUtilClient.java | 53 ++++
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 62 ++++-
.../hadoop/hdfs/web/resources/GetOpParam.java | 12 +-
.../web/resources/NamenodeWebHdfsMethods.java | 16 ++
.../org/apache/hadoop/hdfs/web/JsonUtil.java | 33 +++
.../hadoop-hdfs/src/site/markdown/WebHDFS.md | 195 ++++++++++++-
.../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 276 +++++++++++++++++++
7 files changed, 640 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
index a6bf3f4..176b640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.web;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.ContentSummary.Builder;
import org.apache.hadoop.fs.FileChecksum;
@@ -637,4 +638,56 @@ class JsonUtilClient {
}
}
+ static BlockLocation[] toBlockLocationArray(Map<?, ?> json)
+ throws IOException{
+ final Map<?, ?> rootmap =
+ (Map<?, ?>)json.get(BlockLocation.class.getSimpleName() + "s");
+ final List<?> array = JsonUtilClient.getList(rootmap,
+ BlockLocation.class.getSimpleName());
+
+ Preconditions.checkNotNull(array);
+ final BlockLocation[] locations = new BlockLocation[array.size()];
+ int i = 0;
+ for (Object object : array) {
+ final Map<?, ?> m = (Map<?, ?>) object;
+ locations[i++] = JsonUtilClient.toBlockLocation(m);
+ }
+ return locations;
+ }
+
+ /** Convert a Json map to BlockLocation. **/
+ static BlockLocation toBlockLocation(Map<?, ?> m)
+ throws IOException{
+ if(m == null) {
+ return null;
+ }
+
+ long length = ((Number) m.get("length")).longValue();
+ long offset = ((Number) m.get("offset")).longValue();
+ boolean corrupt = Boolean.parseBoolean(m.get("corrupt").toString());
+ String[] storageIds = toStringArray(getList(m, "storageIds"));
+ String[] cachedHosts = toStringArray(getList(m, "cachedHosts"));
+ String[] hosts = toStringArray(getList(m, "hosts"));
+ String[] names = toStringArray(getList(m, "names"));
+ String[] topologyPaths = toStringArray(getList(m, "topologyPaths"));
+ StorageType[] storageTypes = toStorageTypeArray(
+ getList(m, "storageTypes"));
+ return new BlockLocation(names, hosts, cachedHosts,
+ topologyPaths, storageIds, storageTypes,
+ offset, length, corrupt);
+ }
+
+ static String[] toStringArray(List<?> list) {
+ if (list == null) {
+ return null;
+ } else {
+ final String[] array = new String[list.size()];
+ int i = 0;
+ for (Object object : list) {
+ array[i++] = object.toString();
+ }
+ return array;
+ }
+ }
}
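For context, the JSON consumed by `toBlockLocationArray` can be decoded with nothing more than Jackson. A minimal standalone sketch (not part of this patch; the class name and sample values below are made up):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

public class BlockLocationsJsonSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical response body for op=GETFILEBLOCKLOCATIONS (values made up).
    String json =
        "{\"BlockLocations\":{\"BlockLocation\":[{"
        + "\"cachedHosts\":[],\"corrupt\":false,\"hosts\":[\"host\"],"
        + "\"length\":134217728,\"names\":[\"host:1004\"],\"offset\":0,"
        + "\"storageIds\":[\"storage-1\"],\"storageTypes\":[\"DISK\"],"
        + "\"topologyPaths\":[\"/default-rack/host:1004\"]}]}}";

    ObjectMapper mapper = new ObjectMapper();
    Map<?, ?> root = mapper.readValue(json, Map.class);
    // Same nesting JsonUtilClient expects: "BlockLocations" wraps a "BlockLocation" array.
    Map<?, ?> wrapper = (Map<?, ?>) root.get("BlockLocations");
    List<?> blocks = (List<?>) wrapper.get("BlockLocation");
    for (Object o : blocks) {
      Map<?, ?> m = (Map<?, ?>) o;
      System.out.println("offset=" + ((Number) m.get("offset")).longValue()
          + " length=" + ((Number) m.get("length")).longValue()
          + " corrupt=" + m.get("corrupt")
          + " hosts=" + m.get("hosts"));
    }
  }
}
```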
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 1607be9..ba7f0cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -1610,14 +1610,68 @@ public class WebHdfsFileSystem extends FileSystem
final long offset, final long length) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
+ BlockLocation[] locations = null;
+ try {
+ locations = getFileBlockLocations(
+ GetOpParam.Op.GETFILEBLOCKLOCATIONS,
+ p, offset, length);
+ } catch (RemoteException e) {
+ // See the error message from ExceptionHandler
+ if(e.getMessage() != null &&
+ e.getMessage().contains(
+ "Invalid value for webhdfs parameter") &&
+ e.getMessage().contains(
+ GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString())) {
+ // Old webhdfs server doesn't support GETFILEBLOCKLOCATIONS
+ // operation, fall back to query again using old API
+ // GET_BLOCK_LOCATIONS.
+ LOG.info("Invalid webhdfs operation parameter "
+ + GetOpParam.Op.GETFILEBLOCKLOCATIONS + ". Fallback to use "
+ + GetOpParam.Op.GET_BLOCK_LOCATIONS + " instead.");
+ locations = getFileBlockLocations(
+ GetOpParam.Op.GET_BLOCK_LOCATIONS,
+ p, offset, length);
+ }
+ }
+ return locations;
+ }
- final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
- return new FsPathResponseRunner<BlockLocation[]>(op, p,
+ /**
+ * Get file block locations implementation. Provides an operation
+ * parameter to determine how to get block locations from a webhdfs
+ * server. Older servers only support <b>GET_BLOCK_LOCATIONS</b> but
+ * not <b>GETFILEBLOCKLOCATIONS</b>.
+ *
+ * @param path path to the file
+ * @param offset start offset in the given file
+ * @param length length of the range to get locations for
+ * @param operation
+ * Valid operation is either
+ * {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
+ * #GET_BLOCK_LOCATIONS} or
+ * {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
+ * #GETFILEBLOCKLOCATIONS}
+ * @throws IOException
+ * Http connection error, decoding error or given
+ * operation is not valid
+ */
+ @VisibleForTesting
+ protected BlockLocation[] getFileBlockLocations(
+ GetOpParam.Op operation, final Path path,
+ final long offset, final long length) throws IOException {
+ return new FsPathResponseRunner<BlockLocation[]>(operation, path,
new OffsetParam(offset), new LengthParam(length)) {
@Override
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
- return DFSUtilClient.locatedBlocks2Locations(
- JsonUtilClient.toLocatedBlocks(json));
+ switch(operation) {
+ case GETFILEBLOCKLOCATIONS:
+ return JsonUtilClient.toBlockLocationArray(json);
+ case GET_BLOCK_LOCATIONS:
+ return DFSUtilClient.locatedBlocks2Locations(
+ JsonUtilClient.toLocatedBlocks(json));
+ default :
+ throw new IOException("Unknown operation " + operation.name());
+ }
}
}.run();
}
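The fallback logic above can be exercised against the raw REST endpoint as well. A rough, hypothetical sketch (not part of this patch): the NameNode address and file path are placeholders, and the error-message check mirrors the one in `getFileBlockLocations`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

public class BlockLocationsWithFallback {

  private static String readAll(InputStream in) {
    try (Scanner s = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
      return s.hasNext() ? s.next() : "";
    }
  }

  /** Issue one GET against a WebHDFS endpoint and return the response body. */
  private static String get(String op) throws IOException {
    // Placeholder NameNode HTTP address and file path.
    URL url = new URL("http://namenode:50070/webhdfs/v1/user/alice/file?op=" + op);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setRequestMethod("GET");
      int code = conn.getResponseCode();
      if (code >= 400) {
        InputStream err = conn.getErrorStream();
        // WebHDFS wraps errors in a RemoteException JSON object.
        throw new IOException("HTTP " + code
            + (err != null ? ": " + readAll(err) : ""));
      }
      return readAll(conn.getInputStream());
    } finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) throws IOException {
    String json;
    try {
      json = get("GETFILEBLOCKLOCATIONS");   // new op added by this patch
    } catch (IOException e) {
      // An old server rejects the unknown op value; fall back to the older
      // private GET_BLOCK_LOCATIONS op, which returns a LocatedBlocks object.
      if (e.getMessage() != null
          && e.getMessage().contains("Invalid value for webhdfs parameter")) {
        json = get("GET_BLOCK_LOCATIONS");
      } else {
        throw e;
      }
    }
    System.out.println(json);
  }
}
```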
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
index 9169ca8..1321bf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
@@ -33,8 +33,18 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
- /** GET_BLOCK_LOCATIONS is a private unstable op. */
+ /**
+ * GET_BLOCK_LOCATIONS is a private/stable API op. It returns a
+ * {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks}
+ * json object.
+ */
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
+ /**
+ * GETFILEBLOCKLOCATIONS is the public op that complies with
+ * {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations}
+ * interface.
+ */
+ GETFILEBLOCKLOCATIONS(false, HttpURLConnection.HTTP_OK),
GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
GETXATTRS(false, HttpURLConnection.HTTP_OK),
GETTRASHROOT(false, HttpURLConnection.HTTP_OK),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index d838bb8..8fe1619 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -54,6 +54,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -948,6 +949,21 @@ public class NamenodeWebHdfsMethods {
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
+ case GETFILEBLOCKLOCATIONS:
+ {
+ final long offsetValue = offset.getValue();
+ final Long lengthValue = length.getValue();
+
+ FileSystem fs = FileSystem.get(conf != null ?
+ conf : new Configuration());
+ BlockLocation[] locations = fs.getFileBlockLocations(
+ new org.apache.hadoop.fs.Path(fullpath),
+ offsetValue,
+ lengthValue != null? lengthValue: Long.MAX_VALUE);
+ final String js = JsonUtil.toJsonString("BlockLocations",
+ JsonUtil.toJsonMap(locations));
+ return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+ }
case GET_BLOCK_LOCATIONS:
{
final long offsetValue = offset.getValue();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
index da8d01e..db0148d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -463,4 +463,37 @@ public class JsonUtil {
public static String toJsonString(BlockStoragePolicy storagePolicy) {
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
}
+
+ public static Map<String, Object> toJsonMap(BlockLocation[] locations)
+ throws IOException {
+ if(locations == null) {
+ return null;
+ }
+ final Map<String, Object> m = new TreeMap<String, Object>();
+ Object[] blockLocations = new Object[locations.length];
+ for(int i=0; i<locations.length; i++) {
+ blockLocations[i] = toJsonMap(locations[i]);
+ }
+ m.put(BlockLocation.class.getSimpleName(), blockLocations);
+ return m;
+ }
+
+ public static Map<String, Object> toJsonMap(
+ final BlockLocation blockLocation) throws IOException {
+ if (blockLocation == null) {
+ return null;
+ }
+
+ final Map<String, Object> m = new TreeMap<String, Object>();
+ m.put("length", blockLocation.getLength());
+ m.put("offset", blockLocation.getOffset());
+ m.put("corrupt", blockLocation.isCorrupt());
+ m.put("storageTypes", toJsonArray(blockLocation.getStorageTypes()));
+ m.put("storageIds", blockLocation.getStorageIds());
+ m.put("cachedHosts", blockLocation.getCachedHosts());
+ m.put("hosts", blockLocation.getHosts());
+ m.put("names", blockLocation.getNames());
+ m.put("topologyPaths", blockLocation.getTopologyPaths());
+ return m;
+ }
}
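A small sketch of the server-side encode path these helpers provide, assuming hadoop-common and hadoop-hdfs (which hosts `org.apache.hadoop.hdfs.web.JsonUtil`) are on the classpath; the datanode names and sizes are invented:

```java
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.hdfs.web.JsonUtil;

public class BlockLocationJsonRoundTrip {
  public static void main(String[] args) throws Exception {
    // Invented locations: one block of 128 MB starting at offset 0.
    BlockLocation[] locations = new BlockLocation[] {
        new BlockLocation(new String[] {"10.0.0.1:1004"},   // names (ip:xferPort)
                          new String[] {"dn1.example.com"}, // hosts
                          0L, 134217728L)                   // offset, length
    };
    // Same call NamenodeWebHdfsMethods makes for op=GETFILEBLOCKLOCATIONS.
    String js = JsonUtil.toJsonString("BlockLocations",
        JsonUtil.toJsonMap(locations));
    System.out.println(js);
  }
}
```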
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
index 7c08bcb..968b65d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
@@ -38,6 +38,7 @@ WebHDFS REST API
* [Status of a File/Directory](#Status_of_a_FileDirectory)
* [List a Directory](#List_a_Directory)
* [Iteratively List a Directory](#Iteratively_List_a_Directory)
+ * [Get File Block Locations](#Get_File_Block_Locations)
* [Other File System Operations](#Other_File_System_Operations)
* [Get Content Summary of a Directory](#Get_Content_Summary_of_a_Directory)
* [Get File Checksum](#Get_File_Checksum)
@@ -97,6 +98,9 @@ WebHDFS REST API
* [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema)
* [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties)
* [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema)
+ * [BlockLocation JSON Schema](#BlockLocation_JSON_Schema)
+ * [BlockLocation Properties](#BlockLocation_Properties)
+ * [BlockLocations JSON Schema](#BlockLocations_JSON_Schema)
* [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary)
* [ACL Spec](#ACL_Spec)
* [XAttr Name](#XAttr_Name)
@@ -167,6 +171,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
* [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
+ * [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations)
* HTTP PUT
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@@ -1142,7 +1147,7 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
{
"BlockStoragePolicy": {
"copyOnCreateFile": false,
- "creationFallbacks": [],
+ "creationFallbacks": [],
"id":7,
"name":"HOT",
"replicationFallbacks":["ARCHIVE"],
@@ -1152,6 +1157,51 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
+### Get File Block Locations
+
+* Submit a HTTP GET request.
+
+ curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILEBLOCKLOCATIONS"
+
+ The client receives a response with a [`BlockLocations` JSON Object](#BlockLocations_JSON_Schema):
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+ Transfer-Encoding: chunked
+
+ {
+ "BlockLocations" :
+ {
+ "BlockLocation":
+ [
+ {
+ "cachedHosts" : [],
+ "corrupt" : false,
+ "hosts" : ["host"],
+ "length" : 134217728, // length of
this block
+ "names" : ["host:ip"],
+ "offset" : 0, // offset of
the block in the file
+ "storageIds" : ["storageid"],
+ "storageTypes" : ["DISK"], // enum
{RAM_DISK, SSD, DISK, ARCHIVE}
+ "topologyPaths" : ["/default-rack/hostname:ip"]
+ }, {
+ "cachedHosts" : [],
+ "corrupt" : false,
+ "hosts" : ["host"],
+ "length" : 62599364,
+ "names" : ["host:ip"],
+ "offset" : 134217728,
+ "storageIds" : ["storageid"],
+ "storageTypes" : ["DISK"],
+ "topologyPaths" : ["/default-rack/hostname:ip"]
+ },
+ ...
+ ]
+ }
+ }
+
+See also: [`offset`](#Offset), [`length`](#Length), [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations
+
Extended Attributes(XAttrs) Operations
--------------------------------------
@@ -2109,6 +2159,147 @@ A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy`
}
}
```
+
+### BlockLocations JSON Schema
+
+A `BlockLocations` JSON object represents an array of `BlockLocation` JSON objects.
+
+```json
+{
+ "name" : "BlockLocations",
+ "properties":
+ {
+ "BlockLocations":
+ {
+ "type" : "object",
+ "properties":
+ {
+ "BlockLocation":
+ {
+ "description": "An array of BlockLocation",
+ "type" : "array",
+ "items" : blockLocationProperties //See BlockLocation
Properties
+ }
+ }
+ }
+ }
+}
+```
+
+See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
+
+### BlockLocation JSON Schema
+
+```json
+{
+ "name" : "BlockLocation",
+ "properties":
+ {
+ "BlockLocation": blockLocationProperties //See BlockLocation
Properties
+ }
+}
+```
+
+See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
+
+#### BlockLocation Properties
+
+JavaScript syntax is used to define `blockLocationProperties` so that it can be referred to in both the `BlockLocation` and `BlockLocations` JSON schemas.
+
+```javascript
+var blockLocationProperties =
+{
+ "type" : "object",
+ "properties":
+ {
+ "cachedHosts":
+ {
+ "description": "Datanode hostnames with a cached replica",
+ "type" : "array",
+ "required" : "true",
+ "items" :
+ {
+ "description": "A datanode hostname",
+ "type" : "string"
+ }
+ },
+ "corrupt":
+ {
+ "description": "True if the block is corrupted",
+ "type" : "boolean",
+ "required" : "true"
+ },
+ "hosts":
+ {
+ "description": "Datanode hostnames store the block",
+ "type" : "array",
+ "required" : "true",
+ "items" :
+ {
+ "description": "A datanode hostname",
+ "type" : "string"
+ }
+ },
+ "length":
+ {
+ "description": "Length of the block",
+ "type" : "integer",
+ "required" : "true"
+ },
+ "names":
+ {
+ "description": "Datanode IP:xferPort for accessing the block",
+ "type" : "array",
+ "required" : "true",
+ "items" :
+ {
+ "description": "DatanodeIP:xferPort",
+ "type" : "string"
+ }
+ },
+ "offset":
+ {
+ "description": "Offset of the block in the file",
+ "type" : "integer",
+ "required" : "true"
+ },
+ "storageIds":
+ {
+ "description": "Storage ID of each replica",
+ "type" : "array",
+ "required" : "true",
+ "items" :
+ {
+ "description": "Storage ID",
+ "type" : "string"
+ }
+ },
+ "storageTypes":
+ {
+ "description": "Storage type of each replica",
+ "type" : "array",
+ "required" : "true",
+ "items" :
+ {
+ "description": "Storage type",
+ "enum" : ["RAM_DISK", "SSD", "DISK", "ARCHIVE"]
+ }
+ },
+ "topologyPaths":
+ {
+ "description": "Datanode addresses in network topology",
+ "type" : "array",
+ "required" : "true",
+ "items" :
+ {
+ "description": "/rack/host:ip",
+ "type" : "string"
+ }
+ }
+ }
+};
+```
+
HTTP Query Parameter Dictionary
-------------------------------
@@ -2541,4 +2732,4 @@ See also: [`SETSTORAGEPOLICY`](#Set_Storage_Policy)
| Valid Values | Any valid file/directory name. |
| Syntax | Any string. |
-See also: [`LISTSTATUS_BATCH`](#Iteratively_List_a_Directory)
\ No newline at end of file
+See also: [`LISTSTATUS_BATCH`](#Iteratively_List_a_Directory)
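On the client side, the `GETFILEBLOCKLOCATIONS` op documented above is what backs `FileSystem#getFileBlockLocations` for the `webhdfs://` scheme. A minimal usage sketch, assuming a reachable NameNode HTTP endpoint and an existing file (host, port and path are placeholders):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsBlockLocationsClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode HTTP endpoint; 50070 is the branch-2 default port.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070"), conf);
    Path file = new Path("/user/alice/data.bin");
    long len = fs.getFileStatus(file).getLen();
    // Issues op=GETFILEBLOCKLOCATIONS under the hood (with the fallback above).
    BlockLocation[] locations = fs.getFileBlockLocations(file, 0, len);
    for (BlockLocation loc : locations) {
      System.out.println("offset=" + loc.getOffset()
          + " length=" + loc.getLength()
          + " hosts=" + String.join(",", loc.getHosts()));
    }
    fs.close();
  }
}
```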
http://git-wip-us.apache.org/repos/asf/hadoop/blob/80e66408/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index 638decc..c9276be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -29,6 +29,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketException;
@@ -38,8 +39,16 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,7 +75,11 @@ import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -77,6 +90,8 @@ import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
@@ -92,8 +107,12 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -850,6 +869,76 @@ public class TestWebHDFS {
Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
storageTypes[0] == StorageType.DISK);
}
+
+ // Query webhdfs REST API to get block locations
+ InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
+
+ // Case 1
+ // URL without length or offset parameters
+ URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
+ LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1);
+
+ String response1 = getResponse(url1, "GET");
+ LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1);
+ // Parse BlockLocation array from json output using object mapper
+ BlockLocation[] locationArray1 = toBlockLocationArray(response1);
+
+ // Verify the result from rest call is same as file system api
+ verifyEquals(locations, locationArray1);
+
+ // Case 2
+ // URL contains length and offset parameters
+ URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ + "&length=" + LENGTH + "&offset=" + OFFSET);
+ LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2);
+
+ String response2 = getResponse(url2, "GET");
+ LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2);
+ BlockLocation[] locationArray2 = toBlockLocationArray(response2);
+
+ verifyEquals(locations, locationArray2);
+
+ // Case 3
+ // URL contains length parameter but without offset parameters
+ URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ + "&length=" + LENGTH);
+ LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3);
+
+ String response3 = getResponse(url3, "GET");
+ LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3);
+ BlockLocation[] locationArray3 = toBlockLocationArray(response3);
+
+ verifyEquals(locations, locationArray3);
+
+ // Case 4
+ // URL contains offset parameter but without length parameter
+ URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ + "&offset=" + OFFSET);
+ LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4);
+
+ String response4 = getResponse(url4, "GET");
+ LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4);
+ BlockLocation[] locationArray4 = toBlockLocationArray(response4);
+
+ verifyEquals(locations, locationArray4);
+
+ // Case 5
+ // URL specifies offset exceeds the file length
+ URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ + "&offset=1200");
+ LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5);
+
+ String response5 = getResponse(url5, "GET");
+ LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5);
+ BlockLocation[] locationArray5 = toBlockLocationArray(response5);
+
+ // Expected an empty array of BlockLocation
+ verifyEquals(new BlockLocation[] {}, locationArray5);
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -857,6 +946,66 @@ public class TestWebHDFS {
}
}
+ private BlockLocation[] toBlockLocationArray(String json)
+ throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ MapType subType = mapper.getTypeFactory().constructMapType(
+ Map.class,
+ String.class,
+ BlockLocation[].class);
+ MapType rootType = mapper.getTypeFactory().constructMapType(
+ Map.class,
+ mapper.constructType(String.class),
+ mapper.constructType(subType));
+
+ Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
+ .readValue(json, rootType);
+ Map<String, BlockLocation[]> locationMap = jsonMap
+ .get("BlockLocations");
+ BlockLocation[] locationArray = locationMap.get(
+ BlockLocation.class.getSimpleName());
+ return locationArray;
+ }
+
+ private void verifyEquals(BlockLocation[] locations1,
+ BlockLocation[] locations2) throws IOException {
+ for(int i=0; i<locations1.length; i++) {
+ BlockLocation location1 = locations1[i];
+ BlockLocation location2 = locations2[i];
+ Assert.assertEquals(location1.getLength(),
+ location2.getLength());
+ Assert.assertEquals(location1.getOffset(),
+ location2.getOffset());
+ Assert.assertArrayEquals(location1.getCachedHosts(),
+ location2.getCachedHosts());
+ Assert.assertArrayEquals(location1.getHosts(),
+ location2.getHosts());
+ Assert.assertArrayEquals(location1.getNames(),
+ location2.getNames());
+ Assert.assertArrayEquals(location1.getStorageIds(),
+ location2.getStorageIds());
+ Assert.assertArrayEquals(location1.getTopologyPaths(),
+ location2.getTopologyPaths());
+ Assert.assertArrayEquals(location1.getStorageTypes(),
+ location2.getStorageTypes());
+ }
+ }
+
+ private static String getResponse(URL url, String httpRequestType)
+ throws IOException {
+ HttpURLConnection conn = null;
+ try {
+ conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod(httpRequestType);
+ conn.setInstanceFollowRedirects(false);
+ return IOUtils.toString(conn.getInputStream());
+ } finally {
+ if(conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
+
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
final URI uri, final String userName) throws Exception {
@@ -1212,4 +1361,131 @@ public class TestWebHDFS {
}
}
}
+
+ /**
+ * A mock class to handle the {@link WebHdfsFileSystem} client
+ * request. The format of the response depends on how many
+ * times it gets called (1 to 3 times).
+ * <p>
+ * The first call returns a wrapped json response with an
+ * IllegalArgumentException
+ * <p>
+ * The second call returns a valid GET_BLOCK_LOCATIONS
+ * json response
+ * <p>
+ * The third call returns a wrapped json response with
+ * a random IOException
+ *
+ */
+ public static class MockWebHdfsServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+ private static int respondTimes = 0;
+ private static final String RANDOM_EXCEPTION_MSG =
+ "This is a random exception";
+
+ @Override
+ public void doGet(HttpServletRequest request,
+ HttpServletResponse response) throws ServletException, IOException {
+ response.setHeader("Content-Type",
+ MediaType.APPLICATION_JSON);
+ String param = request.getParameter("op");
+ if(respondTimes == 0) {
+ Exception mockException = new IllegalArgumentException(
+ "Invalid value for webhdfs parameter \"op\". "
+ + "" + "No enum constant " + param);
+ sendException(request, response, mockException);
+ } else if (respondTimes == 1) {
+ sendResponse(request, response);
+ } else if (respondTimes == 2) {
+ Exception mockException = new IOException(RANDOM_EXCEPTION_MSG);
+ sendException(request, response, mockException);
+ }
+ respondTimes++;
+ }
+
+ private void sendResponse(HttpServletRequest request,
+ HttpServletResponse response) throws IOException {
+ response.setStatus(HttpServletResponse.SC_OK);
+ // Construct a LocatedBlock for testing
+ DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
+ DatanodeInfo[] ds = new DatanodeInfo[1];
+ ds[0] = d;
+ ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1);
+ LocatedBlock l1 = new LocatedBlock(b1, ds);
+ l1.setStartOffset(0);
+ l1.setCorrupt(false);
+ List<LocatedBlock> ls = Arrays.asList(l1);
+ LocatedBlocks locatedblocks =
+ new LocatedBlocks(10, false, ls, l1,
+ true, null, null);
+
+ try (PrintWriter pw = response.getWriter()) {
+ pw.write(JsonUtil.toJsonString(locatedblocks));
+ }
+ }
+
+ private void sendException(HttpServletRequest request,
+ HttpServletResponse response,
+ Exception mockException) throws IOException {
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ String errJs = JsonUtil.toJsonString(mockException);
+ try (PrintWriter pw = response.getWriter()) {
+ pw.write(errJs);
+ }
+ }
+ }
+
+ @Test
+ public void testGetFileBlockLocationsBackwardsCompatibility()
+ throws Exception {
+ final Configuration conf = WebHdfsTestUtil.createConf();
+ final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
+ HttpServer2 http = null;
+ try {
+ http = HttpServerFunctionalTest.createTestServer(conf);
+ http.addServlet("test", pathSpec, MockWebHdfsServlet.class);
+ http.start();
+
+ // Write the address back to configuration so
+ // WebHdfsFileSystem could connect to the mock server
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
+ "localhost:" + http.getConnectorAddress(0).getPort());
+
+ final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
+ conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
+ WebHdfsFileSystem spyFs = spy(webFS);
+ BlockLocation[] locations = spyFs
+ .getFileBlockLocations(new Path("p"), 0, 100);
+
+ // Verify result
+ assertEquals(1, locations.length);
+ assertEquals(121, locations[0].getLength());
+
+ // Verify the fall back
+ // The function should be called exactly 2 times
+ // 1st time handles GETFILEBLOCKLOCATIONS and finds it is not supported
+ // 2nd time falls back to the older GET_BLOCK_LOCATIONS op
+ verify(spyFs, times(2)).getFileBlockLocations(any(),
+ any(), anyLong(), anyLong());
+
+ // Verify it doesn't erroneously fall back
+ // When server returns a different error, it should directly
+ // throw an exception.
+ try {
+ spyFs.getFileBlockLocations(new Path("p"), 0, 100);
+ } catch (Exception e) {
+ assertTrue(e instanceof IOException);
+ assertEquals(e.getMessage(), MockWebHdfsServlet.RANDOM_EXCEPTION_MSG);
+ // Totally this function has been called 3 times
+ verify(spyFs, times(3)).getFileBlockLocations(any(),
+ any(), anyLong(), anyLong());
+ }
+ } finally {
+ if(http != null) {
+ http.stop();
+ }
+ }
+ }
}