Repository: hadoop
Updated Branches:
  refs/heads/trunk f6f3a447b -> 8a4095305


HDFS-10823. Implement HttpFSFileSystem#listStatusIterator.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8a409530
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8a409530
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8a409530

Branch: refs/heads/trunk
Commit: 8a40953058d50d421d62b71067a13b626b3cba1f
Parents: f6f3a44
Author: Andrew Wang <w...@apache.org>
Authored: Fri Sep 16 15:37:36 2016 -0700
Committer: Andrew Wang <w...@apache.org>
Committed: Fri Sep 16 15:37:36 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   | 125 ++++++++++++++++---
 .../apache/hadoop/fs/TestFilterFileSystem.java  |   1 +
 .../org/apache/hadoop/fs/TestHarFileSystem.java |   1 +
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      |  68 ++++------
 .../hadoop/fs/http/client/HttpFSFileSystem.java |  56 +++++++--
 .../hadoop/fs/http/client/HttpFSUtils.java      |   2 +
 .../hadoop/fs/http/server/FSOperations.java     |  62 +++++++++
 .../http/server/HttpFSParametersProvider.java   |  20 +++
 .../hadoop/fs/http/server/HttpFSServer.java     |  17 +++
 .../service/hadoop/FileSystemAccessService.java |   4 +-
 .../fs/http/client/BaseTestHttpFSWith.java      |  62 ++++++++-
 11 files changed, 340 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 9bde29d..5939f97 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.Tracer;
 import org.apache.htrace.core.TraceScope;
 
+import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -1530,7 +1531,68 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
                                                          IOException;
