This is an automated email from the ASF dual-hosted git repository. ahmar pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 509b10defae HADOOP-19650. [ABFS] Fixing NPE when close() called on uninitialized AzureBlobFileSystem (#7880) (#7889) 509b10defae is described below commit 509b10defae6aaea1ffd929355f673e096f9a6cc Author: Anuj Modi <anujmodi2...@gmail.com> AuthorDate: Thu Aug 21 09:50:47 2025 +0000 HADOOP-19650. [ABFS] Fixing NPE when close() called on uninitialized AzureBlobFileSystem (#7880) (#7889) Contributed by Anuj Modi --- .../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 81 +++++++++------ .../hadoop/fs/azurebfs/services/AbfsErrors.java | 1 + .../fs/azurebfs/ITestFileSystemInitialization.java | 112 ++++++++++++++++++++- 3 files changed, 160 insertions(+), 34 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 369f68cf722..7b1f798efe0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -131,6 +131,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_SAS; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; @@ -148,7 +149,11 @@ public class AzureBlobFileSystem extends FileSystem private URI uri; private Path workingDir; private AzureBlobFileSystemStore abfsStore; - private boolean isClosed; + + /** + * Flag to indicate whether the file system is closed or not initiated. + */ + private boolean isClosed = true; private final String fileSystemId = UUID.randomUUID().toString(); private boolean delegationTokenEnabled = false; @@ -312,6 +317,7 @@ public void initialize(URI uri, Configuration configuration) } rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); + isClosed = false; LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); } @@ -329,8 +335,8 @@ public String toString() { final StringBuilder sb = new StringBuilder( "AzureBlobFileSystem{"); sb.append("uri=").append(fullPathUri); - sb.append(", user='").append(abfsStore.getUser()).append('\''); - sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); + sb.append(", user='").append(getAbfsStore().getUser()).append('\''); + sb.append(", primaryUserGroup='").append(getAbfsStore().getPrimaryGroup()).append('\''); sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); sb.append('}'); return sb.toString(); @@ -354,7 +360,7 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx // bufferSize is unused. LOG.debug( "AzureBlobFileSystem.open path: {} bufferSize as configured in 'fs.azure.read.request.size': {}", - path, abfsStore.getAbfsConfiguration().getReadBufferSize()); + path, getAbfsStore().getAbfsConfiguration().getReadBufferSize()); return open(path, Optional.empty()); } @@ -517,7 +523,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.APPEND, tracingHeaderFormat, listener); - OutputStream outputStream = abfsStore + OutputStream outputStream = getAbfsStore() .openFileForWrite(qualifiedPath, statistics, false, tracingContext); return new FSDataOutputStream(outputStream, statistics); } catch (AzureBlobFileSystemException ex) { @@ -782,7 +788,7 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.MKDIR, false, tracingHeaderFormat, listener); - abfsStore.createDirectory(qualifiedPath, + getAbfsStore().createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission, FsPermission.getUMask(getConf()), tracingContext); statIncrement(DIRECTORIES_CREATED); @@ -795,10 +801,10 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce @Override public synchronized void close() throws IOException { - if (isClosed) { + if (isClosed()) { return; } - if (abfsStore.getClient().isMetricCollectionEnabled()) { + if (getAbfsStore().getClient().isMetricCollectionEnabled()) { TracingContext tracingMetricContext = new TracingContext( clientCorrelationId, fileSystemId, FSOperationType.GET_ATTR, true, @@ -819,7 +825,7 @@ public synchronized void close() throws IOException { IOSTATISTICS_LOGGING_LEVEL_DEFAULT); logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics()); } - IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager, + IOUtils.cleanupWithLogger(LOG, getAbfsStore(), delegationTokenManager, getAbfsClient()); this.isClosed = true; if (LOG.isDebugEnabled()) { @@ -866,7 +872,7 @@ public void breakLease(final Path f) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.BREAK_LEASE, tracingHeaderFormat, listener); - abfsStore.breakLease(qualifiedPath, tracingContext); + getAbfsStore().breakLease(qualifiedPath, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(f, ex); } @@ -884,6 +890,8 @@ public void breakLease(final Path f) throws IOException { */ @Override public Path makeQualified(Path path) { + // Every API works on qualified paths. If store is null better to fail early. + Preconditions.checkState(getAbfsStore() != null); // To support format: abfs://{dfs.nameservices}/file/path, // path need to be first converted to URI, then get the raw path string, // during which {dfs.nameservices} will be omitted. @@ -917,7 +925,7 @@ public String getScheme() { public Path getHomeDirectory() { return makeQualified(new Path( FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX - + "/" + abfsStore.getUser())); + + "/" + getAbfsStore().getUser())); } /** @@ -939,7 +947,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, if (file.getLen() < start) { return new BlockLocation[0]; } - final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); + final String blobLocationHost = getAbfsStore().getAbfsConfiguration().getAzureBlockLocationHost(); final String[] name = {blobLocationHost}; final String[] host = {blobLocationHost}; @@ -973,7 +981,7 @@ protected void finalize() throws Throwable { * @return the short name of the user who instantiated the FS */ public String getOwnerUser() { - return abfsStore.getUser(); + return getAbfsStore().getUser(); } /** @@ -981,7 +989,7 @@ public String getOwnerUser() { * @return primary group name */ public String getOwnerUserPrimaryGroup() { - return abfsStore.getPrimaryGroup(); + return getAbfsStore().getPrimaryGroup(); } private boolean deleteRoot() throws IOException { @@ -1053,7 +1061,7 @@ public void setOwner(final Path path, final String owner, final String group) Path qualifiedPath = makeQualified(path); try { - abfsStore.setOwner(qualifiedPath, + getAbfsStore().setOwner(qualifiedPath, owner, group, tracingContext); @@ -1090,15 +1098,15 @@ public void setXAttr(final Path path, TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.SET_ATTR, true, tracingHeaderFormat, listener); - Hashtable<String, String> properties = abfsStore + Hashtable<String, String> properties = getAbfsStore() .getPathStatus(qualifiedPath, tracingContext); String xAttrName = ensureValidAttributeName(name); boolean xAttrExists = properties.containsKey(xAttrName); XAttrSetFlag.validate(name, xAttrExists, flag); - String xAttrValue = abfsStore.decodeAttribute(value); + String xAttrValue = getAbfsStore().decodeAttribute(value); properties.put(xAttrName, xAttrValue); - abfsStore.setPathProperties(qualifiedPath, properties, tracingContext); + getAbfsStore().setPathProperties(qualifiedPath, properties, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1130,12 +1138,12 @@ public byte[] getXAttr(final Path path, final String name) TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.GET_ATTR, true, tracingHeaderFormat, listener); - Hashtable<String, String> properties = abfsStore + Hashtable<String, String> properties = getAbfsStore() .getPathStatus(qualifiedPath, tracingContext); String xAttrName = ensureValidAttributeName(name); if (properties.containsKey(xAttrName)) { String xAttrValue = properties.get(xAttrName); - value = abfsStore.encodeAttribute(xAttrValue); + value = getAbfsStore().encodeAttribute(xAttrValue); } } catch (AzureBlobFileSystemException ex) { checkException(path, ex); @@ -1173,7 +1181,7 @@ public void setPermission(final Path path, final FsPermission permission) Path qualifiedPath = makeQualified(path); try { - abfsStore.setPermission(qualifiedPath, permission, tracingContext); + getAbfsStore().setPermission(qualifiedPath, permission, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1210,7 +1218,7 @@ public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) Path qualifiedPath = makeQualified(path); try { - abfsStore.modifyAclEntries(qualifiedPath, aclSpec, tracingContext); + getAbfsStore().modifyAclEntries(qualifiedPath, aclSpec, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1245,7 +1253,7 @@ public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) Path qualifiedPath = makeQualified(path); try { - abfsStore.removeAclEntries(qualifiedPath, aclSpec, tracingContext); + getAbfsStore().removeAclEntries(qualifiedPath, aclSpec, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1273,7 +1281,7 @@ public void removeDefaultAcl(final Path path) throws IOException { Path qualifiedPath = makeQualified(path); try { - abfsStore.removeDefaultAcl(qualifiedPath, tracingContext); + getAbfsStore().removeDefaultAcl(qualifiedPath, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1303,7 +1311,7 @@ public void removeAcl(final Path path) throws IOException { Path qualifiedPath = makeQualified(path); try { - abfsStore.removeAcl(qualifiedPath, tracingContext); + getAbfsStore().removeAcl(qualifiedPath, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1340,7 +1348,7 @@ public void setAcl(final Path path, final List<AclEntry> aclSpec) Path qualifiedPath = makeQualified(path); try { - abfsStore.setAcl(qualifiedPath, aclSpec, tracingContext); + getAbfsStore().setAcl(qualifiedPath, aclSpec, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -1368,7 +1376,7 @@ public AclStatus getAclStatus(final Path path) throws IOException { Path qualifiedPath = makeQualified(path); try { - return abfsStore.getAclStatus(qualifiedPath, tracingContext); + return getAbfsStore().getAclStatus(qualifiedPath, tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); return null; @@ -1395,7 +1403,7 @@ public void access(final Path path, final FsAction mode) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.ACCESS, tracingHeaderFormat, listener); - this.abfsStore.access(qualifiedPath, mode, tracingContext); + this.getAbfsStore().access(qualifiedPath, mode, tracingContext); } catch (AzureBlobFileSystemException ex) { checkCheckAccessException(path, ex); } @@ -1417,11 +1425,11 @@ public boolean exists(Path f) throws IOException { public RemoteIterator<FileStatus> listStatusIterator(Path path) throws IOException { LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path); - if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) { + if (getAbfsStore().getAbfsConfiguration().enableAbfsListIterator()) { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener); AbfsListStatusRemoteIterator abfsLsItr = - new AbfsListStatusRemoteIterator(path, abfsStore, + new AbfsListStatusRemoteIterator(path, getAbfsStore(), tracingContext); return RemoteIterators.typeCastingRemoteIterator(abfsLsItr); } else { @@ -1503,7 +1511,7 @@ private boolean fileSystemExists() throws IOException { try { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.TEST_OP, tracingHeaderFormat, listener); - abfsStore.getFilesystemProperties(tracingContext); + getAbfsStore().getFilesystemProperties(tracingContext); } catch (AzureBlobFileSystemException ex) { try { checkException(null, ex); @@ -1522,7 +1530,7 @@ private void createFileSystem(TracingContext tracingContext) throws IOException LOG.debug( "AzureBlobFileSystem.createFileSystem uri: {}", uri); try { - abfsStore.createFilesystem(tracingContext); + getAbfsStore().createFilesystem(tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(null, ex); } @@ -1745,14 +1753,21 @@ public boolean failed() { @VisibleForTesting public AzureBlobFileSystemStore getAbfsStore() { + if (abfsStore == null) { + throw new IllegalStateException(ERR_INVALID_ABFS_STATE); + } return abfsStore; } @VisibleForTesting AbfsClient getAbfsClient() { - return abfsStore.getClient(); + return getAbfsStore().getClient(); } + @VisibleForTesting + boolean isClosed() { + return isClosed; + } /** * Get any Delegation Token manager created by the filesystem. * @return the DT manager or null. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index 00862931105..930a785e4aa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -77,5 +77,6 @@ public final class AbfsErrors { public static final String ERR_BLOB_LIST_PARSING = "Parsing of XML List Response Failed in BlobClient."; public static final String ERR_DFS_LIST_PARSING = "Parsing of Json List Response Failed in DfsClient."; public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem."; + public static final String ERR_INVALID_ABFS_STATE = "Invalid state for AzureBlobFilesystem. Either Filesystem was closed or not initialized."; private AbfsErrors() {} } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java index f7d4a5b7a83..4aa966f0009 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.azurebfs; import java.net.URI; +import java.util.ArrayList; +import java.util.EnumSet; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -27,14 +29,20 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.services.AuthType; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; -import static org.junit.Assume.assumeTrue; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test AzureBlobFileSystem initialization. @@ -106,4 +114,106 @@ public void testFileSystemCapabilities() throws Throwable { FS_ACLS, acls, fs) .isEqualTo(acls); } + + /** + * Test that the AzureBlobFileSystem close without init works + * @throws Exception if an error occurs + */ + @Test + public void testABFSCloseWithoutInit() throws Exception { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + Assertions.assertThat(fs.isClosed()).isTrue(); + fs.close(); + fs.initialize(this.getFileSystem().getUri(), getRawConfiguration()); + Assertions.assertThat(fs.isClosed()).isFalse(); + fs.close(); + Assertions.assertThat(fs.isClosed()).isTrue(); + } + + /** + * Test that the AzureBlobFileSystem throws an exception + * when trying to perform an operation without initialization. + * @throws Exception if an error occurs + */ + @Test + public void testABFSUninitializedFileSystem() throws Exception { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + Assertions.assertThat(fs.isClosed()).isTrue(); + Path testPath = new Path("testPath"); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + fs::toString); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.open(testPath, ONE_MB)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.create(testPath, FsPermission.getDefault(), false, ONE_MB, + fs.getDefaultReplication(testPath), ONE_MB, null)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.createNonRecursive(testPath, FsPermission.getDefault(), false, ONE_MB, + fs.getDefaultReplication(testPath), ONE_MB, null)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.append(testPath, ONE_MB, null)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.rename(testPath, testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.delete(testPath, true)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.listStatus(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.mkdirs(testPath, FsPermission.getDefault())); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.getFileStatus(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.breakLease(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.makeQualified(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.setOwner(testPath, EMPTY_STRING, EMPTY_STRING)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.setXAttr(testPath, "xattr", new byte[0], + EnumSet.of(XAttrSetFlag.CREATE))); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.getXAttr(testPath, "xattr")); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.setPermission(testPath, FsPermission.getDefault())); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.modifyAclEntries(testPath, new ArrayList<>())); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.removeAclEntries(testPath, new ArrayList<>())); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.removeDefaultAcl(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.removeAcl(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.setAcl(testPath, new ArrayList<>())); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.getAclStatus(testPath)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.access(testPath, FsAction.ALL)); + + intercept(IllegalStateException.class, ERR_INVALID_ABFS_STATE, + () -> fs.exists(testPath)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org