This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit ae2e3524d1ab72fcbe8b651ae446fd8334137cc1
Author: peacewong <[email protected]>
AuthorDate: Tue Mar 19 21:57:13 2024 +0800

    For FS opened with public tenants, we should not perform the close action
---
 .../apache/linkis/hadoop/common/conf/HadoopConf.scala    |  9 +++++++++
 .../hadoop/common/entity/HDFSFileSystemContainer.scala   |  3 +--
 .../apache/linkis/hadoop/common/utils/HDFSUtils.scala    | 16 ++++++++++++----
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index b3e5cf2024..16fb45ee84 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -53,8 +53,17 @@ object HadoopConf {
   val HADOOP_EXTERNAL_CONF_DIR_PREFIX =
     CommonVars("wds.linkis.hadoop.external.conf.dir.prefix", "/appcom/config/external-conf/hadoop")
 
+  /**
+   * Whether to disable the underlying HDFS FileSystem cache; the cache is disabled when this is true
+   */
+  val FS_CACHE_DISABLE =
+    CommonVars[java.lang.Boolean]("wds.linkis.fs.hdfs.impl.disable.cache", false)
+
   val HDFS_ENABLE_CACHE = CommonVars("wds.linkis.hadoop.hdfs.cache.enable", false).getValue
 
+  val HDFS_ENABLE_CACHE_CLOSE =
+    CommonVars("linkis.hadoop.hdfs.cache.close.enable", true).getValue
+
   val HDFS_ENABLE_CACHE_IDLE_TIME =
     CommonVars("wds.linkis.hadoop.hdfs.cache.idle.time", 3 * 60 * 1000).getValue
 
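The two new flags play different roles: wds.linkis.fs.hdfs.impl.disable.cache presumably maps
onto Hadoop's own "fs.hdfs.impl.disable.cache" switch, while linkis.hadoop.hdfs.cache.close.enable
gates whether the Linkis-side cleanup task is scheduled at all (see the HDFSUtils hunk below).
A minimal sketch of how the first flag could be wired into a Hadoop Configuration follows; the
helper is hypothetical, not code from this commit:

    import org.apache.hadoop.conf.Configuration
    import org.apache.linkis.hadoop.common.conf.HadoopConf

    object CacheFlagSketch {
      // Hypothetical helper: copy the Linkis flag onto the Hadoop key so that
      // FileSystem.get() bypasses Hadoop's internal FileSystem cache.
      def applyCacheFlags(conf: Configuration): Configuration = {
        if (HadoopConf.FS_CACHE_DISABLE.getValue) {
          conf.setBoolean("fs.hdfs.impl.disable.cache", true)
        }
        conf
      }
    }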
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
index 6b4eaaeceb..f87f89393e 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
@@ -48,8 +48,7 @@ class HDFSFileSystemContainer(fs: FileSystem, user: String, label: String) {
   def canRemove(): Boolean = {
     val currentTime = System.currentTimeMillis()
     val idleTime = currentTime - this.lastAccessTime
-    idleTime > HadoopConf.HDFS_ENABLE_CACHE_MAX_TIME || (System
-      .currentTimeMillis() - this.lastAccessTime > HadoopConf.HDFS_ENABLE_CACHE_IDLE_TIME) && count <= 0
+    idleTime > HadoopConf.HDFS_ENABLE_CACHE_MAX_TIME || ((idleTime > HadoopConf.HDFS_ENABLE_CACHE_IDLE_TIME) && count <= 0)
   }
 
 }
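Since && binds tighter than || in Scala, the old expression already grouped as intended; the
rewrite is about consistency rather than precedence. It reuses the single idleTime snapshot
instead of reading System.currentTimeMillis() a second time, and makes the grouping explicit.
The resulting policy, restated as a self-contained sketch (the max threshold value here is
hypothetical; the idle default of 3 * 60 * 1000 is visible above):

    object CanRemoveSketch {
      val MaxTimeMs = 12 * 60 * 60 * 1000L // hypothetical hard cap on container age
      val IdleTimeMs = 3 * 60 * 1000L      // idle threshold, matching the default above

      // A container is removable when it has been idle past the hard cap, or
      // idle past the idle threshold with no outstanding references.
      def canRemove(idleMs: Long, refCount: Int): Boolean =
        idleMs > MaxTimeMs || (idleMs > IdleTimeMs && refCount <= 0)
    }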
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index f2a615e996..1a6951054e 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -31,21 +31,29 @@ import org.apache.hadoop.security.UserGroupInformation
 import java.io.File
 import java.nio.file.Paths
 import java.security.PrivilegedExceptionAction
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
 object HDFSUtils extends Logging {
 
   private val fileSystemCache: java.util.Map[String, HDFSFileSystemContainer] =
-    new java.util.HashMap[String, HDFSFileSystemContainer]()
+    new ConcurrentHashMap[String, HDFSFileSystemContainer]()
 
   private val LOCKER_SUFFIX = "_HDFS"
   private val DEFAULT_CACHE_LABEL = "default"
   private val JOINT = "_"
 
-  if (HadoopConf.HDFS_ENABLE_CACHE) {
-    logger.info("HDFS Cache enabled ")
+  private val count = new AtomicLong
+
+  /**
+   * For a FileSystem opened with a public tenant, we should not perform the close action; it
+   * should be closed only when the HDFS FileSystem encounters a "closed" problem
+   */
+  if (HadoopConf.HDFS_ENABLE_CACHE && HadoopConf.HDFS_ENABLE_CACHE_CLOSE) {
+    logger.info("HDFS Cache clear enabled ")
     Utils.defaultScheduler.scheduleAtFixedRate(
       new Runnable {
         override def run(): Unit = Utils.tryAndWarn {


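The hunk is truncated above, so the body of the scheduled task is not shown. Below is a
self-contained sketch of a cleanup pass consistent with the commit message; the Container
model and every name in it are assumptions for illustration, not the actual runnable body:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    // Hypothetical, self-contained model of the cleanup pass.
    final case class Container(key: String, label: String, idleMs: Long, refs: Int) {
      def canRemove(maxMs: Long, idleLimitMs: Long): Boolean =
        idleMs > maxMs || (idleMs > idleLimitMs && refs <= 0)
    }

    object CleanupSketch {
      val DefaultCacheLabel = "default" // mirrors DEFAULT_CACHE_LABEL above
      val cache = new ConcurrentHashMap[String, Container]()

      // Evict removable containers, but never close entries opened for the
      // public tenant (default label); per the comment above, those are closed
      // only when the underlying FileSystem itself reports a "closed" problem.
      def cleanupOnce(maxMs: Long, idleLimitMs: Long): Unit =
        cache.asScala.values.toList
          .filter(c => c.canRemove(maxMs, idleLimitMs) && c.label != DefaultCacheLabel)
          .foreach(c => cache.remove(c.key)) // a real pass would also close the FileSystem
    }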