This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 5ffcee8 HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701) 5ffcee8 is described below commit 5ffcee8979e1cfb043bec9b2c6d463911d5025dc Author: Mike <m.prya...@gmail.com> AuthorDate: Tue Feb 23 20:03:27 2021 +0300 HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701) Contributed by Mike Pryakhin. Change-Id: I59ef67c38c313f30c5e000b2fe41fcf715cf3a4b --- .../org/apache/hadoop/fs/sftp/SFTPFileSystem.java | 31 +++++++++++++++++++++- .../apache/hadoop/fs/sftp/TestSFTPFileSystem.java | 11 ++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java index 898f615..297ec04 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URLDecoder; import java.util.ArrayList; import java.util.Vector; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem { private SFTPConnectionPool connectionPool; private URI uri; + private final AtomicBoolean closed = new AtomicBoolean(false); private static final int DEFAULT_SFTP_PORT = 22; private static final int DEFAULT_MAX_CONNECTION = 5; @@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem { "Destination path %s already exist, cannot rename!"; public static final String E_FAILED_GETHOME = "Failed to get home directory"; public static final String E_FAILED_DISCONNECT = "Failed to disconnect"; + public static final String 
E_FS_CLOSED = "FileSystem is closed!"; /** * Set configuration from UI. @@ -138,8 +141,9 @@ public class SFTPFileSystem extends FileSystem { * @throws IOException */ private ChannelSftp connect() throws IOException { - Configuration conf = getConf(); + checkNotClosed(); + Configuration conf = getConf(); String host = conf.get(FS_SFTP_HOST, null); int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT); String user = conf.get(FS_SFTP_USER_PREFIX + host, null); @@ -703,6 +707,31 @@ public class SFTPFileSystem extends FileSystem { } } + @Override + public void close() throws IOException { + if (closed.getAndSet(true)) { + return; + } + try { + super.close(); + } finally { + if (connectionPool != null) { + connectionPool.shutdown(); + } + } + } + + /** + * Verify that the file system is open. Non blocking; this reads + * the current state of the atomic {@link #closed} flag. + * @throws IOException if the file system is closed. + */ + private void checkNotClosed() throws IOException { + if (closed.get()) { + throw new IOException(uri + ": " + E_FS_CLOSED); + } + } + @VisibleForTesting SFTPConnectionPool getConnectionPool() { return connectionPool; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java index 6939262..58452f8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java @@ -374,4 +374,15 @@ public class TestSFTPFileSystem { assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), is(1)); } + + @Test + public void testCloseFileSystemClosesConnectionPool() throws Exception { + SFTPFileSystem fs = (SFTPFileSystem) sftpFs; + fs.getHomeDirectory(); + assertThat(fs.getConnectionPool().getLiveConnCount(), is(1)); + fs.close(); 
+ assertThat(fs.getConnectionPool().getLiveConnCount(), is(0)); + // make sure that repeated close() calls are safe + fs.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org