This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 35c93ef HADOOP-17475. ABFS : add high performance listStatusIterator
(#2548)
35c93ef is described below
commit 35c93ef5f371414bec578e75ed6bb345ea17b79f
Author: bilaharith <[email protected]>
AuthorDate: Thu Feb 4 19:06:19 2021 +0530
HADOOP-17475. ABFS : add high performance listStatusIterator (#2548)
The ABFS connector now implements listStatusIterator() with
asynchronous prefetching of the next page(s) of results.
For listing large directories this can provide tangible speedups.
The feature is enabled by default; if for any reason it needs to be
disabled, set fs.azure.enable.abfslistiterator to false.
Contributed by Bilahari T H.
Change-Id: Ic9a52b80df1d0ffed4c81beae92c136e2a12698c
---
.../hadoop-azure/dev-support/findbugs-exclude.xml | 9 +
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 13 +
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 16 +
.../fs/azurebfs/AzureBlobFileSystemStore.java | 33 +-
.../fs/azurebfs/constants/ConfigurationKeys.java | 2 +
.../constants/FileSystemConfigurations.java | 2 +
.../services/AbfsListStatusRemoteIterator.java | 159 ++++++++++
.../fs/azurebfs/services/ListingSupport.java | 79 +++++
.../ITestAbfsListStatusRemoteIterator.java | 340 +++++++++++++++++++++
9 files changed, 643 insertions(+), 10 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index 7087d78..b750b8b 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -74,4 +74,13 @@
<Class name="org.apache.hadoop.fs.azure.FileMetadata" />
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS" />
</Match>
+
+ <!-- continuation is assigned from the result of an external HTTP call.
+ It is kept outside the synchronized block because that call is costly. -->
+ <Match>
+ <Class
name="org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator" />
+ <Field name="continuation" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+
</FindBugsFilter>
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 5a70323..193be48 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -275,6 +275,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
private long sasTokenRenewPeriodForStreamsInSeconds;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue =
DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
+ private boolean enableAbfsListIterator;
+
public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException,
IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
@@ -896,6 +900,10 @@ public class AbfsConfiguration{
return this.maxWriteRequestsToQueue;
}
+ public boolean enableAbfsListIterator() {
+ return this.enableAbfsListIterator;
+ }
+
@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
@@ -961,4 +969,9 @@ public class AbfsConfiguration{
this.optimizeFooterRead = optimizeFooterRead;
}
+ @VisibleForTesting
+ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
+ this.enableAbfsListIterator = enableAbfsListIterator;
+ }
+
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 4d28553..ead8566 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -79,6 +81,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
@@ -983,6 +986,19 @@ public class AzureBlobFileSystem extends FileSystem {
return super.exists(f);
}
+ @Override
+ public RemoteIterator<FileStatus> listStatusIterator(Path path)
+ throws IOException {
+ LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path);
+ if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
+ AbfsListStatusRemoteIterator abfsLsItr =
+ new AbfsListStatusRemoteIterator(getFileStatus(path), abfsStore);
+ return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
+ } else {
+ return super.listStatusIterator(path);
+ }
+ }
+
private FileStatus tryGetFileStatus(final Path f) {
try {
return getFileStatus(f);
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index c8dd518..f4be159 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -102,6 +102,7 @@ import
org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
@@ -131,7 +132,7 @@ import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class AzureBlobFileSystemStore implements Closeable {
+public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final Logger LOG =
LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
private AbfsClient client;
@@ -838,6 +839,7 @@ public class AzureBlobFileSystemStore implements Closeable {
* @param path The list path.
* @return the entries in the path.
* */
+ @Override
public FileStatus[] listStatus(final Path path) throws IOException {
return listStatus(path, null);
}
@@ -854,7 +856,17 @@ public class AzureBlobFileSystemStore implements Closeable
{
* @return the entries in the path start from "startFrom" in lexical order.
* */
@InterfaceStability.Unstable
+ @Override
public FileStatus[] listStatus(final Path path, final String startFrom)
throws IOException {
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ listStatus(path, startFrom, fileStatuses, true, null);
+ return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
+ }
+
+ @Override
+ public String listStatus(final Path path, final String startFrom,
+ List<FileStatus> fileStatuses, final boolean fetchAll,
+ String continuation) throws IOException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
boolean shouldContinue = true;
@@ -865,16 +877,16 @@ public class AzureBlobFileSystemStore implements
Closeable {
startFrom);
final String relativePath = getRelativePath(path);
- String continuation = null;
- // generate continuation token if a valid startFrom is provided.
- if (startFrom != null && !startFrom.isEmpty()) {
- continuation = getIsNamespaceEnabled()
- ? generateContinuationTokenForXns(startFrom)
- : generateContinuationTokenForNonXns(relativePath, startFrom);
+ if (continuation == null || continuation.isEmpty()) {
+ // generate continuation token if a valid startFrom is provided.
+ if (startFrom != null && !startFrom.isEmpty()) {
+ continuation = getIsNamespaceEnabled()
+ ? generateContinuationTokenForXns(startFrom)
+ : generateContinuationTokenForNonXns(relativePath, startFrom);
+ }
}
- ArrayList<FileStatus> fileStatuses = new ArrayList<>();
do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
AbfsRestOperation op = client.listPath(relativePath, false,
@@ -928,7 +940,8 @@ public class AzureBlobFileSystemStore implements Closeable {
perfInfo.registerSuccess(true);
countAggregate++;
- shouldContinue = continuation != null && !continuation.isEmpty();
+ shouldContinue =
+ fetchAll && continuation != null && !continuation.isEmpty();
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
@@ -936,7 +949,7 @@ public class AzureBlobFileSystemStore implements Closeable {
}
} while (shouldContinue);
- return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
+ return continuation;
}
// generate continuation token for xns account
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index cdef9c9..8a9c63d 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -130,6 +130,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT =
"fs.azure.identity.transformer.skip.superuser.replacement";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER =
"fs.azure.account.keyprovider";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT =
"fs.azure.shellkeyprovider.script";
+ /** Setting this to true makes the driver use its own RemoteIterator
implementation. */
+ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR =
"fs.azure.enable.abfslistiterator";
/** End point of ABFS account: {@value}. */
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a23dfd5..9b760c4 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -101,5 +101,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 *
1000; // 5 mins
+ public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
+
private FileSystemConfigurations() {}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
new file mode 100644
index 0000000..0c664fc
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.activation.UnsupportedDataTypeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+public class AbfsListStatusRemoteIterator
+ implements RemoteIterator<FileStatus> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AbfsListStatusRemoteIterator.class);
+
+ private static final boolean FETCH_ALL_FALSE = false;
+ private static final int MAX_QUEUE_SIZE = 10;
+ private static final long POLL_WAIT_TIME_IN_MS = 250;
+
+ private final FileStatus fileStatus;
+ private final ListingSupport listingSupport;
+ private final ArrayBlockingQueue<Object> iteratorsQueue;
+
+ private volatile boolean isAsyncInProgress = false;
+ private boolean isIterationComplete = false;
+ private String continuation;
+ private Iterator<FileStatus> currIterator;
+
+ public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
+ final ListingSupport listingSupport) {
+ this.fileStatus = fileStatus;
+ this.listingSupport = listingSupport;
+ iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
+ currIterator = Collections.emptyIterator();
+ fetchBatchesAsync();
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (currIterator.hasNext()) {
+ return true;
+ }
+ currIterator = getNextIterator();
+ return currIterator.hasNext();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ if (!this.hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return currIterator.next();
+ }
+
+ private Iterator<FileStatus> getNextIterator() throws IOException {
+ fetchBatchesAsync();
+ try {
+ Object obj = null;
+ while (obj == null
+ && (!isIterationComplete || !iteratorsQueue.isEmpty())) {
+ obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
+ }
+ if (obj == null) {
+ return Collections.emptyIterator();
+ } else if (obj instanceof Iterator) {
+ return (Iterator<FileStatus>) obj;
+ } else if (obj instanceof IOException) {
+ throw (IOException) obj;
+ } else {
+ throw new UnsupportedDataTypeException();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ throw new IOException(e);
+ }
+ }
+
+ private void fetchBatchesAsync() {
+ if (isAsyncInProgress || isIterationComplete) {
+ return;
+ }
+ synchronized (this) {
+ if (isAsyncInProgress || isIterationComplete) {
+ return;
+ }
+ isAsyncInProgress = true;
+ }
+ CompletableFuture.runAsync(() -> asyncOp());
+ }
+
+ private void asyncOp() {
+ try {
+ while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
+ addNextBatchIteratorToQueue();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fetching filestatuses failed", ioe);
+ try {
+ iteratorsQueue.put(ioe);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", interruptedException);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ } finally {
+ synchronized (this) {
+ isAsyncInProgress = false;
+ }
+ }
+ }
+
+ private void addNextBatchIteratorToQueue()
+ throws IOException, InterruptedException {
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ continuation = listingSupport
+ .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
+ continuation);
+ if (!fileStatuses.isEmpty()) {
+ iteratorsQueue.put(fileStatuses.iterator());
+ }
+ synchronized (this) {
+ if (continuation == null || continuation.isEmpty()) {
+ isIterationComplete = true;
+ }
+ }
+ }
+
+}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
new file mode 100644
index 0000000..4c44940
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
[email protected]
[email protected]
+public interface ListingSupport {
+
+ /**
+ * @param path The list path.
+ * @return the entries in the path.
+ * @throws IOException in case of error
+ */
+ FileStatus[] listStatus(Path path) throws IOException;
+
+ /**
+ * @param path The list path.
+ * @param startFrom The entry name that list results should start with.
+ * For example, if folder "/folder" contains four
+ * files: "afile", "bfile", "hfile", "ifile". Then
+ * listStatus(Path("/folder"), "hfile") will return
+ * "/folder/hfile" and "/folder/ifile". Notice that if
+ * startFrom is a non-existent entry name, then the
+ * list response contains all entries after this
+ * non-existent entry in lexical order: listStatus
+ * (Path("/folder"), "cfile") will return
+ * "/folder/hfile" and "/folder/ifile".
+ * @return the entries in the path start from "startFrom" in lexical order.
+ * @throws IOException in case of error
+ */
+ FileStatus[] listStatus(Path path, String startFrom) throws IOException;
+
+ /**
+ * @param path The list path.
+ * @param startFrom The entry name that list results should start with.
+ * For example, if folder "/folder" contains four
+ * files: "afile", "bfile", "hfile", "ifile". Then
+ * listStatus(Path("/folder"), "hfile") will return
+ * "/folder/hfile" and "/folder/ifile". Notice that if
+ * startFrom is a non-existent entry name, then the
+ * list response contains all entries after this
+ * non-existent entry in lexical order: listStatus
+ * (Path("/folder"), "cfile") will return
+ * "/folder/hfile" and "/folder/ifile".
+ * @param fileStatuses List to which the fetched FileStatus objects are added.
+ * @param fetchAll true to fill the list with the entire result; false to
+ * fetch just one page of results.
+ * @param continuation Continuation token; null or empty means start from the
+ * beginning (or from startFrom, if it is given).
+ * @return Continuation token for the next page; null or empty when done.
+ * @throws IOException in case of error
+ */
+ String listStatus(Path path, String startFrom, List<FileStatus> fileStatuses,
+ boolean fetchAll, String continuation) throws IOException;
+}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
new file mode 100644
index 0000000..6d5e4cf
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test ListStatusRemoteIterator operation.
+ */
+public class ITestAbfsListStatusRemoteIterator extends
AbstractAbfsIntegrationTest {
+
+ private static final int TEST_FILES_NUMBER = 1000;
+
+ public ITestAbfsListStatusRemoteIterator() throws Exception {
+ }
+
+ @Test
+ public void testAbfsIteratorWithHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+ RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
+ getFileSystem().getFileStatus(testDir), listngSupport);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should be instance of "
+ + "AbfsListStatusRemoteIterator by default")
+ .isInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ while (fsItr.hasNext()) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
+ verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
+ .listStatus(any(Path.class), nullable(String.class),
+ anyList(), anyBoolean(),
+ nullable(String.class));
+ }
+
+ @Test
+ public void testAbfsIteratorWithoutHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+ RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
+ getFileSystem().getFileStatus(testDir), listngSupport);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should be instance of "
+ + "AbfsListStatusRemoteIterator by default")
+ .isInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException since next has been "
+ + "called " + TEST_FILES_NUMBER + " times")
+ .isInstanceOf(NoSuchElementException.class);
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
+ verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
+ .listStatus(any(Path.class), nullable(String.class),
+ anyList(), anyBoolean(),
+ nullable(String.class));
+ }
+
+ @Test
+ public void testWithAbfsIteratorDisabled() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ setEnableAbfsIterator(false);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ RemoteIterator<FileStatus> fsItr =
+ getFileSystem().listStatusIterator(testDir);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should not be instance of "
+ + "AbfsListStatusRemoteIterator when it is disabled")
+ .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ while (fsItr.hasNext()) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ setEnableAbfsIterator(false);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ RemoteIterator<FileStatus> fsItr =
+ getFileSystem().listStatusIterator(testDir);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should not be instance of "
+ + "AbfsListStatusRemoteIterator when it is disabled")
+ .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException since next has been "
+ + "called " + TEST_FILES_NUMBER + " times")
+ .isInstanceOf(NoSuchElementException.class);
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testNextWhenNoMoreElementsPresent() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ RemoteIterator fsItr =
+ new
AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+ getFileSystem().getAbfsStore());
+ fsItr = Mockito.spy(fsItr);
+ Mockito.doReturn(false).when(fsItr).hasNext();
+
+ RemoteIterator<FileStatus> finalFsItr = fsItr;
+ Assertions.assertThatThrownBy(() -> finalFsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException if hasNext() return "
+ + "false")
+ .isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ public void testHasNextForEmptyDir() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ RemoteIterator<FileStatus> fsItr = getFileSystem()
+ .listStatusIterator(testDir);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns false for empty directory")
+ .isFalse();
+ }
+
+ @Test
+ public void testHasNextForFile() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String testFileName = "testFile";
+ Path testFile = new Path(testFileName);
+ getFileSystem().create(testFile);
+ setPageSize(10);
+ RemoteIterator<FileStatus> fsItr = fs.listStatusIterator(testFile);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns true for file").isTrue();
+ Assertions.assertThat(fsItr.next().getPath().toString())
+ .describedAs("next returns the file itself")
+ .endsWith(testFileName);
+ }
+
+ @Test
+ public void testIOException() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ getFileSystem().mkdirs(testDir);
+
+ String exceptionMessage = "test exception";
+ ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
+ RemoteIterator fsItr =
+ new
AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+ lsSupport);
+
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "When ioException is not null and queue is empty exception should be "
+ + "thrown")
+ .isInstanceOf(IOException.class)
+ .hasMessage(exceptionMessage);
+ }
+
+ @Test
+ public void testNonExistingPath() throws Throwable {
+ Path nonExistingDir = new Path("nonExistingPath");
+ Assertions.assertThatThrownBy(
+ () -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs(
+ "test the listStatusIterator call on a path which is not "
+ + "present should result in FileNotFoundException")
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
+ private ListingSupport getMockListingSupport(String exceptionMessage) {
+ return new ListingSupport() {
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path, String startFrom)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public String listStatus(Path path, String startFrom,
+ List<FileStatus> fileStatuses, boolean fetchAll, String continuation)
+ throws IOException {
+ throw new IOException(exceptionMessage);
+ }
+ };
+ }
+
+ private Path createTestDirectory() throws IOException {
+ String testDirectoryName = "testDirectory" + System.currentTimeMillis();
+ Path testDirectory = new Path(testDirectoryName);
+ getFileSystem().mkdirs(testDirectory);
+ return testDirectory;
+ }
+
+ private void setEnableAbfsIterator(boolean shouldEnable) throws IOException {
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+ abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(shouldEnable);
+ }
+
+ private void setPageSize(int pageSize) throws IOException {
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+ abfsStore.getAbfsConfiguration().setListMaxResults(pageSize);
+ }
+
+ private List<String> createFilesUnderDirectory(int numFiles, Path rootPath,
+ String filenamePrefix)
+ throws ExecutionException, InterruptedException, IOException {
+ final List<Future<Void>> tasks = new ArrayList<>();
+ final List<String> fileNames = new ArrayList<>();
+ ExecutorService es = Executors.newFixedThreadPool(10);
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ final Path filePath = new Path(rootPath, filenamePrefix + i);
+ Callable<Void> callable = () -> {
+ getFileSystem().create(filePath);
+ fileNames.add(makeQualified(filePath).toString());
+ return null;
+ };
+ tasks.add(es.submit(callable));
+ }
+ for (Future<Void> task : tasks) {
+ task.get();
+ }
+ } finally {
+ es.shutdownNow();
+ }
+ return fileNames;
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]