-    
+
+  /**
+   * Represents a batch of directory entries when iteratively listing a
+   * directory. This is a private API not meant for use by end users.
+   * <p>
+   * For internal use by FileSystem subclasses that override
+   * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
+   * listing.
+   */
+  @InterfaceAudience.Private
+  public static class DirectoryEntries {
+    private final FileStatus[] entries;
+    private final byte[] token;
+    private final boolean hasMore;
+
+    public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
+        hasMore) {
+      this.entries = entries;
+      if (token != null) {
+        this.token = token.clone();
+      } else {
+        this.token = null;
+      }
+      this.hasMore = hasMore;
+    }
+
+    public FileStatus[] getEntries() {
+      return entries;
+    }
+
+    public byte[] getToken() {
+      return token;
+    }
+
+    public boolean hasMore() {
+      return hasMore;
+    }
+  }
+
+  /**
+   * Given an opaque iteration token, return the next batch of entries in a
+   * directory. This is a private API not meant for use by end users.
+   * <p>
+   * This method should be overridden by FileSystem subclasses that want to
+   * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
+   * @param f Path to list
+   * @param token opaque iteration token returned by previous call, or null
+   *              if this is the first call.
+   * @return
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  @InterfaceAudience.Private
+  protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    // The default implementation returns the entire listing as a single batch.
+    // Thus, there is never a second batch, and no need to respect the passed
+    // token or set a token in the returned DirectoryEntries.
+    FileStatus[] listing = listStatus(f);
+    return new DirectoryEntries(listing, null, false);
+  }
+
   /*
    * Filter files/directories in the given path using the user-supplied path
    * filter. Results are added to the given array <code>results</code>.
@@ -1767,6 +1829,49 @@ public abstract class FileSystem extends Configured implements Closeable {
   }
 
   /**
+   * Generic iterator for implementing {@link #listStatusIterator(Path)}.
+   */
+  private class DirListingIterator<T extends FileStatus> implements
+      RemoteIterator<T> {
+
+    private final Path path;
+    private DirectoryEntries entries;
+    private int i = 0;
+
+    DirListingIterator(Path path) {
+      this.path = path;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      if (entries == null) {
+        fetchMore();
+      }
+      return i < entries.getEntries().length ||
+          entries.hasMore();
+    }
+
+    private void fetchMore() throws IOException {
+      byte[] token = null;
+      if (entries != null) {
+        token = entries.getToken();
+      }
+      entries = listStatusBatch(path, token);
+      i = 0;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T next() throws IOException {
+      Preconditions.checkState(hasNext(), "No more items in iterator");
+      if (i == entries.getEntries().length) {
+        fetchMore();
+      }
+      return (T)entries.getEntries()[i++];
+    }
+  }
+
+  /**
    * Returns a remote iterator so that followup calls are made on demand
    * while consuming the entries. Each file system implementation should
    * override this method and provide a more efficient implementation, if
@@ -1779,23 +1884,7 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   public RemoteIterator<FileStatus> listStatusIterator(final Path p)
   throws FileNotFoundException, IOException {
-    return new RemoteIterator<FileStatus>() {
-      private final FileStatus[] stats = listStatus(p);
-      private int i = 0;
-
-      @Override
-      public boolean hasNext() {
-        return i<stats.length;
-      }
-
-      @Override
-      public FileStatus next() throws IOException {
-        if (!hasNext()) {
-          throw new NoSuchElementException("No more entry in " + p);
-        }
-        return stats[i++];
-      }
-    };
+    return new DirListingIterator<>(p);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index e1312bc..24f3dc8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -103,6 +103,7 @@ public class TestFilterFileSystem {
     public void processDeleteOnExit();
     public FsStatus getStatus();
     public FileStatus[] listStatus(Path f, PathFilter filter);
+    public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
     public FileStatus[] globStatus(Path pathPattern);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index d2020b9..bacdbb7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -115,6 +115,7 @@ public class TestHarFileSystem {
     public QuotaUsage getQuotaUsage(Path f);
     public FsStatus getStatus();
     public FileStatus[] listStatus(Path f, PathFilter filter);
+    public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
     public FileStatus[] globStatus(Path pathPattern);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 9a9edc8..19de5b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
@@ -1504,55 +1503,30 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   private static final byte[] EMPTY_ARRAY = new byte[] {};
-  private class DirListingIterator<T extends FileStatus> implements
-      RemoteIterator<T> {
 
-    private final Path path;
-    private DirectoryListing thisListing;
-    private int i = 0;
-    private byte[] prevKey = EMPTY_ARRAY;
-
-    DirListingIterator(Path path) {
-      this.path = path;
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-      if (thisListing == null) {
-        fetchMore();
-      }
-      return i < thisListing.getPartialListing().length ||
-          thisListing.hasMore();
-    }
-
-    private void fetchMore() throws IOException {
-      thisListing = new FsPathResponseRunner<DirectoryListing>(
-          GetOpParam.Op.LISTSTATUS_BATCH,
-          path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
-        @Override
-        DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
-          return JsonUtilClient.toDirectoryListing(json);
-        }
-      }.run();
-      i = 0;
-      prevKey = thisListing.getLastName();
+  @Override
+  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    byte[] prevKey = EMPTY_ARRAY;
+    if (token != null) {
+      prevKey = token;
     }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public T next() throws IOException {
-      Preconditions.checkState(hasNext(), "No more items in iterator");
-      if (i == thisListing.getPartialListing().length) {
-        fetchMore();
+    DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
+        GetOpParam.Op.LISTSTATUS_BATCH,
+        f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
+      @Override
+      DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
+        return JsonUtilClient.toDirectoryListing(json);
       }
-      return (T)makeQualified(thisListing.getPartialListing()[i++], path);
-    }
-  }
-
-  @Override
-  public RemoteIterator<FileStatus> listStatusIterator(final Path f)
-      throws FileNotFoundException, IOException {
-    return new DirListingIterator<>(f);
+    }.run();
+    // Qualify the returned FileStatus array
+    final HdfsFileStatus[] statuses = listing.getPartialListing();
+    FileStatus[] qualified = new FileStatus[statuses.length];
+    for (int i = 0; i < statuses.length; i++) {
+      qualified[i] = makeQualified(statuses[i], f);
+    }
+    return new DirectoryEntries(qualified, listing.getLastName(),
+        listing.hasMore());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index 0f97d90..77e3323 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.http.client;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+
+import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -111,6 +113,7 @@ public class HttpFSFileSystem extends FileSystem
   public static final String XATTR_SET_FLAG_PARAM = "flag";
   public static final String XATTR_ENCODING_PARAM = "encoding";
   public static final String NEW_LENGTH_PARAM = "newlength";
+  public static final String START_AFTER_PARAM = "startAfter";
 
   public static final Short DEFAULT_PERMISSION = 0755;
   public static final String ACLSPEC_DEFAULT = "";
@@ -184,6 +187,10 @@ public class HttpFSFileSystem extends FileSystem
 
   public static final String ENC_BIT_JSON = "encBit";
 
+  public static final String DIRECTORY_LISTING_JSON = "DirectoryListing";
+  public static final String PARTIAL_LISTING_JSON = "partialListing";
+  public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
+
   public static final int HTTP_TEMPORARY_REDIRECT = 307;
 
   private static final String HTTP_GET = "GET";
@@ -203,7 +210,7 @@ public class HttpFSFileSystem extends FileSystem
     MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
     REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
     DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
-    REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
+    REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
 
     private String httpMethod;
 
@@ -666,6 +673,17 @@ public class HttpFSFileSystem extends FileSystem
     return (Boolean) json.get(DELETE_JSON);
   }
 
+  private FileStatus[] toFileStatuses(JSONObject json, Path f) {
+    json = (JSONObject) json.get(FILE_STATUSES_JSON);
+    JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
+    FileStatus[] array = new FileStatus[jsonArray.size()];
+    f = makeQualified(f);
+    for (int i = 0; i < jsonArray.size(); i++) {
+      array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+    }
+    return array;
+  }
+
   /**
   * List the statuses of the files/directories in the given path if the path is
    * a directory.
@@ -684,14 +702,36 @@ public class HttpFSFileSystem extends FileSystem
                                            params, f, true);
     HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
     JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
-    json = (JSONObject) json.get(FILE_STATUSES_JSON);
-    JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
-    FileStatus[] array = new FileStatus[jsonArray.size()];
-    f = makeQualified(f);
-    for (int i = 0; i < jsonArray.size(); i++) {
-      array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+    return toFileStatuses(json, f);
+  }
+
+  @Override
+  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.LISTSTATUS_BATCH.toString());
+    if (token != null) {
+      params.put(START_AFTER_PARAM, new String(token, Charsets.UTF_8));
     }
-    return array;
+    HttpURLConnection conn = getConnection(
+        Operation.LISTSTATUS_BATCH.getMethod(),
+        params, f, true);
+    HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    // Parse the FileStatus array
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    JSONObject listing = (JSONObject) json.get(DIRECTORY_LISTING_JSON);
+    FileStatus[] statuses = toFileStatuses(
+        (JSONObject) listing.get(PARTIAL_LISTING_JSON), f);
+    // New token is the last FileStatus entry
+    byte[] newToken = null;
+    if (statuses.length > 0) {
+      newToken = statuses[statuses.length - 1].getPath().getName().toString()
+          .getBytes(Charsets.UTF_8);
+    }
+    // Parse the remainingEntries boolean into hasMore
+    final long remainingEntries = (Long) listing.get(REMAINING_ENTRIES_JSON);
+    final boolean hasMore = remainingEntries > 0 ? true : false;
+    return new DirectoryEntries(statuses, newToken, hasMore);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
index 95e26d7..fcc7bab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
@@ -43,6 +43,8 @@ public class HttpFSUtils {
 
   public static final String SERVICE_VERSION = "/v1";
 
+  public static final byte[] EMPTY_BYTES = {};
+
   private static final String SERVICE_PATH = SERVICE_NAME + SERVICE_VERSION;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index 39597eb..46948f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.GlobFilter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -37,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -108,6 +110,27 @@ public class FSOperations {
     return json;
   }
 
+  /**
+   * Serializes a DirectoryEntries object into the JSON for a
+   * WebHDFS {@link org.apache.hadoop.hdfs.protocol.DirectoryListing}.
+   * <p>
+   * These two classes are slightly different, due to the impedance
+   * mismatches between the WebHDFS and FileSystem APIs.
+   * @param entries
+   * @return json
+   */
+  private static Map<String, Object> toJson(FileSystem.DirectoryEntries
+      entries) {
+    Map<String, Object> json = new LinkedHashMap<>();
+    Map<String, Object> inner = new LinkedHashMap<>();
+    Map<String, Object> fileStatuses = toJson(entries.getEntries());
+    inner.put(HttpFSFileSystem.PARTIAL_LISTING_JSON, fileStatuses);
+    inner.put(HttpFSFileSystem.REMAINING_ENTRIES_JSON, entries.hasMore() ? 1
+        : 0);
+    json.put(HttpFSFileSystem.DIRECTORY_LISTING_JSON, inner);
+    return json;
+  }
+
   /** Converts an <code>AclStatus</code> object into a JSON object.
    *
    * @param aclStatus AclStatus object
@@ -625,6 +648,45 @@ public class FSOperations {
   }
 
   /**
+   * Executor that performs a batched directory listing.
+   */
+  @InterfaceAudience.Private
+  public static class FSListStatusBatch implements FileSystemAccess
+      .FileSystemExecutor<Map> {
+    private final Path path;
+    private final byte[] token;
+
+    public FSListStatusBatch(String path, byte[] token) throws IOException {
+      this.path = new Path(path);
+      this.token = token.clone();
+    }
+
+    /**
+     * Simple wrapper filesystem that exposes the protected batched
+     * listStatus API so we can use it.
+     */
+    private static class WrappedFileSystem extends FilterFileSystem {
+      public WrappedFileSystem(FileSystem f) {
+        super(f);
+      }
+
+      @Override
+      public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+          FileNotFoundException, IOException {
+        return super.listStatusBatch(f, token);
+      }
+    }
+
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      WrappedFileSystem wrappedFS = new WrappedFileSystem(fs);
+      FileSystem.DirectoryEntries entries =
+          wrappedFS.listStatusBatch(path, token);
+      return toJson(entries);
+    }
+  }
+
+  /**
    * Executor that performs a mkdirs FileSystemAccess files system operation.
    */
   @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
