This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 91d086185c112aeeeaa8a21e79c6fdf228e791d7
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 20:41:36 2023 +0800

    Add retry of FS initialization when the user FS is not found
---
 .../io/executor/IoEngineConnExecutor.scala         | 46 +++++++++++++++-------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
index 0facea45a..9055834f6 100644
--- 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
@@ -29,6 +29,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   LoadResource,
   NodeResource
 }
+import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration
 import org.apache.linkis.manager.engineplugin.io.domain.FSInfo
@@ -283,29 +284,44 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
     val fsType = methodEntity.getFsType
     val proxyUser = methodEntity.getProxyUser
     if (!userFSInfos.containsKey(proxyUser)) {
-      throw new StorageErrorException(
-        StorageErrorCode.FS_NOT_INIT.getCode,
-        s"not exist storage $fsType, please init first."
-      )
+      if (methodEntity.getId != -1) {
+        createUserFS(methodEntity)
+      } else {
+        throw new StorageErrorException(
+          StorageErrorCode.FS_NOT_INIT.getCode,
+          s"not exist storage $fsType, 
${StorageErrorCode.FS_NOT_INIT.getMessage}"
+        )
+      }
     }
+    var fs: Fs = null
     userFSInfos.get(proxyUser) synchronized {
-      val userFsInfo = userFSInfos
+      val userFsInfoOption = userFSInfos
         .get(proxyUser)
         .find(fsInfo => fsInfo != null && fsInfo.id == methodEntity.getId)
-        .getOrElse(
-          throw new StorageErrorException(
-            StorageErrorCode.FS_NOT_INIT.getCode,
-            s"not exist storage $fsType, please init first."
-          )
+      if (userFsInfoOption.isDefined) {
+        val userFsInfo = userFsInfoOption.get
+        userFsInfo.lastAccessTime = System.currentTimeMillis()
+        fs = userFsInfo.fs
+      }
+    }
+    if (null == fs) {
+      if (methodEntity.getId != -1) {
+        createUserFS(methodEntity)
+        getUserFS(methodEntity)
+      } else {
+        throw new StorageErrorException(
+          StorageErrorCode.FS_NOT_INIT.getCode,
+          s"not exist storage $fsType, 
${StorageErrorCode.FS_NOT_INIT.getMessage}"
         )
-      userFsInfo.lastAccessTime = System.currentTimeMillis()
-      userFsInfo.fs
+      }
+    } else {
+      fs
     }
   }
 
   private def createUserFS(methodEntity: MethodEntity): Long = {
     logger.info(
-      s"Creator 
${methodEntity.getCreatorUser}准备为用户${methodEntity.getProxyUser}初始化FS:$methodEntity"
+      s"Creator ${methodEntity.getCreatorUser} for user 
${methodEntity.getProxyUser} init fs:$methodEntity"
     )
     var fsId = methodEntity.getId
     val properties = methodEntity.getParams()(0).asInstanceOf[Map[String, 
String]]
@@ -330,12 +346,12 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
       if (!userFsInfo.exists(fsInfo => fsInfo != null && fsInfo.id == 
methodEntity.getId)) {
         val fs = FSFactory.getFs(methodEntity.getFsType)
         fs.init(properties.asJava)
-        fsId = getFSId()
+        fsId = if (fsId == -1) getFSId() else fsId
         userFsInfo += new FSInfo(fsId, fs)
       }
     }
     logger.info(
-      s"Creator 
${methodEntity.getCreatorUser}为用户${methodEntity.getProxyUser}初始化结束 fsId=$fsId"
+      s"Creator ${methodEntity.getCreatorUser}for user 
${methodEntity.getProxyUser} end init fs fsId=$fsId"
     )
     fsId
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to