This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 91d086185c112aeeeaa8a21e79c6fdf228e791d7 Author: peacewong <[email protected]> AuthorDate: Tue Oct 10 20:41:36 2023 +0800 add init fs retry --- .../io/executor/IoEngineConnExecutor.scala | 46 +++++++++++++++------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala index 0facea45a..9055834f6 100644 --- a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala @@ -29,6 +29,7 @@ import org.apache.linkis.manager.common.entity.resource.{ LoadResource, NodeResource } +import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration import org.apache.linkis.manager.engineplugin.io.domain.FSInfo @@ -283,29 +284,44 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10) val fsType = methodEntity.getFsType val proxyUser = methodEntity.getProxyUser if (!userFSInfos.containsKey(proxyUser)) { - throw new StorageErrorException( - StorageErrorCode.FS_NOT_INIT.getCode, - s"not exist storage $fsType, please init first." - ) + if (methodEntity.getId != -1) { + createUserFS(methodEntity) + } else { + throw new StorageErrorException( + StorageErrorCode.FS_NOT_INIT.getCode, + s"not exist storage $fsType, ${StorageErrorCode.FS_NOT_INIT.getMessage}" + ) + } } + var fs: Fs = null userFSInfos.get(proxyUser) synchronized { - val userFsInfo = userFSInfos + val userFsInfoOption = userFSInfos .get(proxyUser) .find(fsInfo => fsInfo != null && fsInfo.id == methodEntity.getId) - .getOrElse( - throw new StorageErrorException( - StorageErrorCode.FS_NOT_INIT.getCode, - s"not exist storage $fsType, please init first." - ) + if (userFsInfoOption.isDefined) { + val userFsInfo = userFsInfoOption.get + userFsInfo.lastAccessTime = System.currentTimeMillis() + fs = userFsInfo.fs + } + } + if (null == fs) { + if (methodEntity.getId != -1) { + createUserFS(methodEntity) + getUserFS(methodEntity) + } else { + throw new StorageErrorException( + StorageErrorCode.FS_NOT_INIT.getCode, + s"not exist storage $fsType, ${StorageErrorCode.FS_NOT_INIT.getMessage}" ) - userFsInfo.lastAccessTime = System.currentTimeMillis() - userFsInfo.fs + } + } else { + fs } } private def createUserFS(methodEntity: MethodEntity): Long = { logger.info( - s"Creator ${methodEntity.getCreatorUser}准备为用户${methodEntity.getProxyUser}初始化FS:$methodEntity" + s"Creator ${methodEntity.getCreatorUser} for user ${methodEntity.getProxyUser} init fs:$methodEntity" ) var fsId = methodEntity.getId val properties = methodEntity.getParams()(0).asInstanceOf[Map[String, String]] @@ -330,12 +346,12 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10) if (!userFsInfo.exists(fsInfo => fsInfo != null && fsInfo.id == methodEntity.getId)) { val fs = FSFactory.getFs(methodEntity.getFsType) fs.init(properties.asJava) - fsId = getFSId() + fsId = if (fsId == -1) getFSId() else fsId userFsInfo += new FSInfo(fsId, fs) } } logger.info( - s"Creator ${methodEntity.getCreatorUser}为用户${methodEntity.getProxyUser}初始化结束 fsId=$fsId" + s"Creator ${methodEntity.getCreatorUser}for user ${methodEntity.getProxyUser} end init fs fsId=$fsId" ) fsId } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
