[ https://issues.apache.org/jira/browse/HADOOP-17475?focusedWorklogId=537771&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-537771 ]

ASF GitHub Bot logged work on HADOOP-17475:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jan/21 12:15
            Start Date: 19/Jan/21 12:15
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request #2548:
URL: https://github.com/apache/hadoop/pull/2548#discussion_r560125822



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -45,6 +45,8 @@
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;

Review comment:
       These should go above, so that the imports stay in order; the goal is to
reduce conflicts between patches, branches and versions.

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -982,6 +985,19 @@ public boolean exists(Path f) throws IOException {
     return super.exists(f);
   }
 
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path path)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path);
+    if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {

Review comment:
       Would you ever want to make this non-optional? One code path means better
testing.

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test ListStatusRemoteIterator operation.
+ */
+public class ITestAbfsListStatusIterator extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_FILES_NUMBER = 1000;
+
+  public ITestAbfsListStatusIterator() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testListStatusRemoteIterator() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+        testDir, "testListPath");
+
+    ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());

Review comment:
       With an interface, you don't need to play Mockito games any more. Instead,
just provide a dummy implementation that simulates deep/wide directories, a
controllable page size, etc.
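A minimal sketch of such a dummy implementation. It assumes a simplified stand-in for the `ListingSupport` contract: the names `PagedLister`, `FakeWideDirectory` and `listAll` are hypothetical, and plain `String` entries replace Hadoop's `Path`/`FileStatus` so the example runs without Hadoop. It mirrors the continuation-token shape of the five-argument `listStatus` in this PR: each call appends one page to the caller's list and returns the next token, with `null` meaning the listing is complete. Page size and entry count are constructor parameters, and the fake counts its own calls, so no `verify(...)` spy is needed.

```java
import java.util.ArrayList;
import java.util.List;

public class FakeListingDemo {

  /** Hypothetical, simplified stand-in for the paged listStatus call. */
  interface PagedLister {
    /**
     * Appends one page of entries under dir to out.
     * @return the continuation token for the next page, or null when done
     */
    String listPage(String dir, List<String> out, String continuation);
  }

  /** Dummy lister simulating a wide directory with a controllable page size. */
  static final class FakeWideDirectory implements PagedLister {
    private final int numEntries;
    private final int pageSize;
    private int calls = 0;

    FakeWideDirectory(int numEntries, int pageSize) {
      this.numEntries = numEntries;
      this.pageSize = pageSize;
    }

    @Override
    public String listPage(String dir, List<String> out, String continuation) {
      calls++;
      // The token is just the index of the next entry to return.
      int start = continuation == null ? 0 : Integer.parseInt(continuation);
      int end = Math.min(start + pageSize, numEntries);
      for (int i = start; i < end; i++) {
        out.add(dir + "/file" + i);
      }
      return end < numEntries ? Integer.toString(end) : null;
    }

    int getCalls() {
      return calls;
    }
  }

  /** Drains every page, the way an iterator would over successive batches. */
  static List<String> listAll(PagedLister lister, String dir) {
    List<String> all = new ArrayList<>();
    String token = null;
    do {
      token = lister.listPage(dir, all, token);
    } while (token != null);
    return all;
  }

  public static void main(String[] args) {
    FakeWideDirectory fake = new FakeWideDirectory(1000, 10);
    List<String> entries = listAll(fake, "/testDirectory");
    System.out.println(entries.size() + " entries in " + fake.getCalls() + " calls");
  }
}
```

With 1000 entries and a page size of 10, the fake is called exactly 100 times, which is the bound the spy-based test checks with `Mockito.atLeast(100)`.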

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+public class AbfsListStatusRemoteIterator implements RemoteIterator<FileStatus> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AbfsListStatusRemoteIterator.class);
+
+  private static final boolean FETCH_ALL_FALSE = false;
+  private static final int MAX_QUEUE_SIZE = 10;
+
+  private final FileStatus fileStatus;
+  private final ListingSupport listingSupport;
+  private final ArrayBlockingQueue<Iterator<FileStatus>> iteratorsQueue;
+  private final Object asyncOpLock = new Object();
+
+  private volatile boolean isAsyncInProgress = false;
+  private boolean firstBatch = true;
+  private String continuation;
+  private Iterator<FileStatus> currIterator;
+  private IOException ioException;
+
+  public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
+      final ListingSupport listingSupport) {
+    this.fileStatus = fileStatus;
+    this.listingSupport = listingSupport;
+    iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
+    currIterator = Collections.emptyIterator();
+    fetchBatchesAsync();
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (currIterator.hasNext()) {
+      return true;
+    }
+    updateCurrentIterator();
+    return currIterator.hasNext();
+  }
+
+  @Override
+  public FileStatus next() throws IOException {
+    if (!this.hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return currIterator.next();
+  }
+
+  private void updateCurrentIterator() throws IOException {
+    fetchBatchesAsync();
+    synchronized (this) {
+      if (iteratorsQueue.isEmpty()) {
+        if (ioException != null) {
+          throw ioException;
+        }
+        if (isListingComplete()) {
+          return;
+        }
+      }
+    }
+    try {
+      currIterator = iteratorsQueue.take();
+      if (!currIterator.hasNext() && !isListingComplete()) {
+        updateCurrentIterator();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Thread got interrupted: {}", e);
+    }
+  }
+
+  private synchronized boolean isListingComplete() {
+    return !firstBatch && (continuation == null || continuation.isEmpty());
+  }
+
+  private void fetchBatchesAsync() {
+    if (isAsyncInProgress) {
+      return;
+    }
+    synchronized (asyncOpLock) {
+      if (isAsyncInProgress) {
+        return;
+      }
+      isAsyncInProgress = true;
+    }
+    CompletableFuture.runAsync(() -> asyncOp());
+  }
+
+  private void asyncOp() {
+    try {
+      while (!isListingComplete() && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
+        addNextBatchIteratorToQueue();
+      }
+    } catch (IOException e) {
+      ioException = e;

Review comment:
       * This should only be set with the first IOE, as that is usually the first
sign of failure. For the rest, just log the message at WARN and the stack at
DEBUG.
       * What about other exceptions? Are they handled by the normal Futures code?
If so, the ExecutionException needs to be picked up and unwrapped. The code in
org.apache.hadoop.util.functional.FutureIO can help there.
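A JDK-only sketch of both suggestions: keep only the first `IOException` via a compare-and-set, log later ones, and unwrap the `ExecutionException`/`CompletionException` wrappers that `CompletableFuture` adds. The class and method names (`FirstFailureDemo`, `recordFailure`, `unwrap`) are hypothetical, and `System.err` stands in for SLF4J WARN/DEBUG logging. Hadoop's `FutureIO`, which the review points to, provides comparable unwrapping helpers; check its actual API before relying on it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class FirstFailureDemo {

  /** Holds only the first failure; later ones are logged and dropped. */
  private final AtomicReference<IOException> firstFailure = new AtomicReference<>();

  void recordFailure(IOException e) {
    if (!firstFailure.compareAndSet(null, e)) {
      // Not the first failure: log only the message here
      // (WARN in real code; the stack trace would go at DEBUG).
      System.err.println("WARN: subsequent listing failure: " + e.getMessage());
    }
  }

  IOException getFirstFailure() {
    return firstFailure.get();
  }

  /** Converts the wrapper exceptions thrown by futures back into an IOException. */
  static IOException unwrap(Throwable t) {
    Throwable cause = t;
    // Peel off the wrappers added by get() and join().
    while ((cause instanceof ExecutionException || cause instanceof CompletionException)
        && cause.getCause() != null) {
      cause = cause.getCause();
    }
    if (cause instanceof IOException) {
      return (IOException) cause;
    }
    // Checked before RuntimeException, since it is a subclass of it.
    if (cause instanceof UncheckedIOException) {
      return ((UncheckedIOException) cause).getCause();
    }
    if (cause instanceof RuntimeException) {
      throw (RuntimeException) cause;
    }
    if (cause instanceof Error) {
      throw (Error) cause;
    }
    return new IOException(cause);
  }

  public static void main(String[] args) throws InterruptedException {
    FirstFailureDemo demo = new FirstFailureDemo();
    demo.recordFailure(new IOException("first"));
    demo.recordFailure(new IOException("second"));
    System.out.println("kept: " + demo.getFirstFailure().getMessage());

    // A Runnable cannot throw IOException, so the async body wraps it.
    CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
      throw new UncheckedIOException(new IOException("async failure"));
    });
    try {
      f.get();
    } catch (ExecutionException e) {
      System.out.println("unwrapped: " + unwrap(e).getMessage());
    }
  }
}
```

The atomic compare-and-set matters here because the failure is recorded on the async thread while `hasNext()`/`next()` read it on the caller's thread.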

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test ListStatusRemoteIterator operation.
+ */
+public class ITestAbfsListStatusIterator extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_FILES_NUMBER = 1000;
+
+  public ITestAbfsListStatusIterator() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testListStatusRemoteIterator() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+        testDir, "testListPath");
+
+    ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
+        getFileSystem().getFileStatus(testDir), listngSupport);
+    Assertions.assertThat(fsItr)
+        .describedAs("RemoteIterator should be instance of "
+            + "AbfsListStatusRemoteIterator by default")
+        .isInstanceOf(AbfsListStatusRemoteIterator.class);
+    int itrCount = 0;
+    while (fsItr.hasNext()) {
+      FileStatus fileStatus = fsItr.next();
+      String pathStr = fileStatus.getPath().toString();
+      fileNames.remove(pathStr);
+      itrCount++;
+    }
+    Assertions.assertThat(itrCount)
+        .describedAs("Number of iterations should be equal to the files "
+            + "created")
+        .isEqualTo(TEST_FILES_NUMBER);
+    Assertions.assertThat(fileNames.size())
+        .describedAs("After removing every iterm found from the iterator, "
+            + "there should be no more elements in the fileNames")
+        .isEqualTo(0);
+    verify(listngSupport, Mockito.atLeast(100))
+        .listStatus(any(Path.class), nullable(String.class),
+            anyList(), anyBoolean(),
+            nullable(String.class));
+  }
+
+  @Test
+  public void testListStatusRemoteIteratorWithoutHasNext() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+        testDir, "testListPath");
+
+    ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
+        getFileSystem().getFileStatus(testDir), listngSupport);
+    Assertions.assertThat(fsItr)
+        .describedAs("RemoteIterator should be instance of "
+            + "AbfsListStatusRemoteIterator by default")
+        .isInstanceOf(AbfsListStatusRemoteIterator.class);
+    int itrCount = 0;
+    for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+      FileStatus fileStatus = fsItr.next();
+      String pathStr = fileStatus.getPath().toString();
+      fileNames.remove(pathStr);
+      itrCount++;
+    }
+    Assertions.assertThatThrownBy(() -> fsItr.next())
+        .describedAs(
+            "next() should throw NoSuchElementException since next has been "
+                + "called " + TEST_FILES_NUMBER + " times")
+        .isInstanceOf(NoSuchElementException.class);
+    Assertions.assertThat(itrCount)
+        .describedAs("Number of iterations should be equal to the files "
+            + "created")
+        .isEqualTo(TEST_FILES_NUMBER);
+    Assertions.assertThat(fileNames.size())
+        .describedAs("After removing every iterm found from the iterator, "
+            + "there should be no more elements in the fileNames")
+        .isEqualTo(0);
+    verify(listngSupport, Mockito.atLeast(100))
+        .listStatus(any(Path.class), nullable(String.class),
+            anyList(), anyBoolean(),
+            nullable(String.class));
+  }
+
+  @Test
+  public void testWithAbfsIteratorDisabled() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    setEnableAbfsIterator(false);
+    final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+        testDir, "testListPath");
+
+    RemoteIterator<FileStatus> fsItr =
+        getFileSystem().listStatusIterator(testDir);
+    Assertions.assertThat(fsItr)
+        .describedAs("RemoteIterator should not be instance of "
+            + "AbfsListStatusRemoteIterator when it is disabled")
+        .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+    int itrCount = 0;
+    while (fsItr.hasNext()) {
+      FileStatus fileStatus = fsItr.next();
+      String pathStr = fileStatus.getPath().toString();
+      fileNames.remove(pathStr);
+      itrCount++;
+    }
+    Assertions.assertThat(itrCount)
+        .describedAs("Number of iterations should be equal to the files "
+            + "created")
+        .isEqualTo(TEST_FILES_NUMBER);
+    Assertions.assertThat(fileNames.size())
+        .describedAs("After removing every iterm found from the iterator, "
+            + "there should be no more elements in the fileNames")
+        .isEqualTo(0);
+  }
+
+  @Test
+  public void testWithAbfsIteratorDisabledWithutHasNext() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    setEnableAbfsIterator(false);
+    final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+        testDir, "testListPath");
+
+    RemoteIterator<FileStatus> fsItr =
+        getFileSystem().listStatusIterator(testDir);
+    Assertions.assertThat(fsItr)
+        .describedAs("RemoteIterator should not be instance of "
+            + "AbfsListStatusRemoteIterator when it is disabled")
+        .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+    int itrCount = 0;
+    for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+      FileStatus fileStatus = fsItr.next();
+      String pathStr = fileStatus.getPath().toString();
+      fileNames.remove(pathStr);
+      itrCount++;
+    }
+    Assertions.assertThatThrownBy(() -> fsItr.next())
+        .describedAs(
+            "next() should throw NoSuchElementException since next has been "
+                + "called " + TEST_FILES_NUMBER + " times")
+        .isInstanceOf(NoSuchElementException.class);
+    Assertions.assertThat(itrCount)
+        .describedAs("Number of iterations should be equal to the files "
+            + "created")
+        .isEqualTo(TEST_FILES_NUMBER);
+    Assertions.assertThat(fileNames.size())
+        .describedAs("After removing every iterm found from the iterator, "
+            + "there should be no more elements in the fileNames")
+        .isEqualTo(0);
+  }
+
+  @Test
+  public void testNextWhenNoMoreElementsPresent() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    RemoteIterator fsItr =
+        new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+            getFileSystem().getAbfsStore());
+    fsItr = Mockito.spy(fsItr);
+    Mockito.doReturn(false).when(fsItr).hasNext();
+
+    RemoteIterator<FileStatus> finalFsItr = fsItr;
+    Assertions.assertThatThrownBy(() -> finalFsItr.next())
+        .describedAs(
+        "next() should throw NoSuchElementException if hasNext() return "
+            + "false")
+        .isInstanceOf(NoSuchElementException.class);
+  }
+
+  @Test
+  public void testHasNextForEmptyDir() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    RemoteIterator<FileStatus> fsItr = getFileSystem()
+        .listStatusIterator(testDir);
+    Assertions.assertThat(fsItr.hasNext())
+        .describedAs("hasNext returns false for empty directory")
+        .isFalse();
+  }
+
+  @Test
+  public void testHasNextForFile() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    String testFileName = "testFile";
+    Path testFile = new Path(testFileName);
+    getFileSystem().create(testFile);
+    setPageSize(10);
+    RemoteIterator<FileStatus> fsItr = fs.listStatusIterator(testFile);
+    Assertions.assertThat(fsItr.hasNext())
+        .describedAs("hasNext returns true for file").isTrue();
+    Assertions.assertThat(fsItr.next().getPath().toString())
+        .describedAs("next returns the file itself")
+        .endsWith(testFileName);
+  }
+
+  @Test
+  public void testIOException() throws Exception {
+    Path testDir = createTestDirectory();
+    setPageSize(10);
+    getFileSystem().mkdirs(testDir);
+
+    String exceptionMessage = "test exception";
+    ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
+    RemoteIterator fsItr =
+        new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+        lsSupport);
+
+    Assertions.assertThatThrownBy(() -> fsItr.next())
+        .describedAs(
+        "When ioException is not null and queue is empty exception should be "
+            + "thrown")
+        .isInstanceOf(IOException.class)
+        .hasMessage(exceptionMessage);
+  }
+
+  @Test
+  public void testNonExistingPath() throws Throwable {
+    Path nonExistingDir = new Path("nonExistingPath");
+    Assertions.assertThatThrownBy(
+        () -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs(
+        "test the listStatusIterator call on a path which is not "
+            + "present should result in FileNotFoundException")
+        .isInstanceOf(FileNotFoundException.class);
+  }
+
+  private ListingSupport getMockListingSupport(String exceptionMessage) {
+    return new ListingSupport() {
+      @Override
+      public FileStatus[] listStatus(Path path) throws IOException {
+        return null;
+      }
+
+      @Override
+      public FileStatus[] listStatus(Path path, String startFrom)
+          throws IOException {
+        return null;
+      }
+
+      @Override
+      public String listStatus(Path path, String startFrom,
+          List<FileStatus> fileStatuses, boolean fetchAll, String continuation)
+          throws IOException {
+        throw new IOException(exceptionMessage);
+      }
+    };
+  }
+
+  private Path createTestDirectory() throws IOException {
+    String testDirectoryName = "testDirectory" + System.currentTimeMillis();
+    Path testDirectory = new Path(testDirectoryName);
+    getFileSystem().mkdirs(testDirectory);
+    return testDirectory;
+  }
+
+  private void setEnableAbfsIterator(boolean shouldEnable) throws IOException {
+    AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+    abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(shouldEnable);
+  }
+
+  private void setPageSize(int pageSize) throws IOException {
+    AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+    abfsStore.getAbfsConfiguration().setListMaxResults(pageSize);
+  }
+
+  private List<String> createFilesUnderDirectory(int numFiles, Path rootPath,
+      String filenamePrefix)
+      throws ExecutionException, InterruptedException, IOException {
+    final List<Future<Void>> tasks = new ArrayList<>();
+    final List<String> fileNames = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < numFiles; i++) {
+      final Path filePath = new Path(rootPath, filenamePrefix + i);
+      Callable<Void> callable = new Callable<Void>() {

Review comment:
       You can just use a () -> { } lambda here.
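A minimal sketch of the same fan-out with a lambda in place of the anonymous `Callable<Void>` class. The `createFile` method and the `CREATED` set are hypothetical stand-ins for the real ABFS `create(path)` call (they only record the name), so the example runs without a filesystem; the thread-pool size of 10 matches the test above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LambdaTasksDemo {

  /** Hypothetical stand-in for the filesystem; records created names only. */
  static final Set<String> CREATED = ConcurrentHashMap.newKeySet();

  static void createFile(String name) {
    CREATED.add(name);
  }

  static List<String> createFiles(int numFiles, String root, String prefix)
      throws ExecutionException, InterruptedException {
    final List<Future<Void>> tasks = new ArrayList<>();
    final List<String> fileNames = new ArrayList<>();
    ExecutorService es = Executors.newFixedThreadPool(10);
    try {
      for (int i = 0; i < numFiles; i++) {
        final String filePath = root + "/" + prefix + i;
        // The lambda replaces the anonymous Callable<Void> class.
        Callable<Void> callable = () -> {
          createFile(filePath);
          return null;
        };
        tasks.add(es.submit(callable));
        fileNames.add(filePath);
      }
      // Wait for every task; get() rethrows any failure as ExecutionException.
      for (Future<Void> task : tasks) {
        task.get();
      }
    } finally {
      es.shutdownNow();
    }
    return fileNames;
  }

  public static void main(String[] args) throws Exception {
    List<String> names = createFiles(1000, "/testDirectory", "testListPath");
    System.out.println(names.size() + " names, " + CREATED.size() + " created");
  }
}
```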




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 537771)
    Time Spent: 0.5h  (was: 20m)

> Implement listStatusIterator
> ----------------------------
>
>                 Key: HADOOP-17475
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17475
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Bilahari T H
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
