This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new b6d02af0f Extract codes related to appending memory unit to NodeResourceUtils (#4143)
b6d02af0f is described below

commit b6d02af0f5937c7dc136bf262b99e0f1576313c4
Author: QuantumXiecao <[email protected]>
AuthorDate: Sun Feb 5 23:47:11 2023 +0800

    Extract codes related to appending memory unit to NodeResourceUtils (#4143)
    
    Co-authored-by: xiecao <[email protected]>
---
 .../engineplugin/common/EngineConnPlugin.scala     |  6 +++---
 .../resource/GenericEngineResourceFactory.scala    | 25 +++-------------------
 .../NodeResourceUtils.scala}                       | 16 +++++++++-----
 .../executor/ElasticSearchEngineConnExecutor.scala | 16 +++++---------
 .../hive/executor/HiveEngineConnExecutor.scala     | 22 +++++--------------
 .../io/executor/IoEngineConnExecutor.scala         | 16 +++++---------
 .../jdbc/executor/JDBCEngineConnExecutor.scala     | 20 +++++------------
 .../executor/PipelineEngineConnExecutor.scala      | 23 +++++---------------
 .../presto/executor/PrestoEngineConnExecutor.scala | 15 +++++--------
 .../python/executor/PythonEngineConnExecutor.scala | 23 +++++---------------
 .../executor/SeatunnelFlinkOnceCodeExecutor.scala  | 21 +++++-------------
 .../SeatunnelFlinkSQLOnceCodeExecutor.scala        | 21 +++++-------------
 .../executor/SeatunnelSparkOnceCodeExecutor.scala  | 21 +++++-------------
 .../shell/executor/ShellEngineConnExecutor.scala   | 23 +++++---------------
 .../trino/executor/TrinoEngineConnExecutor.scala   | 16 +++++---------
 15 files changed, 77 insertions(+), 207 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
index d8562c9d1..9e0f06ef9 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
@@ -22,11 +22,11 @@ import 
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuil
 import 
org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory
 import org.apache.linkis.manager.label.entity.Label
 
-import java.util
+import java.util.{List, Map}
 
 trait EngineConnPlugin {
 
-  def init(params: util.Map[String, AnyRef])
+  def init(params: Map[String, AnyRef])
 
   def getEngineResourceFactory: EngineResourceFactory
 
@@ -34,6 +34,6 @@ trait EngineConnPlugin {
 
   def getEngineConnFactory: EngineConnFactory
 
-  def getDefaultLabels: util.List[Label[_]]
+  def getDefaultLabels: List[Label[_]]
 
 }
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
index d89318778..6aadd6cdd 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
@@ -18,34 +18,15 @@
 package org.apache.linkis.manager.engineplugin.common.resource
 
 import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.manager.common.entity.resource.{LoadInstanceResource, 
Resource}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-
-import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.manager.common.entity.resource.Resource
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 
 import java.util
 
 class GenericEngineResourceFactory extends AbstractEngineResourceFactory with 
Logging {
 
   override protected def getRequestResource(properties: util.Map[String, 
String]): Resource = {
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (StringUtils.isBlank(settingClientMemory)) {
-        properties.remove(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      } else if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-
-    new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
+    NodeResourceUtils.applyAsLoadInstanceResource(properties)
   }
 
 }
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/util/NodeResourceUtils.scala
similarity index 79%
copy from linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
copy to linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/util/NodeResourceUtils.scala
index d89318778..be4e56805 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/util/NodeResourceUtils.scala
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.common.resource
+package org.apache.linkis.manager.engineplugin.common.util
 
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.manager.common.entity.resource.{LoadInstanceResource, 
Resource}
+import org.apache.linkis.manager.common.entity.resource.LoadInstanceResource
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 
 import org.apache.commons.lang3.StringUtils
 
 import java.util
+import java.util.Objects
 
-class GenericEngineResourceFactory extends AbstractEngineResourceFactory with 
Logging {
+object NodeResourceUtils {
+
+  def appendMemoryUnitIfMissing(properties: util.Map[String, String]): Unit = {
+    Objects.requireNonNull(properties);
 
-  override protected def getRequestResource(properties: util.Map[String, 
String]): Resource = {
     if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
       val settingClientMemory =
         properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
@@ -40,6 +42,10 @@ class GenericEngineResourceFactory extends 
AbstractEngineResourceFactory with Lo
         )
       }
     }
+  }
+
+  def applyAsLoadInstanceResource(properties: util.Map[String, String]): 
LoadInstanceResource = {
+    appendMemoryUnitIfMissing(properties)
 
     new LoadInstanceResource(
       
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
index d20448127..7797ac1b1 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
@@ -43,6 +43,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.rpc.Sender
@@ -188,17 +189,10 @@ class ElasticSearchEngineConnExecutor(
   override def requestExpectedResource(expectedResource: NodeResource): 
NodeResource = null
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+      EngineConnObject.getEngineCreationContext.getOptions
+    )
+
     val resource = new CommonNodeResource
     val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
     resource.setUsedResource(usedResource)
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 55b3cdd09..089eef23a 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -41,6 +41,7 @@ import org.apache.linkis.manager.common.entity.resource.{
 }
 import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.{
@@ -562,24 +563,11 @@ class HiveEngineConnExecutor(
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
index 90e0a0e02..fbc4a77d1 100644
--- a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
@@ -30,6 +30,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration
 import org.apache.linkis.manager.engineplugin.io.domain.FSInfo
 import org.apache.linkis.manager.engineplugin.io.service.FsProxyService
@@ -243,17 +244,10 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
   override def requestExpectedResource(expectedResource: NodeResource): 
NodeResource = null
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+      EngineConnObject.getEngineCreationContext.getOptions
+    )
+
     val resource = new CommonNodeResource
     val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
     resource.setUsedResource(usedResource)
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
index 69e28d2a1..2e9378cf8 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
@@ -35,6 +35,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.engineplugin.jdbc.ConnectionManager
 import org.apache.linkis.manager.engineplugin.jdbc.conf.JDBCConfiguration
 import 
org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
@@ -356,21 +357,10 @@ class JDBCEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
   override def requestExpectedResource(expectedResource: NodeResource): 
NodeResource = null
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (
-          !settingClientMemory
-            .toLowerCase()
-            .endsWith(JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT)
-      ) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT
-        )
-      }
-    }
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+      EngineConnObject.getEngineCreationContext.getOptions
+    )
+
     val resource = new CommonNodeResource
     val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
     resource.setUsedResource(usedResource)
diff --git a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
index 332160c51..0ba364c2d 100644
--- a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
@@ -29,6 +29,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import 
org.apache.linkis.manager.engineplugin.pipeline.errorcode.PopelineErrorCodeSummary._
 import 
org.apache.linkis.manager.engineplugin.pipeline.exception.PipeLineErrorException
 import org.apache.linkis.manager.label.entity.Label
@@ -106,25 +107,11 @@ class PipelineEngineConnExecutor(val id: Int) extends 
ComputationExecutor with L
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    // todo refactor for duplicate
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
index f8040dc02..cdcec7519 100644
--- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
@@ -41,6 +41,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, 
UserCreatorLabel}
 import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -191,16 +192,10 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory = 
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+      EngineConnObject.getEngineCreationContext.getOptions
+    )
+
     val resource = new CommonNodeResource
     val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
     resource.setUsedResource(usedResource)
