This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit ae2e3524d1ab72fcbe8b651ae446fd8334137cc1 Author: peacewong <[email protected]> AuthorDate: Tue Mar 19 21:57:13 2024 +0800 For FS opened with public tenants, we should not perform close action --- .../apache/linkis/hadoop/common/conf/HadoopConf.scala | 9 +++++++++ .../hadoop/common/entity/HDFSFileSystemContainer.scala | 3 +-- .../apache/linkis/hadoop/common/utils/HDFSUtils.scala | 16 ++++++++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala index b3e5cf2024..16fb45ee84 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala @@ -53,8 +53,17 @@ object HadoopConf { val HADOOP_EXTERNAL_CONF_DIR_PREFIX = CommonVars("wds.linkis.hadoop.external.conf.dir.prefix", "/appcom/config/external-conf/hadoop") + /** + * Whether to close the hdfs underlying cache or turn it off if it is ture + */ + val FS_CACHE_DISABLE = + CommonVars[java.lang.Boolean]("wds.linkis.fs.hdfs.impl.disable.cache", false) + val HDFS_ENABLE_CACHE = CommonVars("wds.linkis.hadoop.hdfs.cache.enable", false).getValue + val HDFS_ENABLE_CACHE_CLOSE = + CommonVars("linkis.hadoop.hdfs.cache.close.enable", true).getValue + val HDFS_ENABLE_CACHE_IDLE_TIME = CommonVars("wds.linkis.hadoop.hdfs.cache.idle.time", 3 * 60 * 1000).getValue diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala index 6b4eaaeceb..f87f89393e 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala @@ -48,8 +48,7 @@ class HDFSFileSystemContainer(fs: FileSystem, user: String, label: String) { def canRemove(): Boolean = { val currentTime = System.currentTimeMillis() val idleTime = currentTime - this.lastAccessTime - idleTime > HadoopConf.HDFS_ENABLE_CACHE_MAX_TIME || (System - .currentTimeMillis() - this.lastAccessTime > HadoopConf.HDFS_ENABLE_CACHE_IDLE_TIME) && count <= 0 + idleTime > HadoopConf.HDFS_ENABLE_CACHE_MAX_TIME || ((idleTime > HadoopConf.HDFS_ENABLE_CACHE_IDLE_TIME) && count <= 0) } } diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala index f2a615e996..1a6951054e 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala @@ -31,21 +31,29 @@ import org.apache.hadoop.security.UserGroupInformation import java.io.File import java.nio.file.Paths import java.security.PrivilegedExceptionAction -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ object HDFSUtils extends Logging { private val fileSystemCache: java.util.Map[String, HDFSFileSystemContainer] = - new java.util.HashMap[String, HDFSFileSystemContainer]() + new ConcurrentHashMap[String, HDFSFileSystemContainer]() private val LOCKER_SUFFIX = "_HDFS" private val DEFAULT_CACHE_LABEL = "default" private val JOINT = "_" - if (HadoopConf.HDFS_ENABLE_CACHE) { - logger.info("HDFS Cache enabled ") + private val count = new AtomicLong + + /** + * For FS opened with public tenants, we should not perform close action, but should close only + * when hdfsfilesystem encounters closed problem + * 对于使用公共租户开启的FS,我们不应该去执行close动作,应该由hdfsfilesystem遇到closed问题时才进行关闭 + */ + if (HadoopConf.HDFS_ENABLE_CACHE && HadoopConf.HDFS_ENABLE_CACHE_CLOSE) { + logger.info("HDFS Cache clear enabled ") Utils.defaultScheduler.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryAndWarn { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