index 5c4204a..25585c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
@@ -91,6 +91,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETXATTRS, 
         new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
     PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
+    PARAMS_DEF.put(Operation.LISTSTATUS_BATCH,
+        new Class[]{StartAfterParam.class});
   }
 
   public HttpFSParametersProvider() {
@@ -520,4 +522,22 @@ public class HttpFSParametersProvider extends ParametersProvider {
       super(NAME, XAttrCodec.class, null);
     }
   }
+
+  /**
+   * Class for startafter parameter.
+   */
+  @InterfaceAudience.Private
+  public static class StartAfterParam extends StringParam {
+    /**
+     * Parameter name.
+     */
+    public static final String NAME = HttpFSFileSystem.START_AFTER_PARAM;
+
+    /**
+     * Constructor.
+     */
+    public StartAfterParam() {
+      super(NAME, null);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
index db4692a..a4db124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.fs.http.server;
 
+import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrCodec;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
+import org.apache.hadoop.fs.http.client.HttpFSUtils;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
@@ -320,6 +322,21 @@ public class HttpFSServer {
       response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
       break;
     }
+    case LISTSTATUS_BATCH: {
+      String startAfter = params.get(
+          HttpFSParametersProvider.StartAfterParam.NAME,
+          HttpFSParametersProvider.StartAfterParam.class);
+      byte[] token = HttpFSUtils.EMPTY_BYTES;
+      if (startAfter != null) {
+        token = startAfter.getBytes(Charsets.UTF_8);
+      }
+      FSOperations.FSListStatusBatch command = new FSOperations
+          .FSListStatusBatch(path, token);
+      @SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
+      AUDIT_LOG.info("[{}] token [{}]", path, token);
+      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+      break;
+    }
     default: {
       throw new IOException(
           MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
index 88780cb..0b767be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
@@ -84,7 +84,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
       count = 0;
     }
 
-    synchronized FileSystem getFileSytem(Configuration conf)
+    synchronized FileSystem getFileSystem(Configuration conf)
       throws IOException {
       if (fs == null) {
         fs = FileSystem.get(conf);
@@ -290,7 +290,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
     }
     Configuration conf = new Configuration(namenodeConf);
     conf.set(HTTPFS_FS_USER, user);
-    return cachedFS.getFileSytem(conf);
+    return cachedFS.getFileSystem(conf);
   }
 
   protected void closeFileSystem(FileSystem fs) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
index aea6cf8..e475803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -44,6 +45,7 @@ import org.apache.hadoop.test.TestHdfsHelper;
 import org.apache.hadoop.test.TestJetty;
 import org.apache.hadoop.test.TestJettyHelper;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -62,6 +64,7 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -136,14 +139,19 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     return "webhdfs";
   }
 
-  protected FileSystem getHttpFSFileSystem() throws Exception {
-    Configuration conf = new Configuration();
+  protected FileSystem getHttpFSFileSystem(Configuration conf) throws
+      Exception {
     conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
     URI uri = new URI(getScheme() + "://" +
                       TestJettyHelper.getJettyURL().toURI().getAuthority());
     return FileSystem.get(uri, conf);
   }
 
+  protected FileSystem getHttpFSFileSystem() throws Exception {
+    Configuration conf = new Configuration();
+    return getHttpFSFileSystem(conf);
+  }
+
   protected void testGet() throws Exception {
     FileSystem fs = getHttpFSFileSystem();
     Assert.assertNotNull(fs);
@@ -355,6 +363,51 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     assertEquals(stati[0].getPath().getName(), path.getName());
   }
 
+  private static void assertSameListing(FileSystem expected, FileSystem
+      actual, Path p) throws IOException {
+    // Consume all the entries from both iterators
+    RemoteIterator<FileStatus> exIt = expected.listStatusIterator(p);
+    List<FileStatus> exStatuses = new ArrayList<>();
+    while (exIt.hasNext()) {
+      exStatuses.add(exIt.next());
+    }
+    RemoteIterator<FileStatus> acIt = actual.listStatusIterator(p);
+    List<FileStatus> acStatuses = new ArrayList<>();
+    while (acIt.hasNext()) {
+      acStatuses.add(acIt.next());
+    }
+    assertEquals(exStatuses.size(), acStatuses.size());
+    for (int i = 0; i < exStatuses.size(); i++) {
+      FileStatus expectedStatus = exStatuses.get(i);
+      FileStatus actualStatus = acStatuses.get(i);
+      // Path URIs are fully qualified, so compare just the path component
+      assertEquals(expectedStatus.getPath().toUri().getPath(),
+          actualStatus.getPath().toUri().getPath());
+    }
+  }
+
+  private void testListStatusBatch() throws Exception {
+    // LocalFileSystem writes checksum files next to the data files, which
+    // show up when listing via LFS. This makes the listings not compare
+    // properly.
+    Assume.assumeFalse(isLocalFS());
+
+    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
+    FileSystem httpFs = getHttpFSFileSystem(conf);
+
+    // Test an empty directory
+    Path dir = new Path(getProxiedFSTestDir(), "dir");
+    proxyFs.mkdirs(dir);
+    assertSameListing(proxyFs, httpFs, dir);
+    // Create and test in a loop
+    for (int i = 0; i < 10; i++) {
+      proxyFs.create(new Path(dir, "file" + i)).close();
+      assertSameListing(proxyFs, httpFs, dir);
+    }
+  }
+
   private void testWorkingdirectory() throws Exception {
     FileSystem fs = FileSystem.get(getProxiedFSConf());
     Path workingDir = fs.getWorkingDirectory();
@@ -863,7 +916,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS, 
     WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER, 
     SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
-    GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION
+    GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH
   }
 
   private void operation(Operation op) throws Exception {
@@ -940,6 +993,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
       case ENCRYPTION:
         testEncryption();
         break;
+      case LIST_STATUS_BATCH:
+        testListStatusBatch();
+        break;
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to