diff --git a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
index e8a0c823f..3b17fa60a 100644
--- a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
@@ -31,6 +31,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import 
org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -135,25 +136,11 @@ class PythonEngineConnExecutor(id: Int, pythonSession: 
PythonSession, outputPrin
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    // todo refactor for duplicate
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
index f5ed0820f..2a0109d12 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
@@ -50,6 +50,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.ErrorExecuteResponse
@@ -157,23 +158,11 @@ class SeatunnelFlinkOnceCodeExecutor(
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory = 
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
index 6795fe9a1..784ec92a4 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
@@ -50,6 +50,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.ErrorExecuteResponse
@@ -157,23 +158,11 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory = 
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
index 1e3bdd5d8..b0cf6ece7 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
@@ -37,6 +37,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.ErrorExecuteResponse
@@ -136,23 +137,11 @@ class SeatunnelSparkOnceCodeExecutor(
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory = 
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 88d385f82..c9f320625 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -30,6 +30,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import 
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
 import 
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
 import org.apache.linkis.manager.label.entity.Label
@@ -258,25 +259,11 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    // todo refactor for duplicate
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
-    val actualUsedResource = new LoadInstanceResource(
-      
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
-      EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
-    )
     val resource = new CommonNodeResource
-    resource.setUsedResource(actualUsedResource)
+    resource.setUsedResource(
+      NodeResourceUtils
+        
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+    )
     resource
   }
 
diff --git a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index a1089ec62..a3a0d0d90 100644
--- a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -45,6 +45,7 @@ import org.apache.linkis.manager.common.entity.resource.{
   NodeResource
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, 
UserCreatorLabel}
 import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -284,17 +285,10 @@ class TrinoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
   }
 
   override def getCurrentNodeResource(): NodeResource = {
-    val properties = EngineConnObject.getEngineCreationContext.getOptions
-    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
-      val settingClientMemory =
-        properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(
-          EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
-          settingClientMemory + "g"
-        )
-      }
-    }
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+      EngineConnObject.getEngineCreationContext.getOptions
+    )
+
     val resource = new CommonNodeResource
     val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
     resource.setUsedResource(usedResource)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to