This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new b6d02af0f Extract codes related to appending memory unit to
NodeResourceUtils (#4143)
b6d02af0f is described below
commit b6d02af0f5937c7dc136bf262b99e0f1576313c4
Author: QuantumXiecao <[email protected]>
AuthorDate: Sun Feb 5 23:47:11 2023 +0800
Extract codes related to appending memory unit to NodeResourceUtils (#4143)
Co-authored-by: xiecao <[email protected]>
---
.../engineplugin/common/EngineConnPlugin.scala | 6 +++---
.../resource/GenericEngineResourceFactory.scala | 25 +++-------------------
.../NodeResourceUtils.scala} | 16 +++++++++-----
.../executor/ElasticSearchEngineConnExecutor.scala | 16 +++++---------
.../hive/executor/HiveEngineConnExecutor.scala | 22 +++++--------------
.../io/executor/IoEngineConnExecutor.scala | 16 +++++---------
.../jdbc/executor/JDBCEngineConnExecutor.scala | 20 +++++------------
.../executor/PipelineEngineConnExecutor.scala | 23 +++++---------------
.../presto/executor/PrestoEngineConnExecutor.scala | 15 +++++--------
.../python/executor/PythonEngineConnExecutor.scala | 23 +++++---------------
.../executor/SeatunnelFlinkOnceCodeExecutor.scala | 21 +++++-------------
.../SeatunnelFlinkSQLOnceCodeExecutor.scala | 21 +++++-------------
.../executor/SeatunnelSparkOnceCodeExecutor.scala | 21 +++++-------------
.../shell/executor/ShellEngineConnExecutor.scala | 23 +++++---------------
.../trino/executor/TrinoEngineConnExecutor.scala | 16 +++++---------
15 files changed, 77 insertions(+), 207 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
index d8562c9d1..9e0f06ef9 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/EngineConnPlugin.scala
@@ -22,11 +22,11 @@ import
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuil
import
org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory
import org.apache.linkis.manager.label.entity.Label
-import java.util
+import java.util.{List, Map}
trait EngineConnPlugin {
- def init(params: util.Map[String, AnyRef])
+ def init(params: Map[String, AnyRef])
def getEngineResourceFactory: EngineResourceFactory
@@ -34,6 +34,6 @@ trait EngineConnPlugin {
def getEngineConnFactory: EngineConnFactory
- def getDefaultLabels: util.List[Label[_]]
+ def getDefaultLabels: List[Label[_]]
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
index d89318778..6aadd6cdd 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
@@ -18,34 +18,15 @@
package org.apache.linkis.manager.engineplugin.common.resource
import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.manager.common.entity.resource.{LoadInstanceResource,
Resource}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-
-import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.manager.common.entity.resource.Resource
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import java.util
class GenericEngineResourceFactory extends AbstractEngineResourceFactory with
Logging {
override protected def getRequestResource(properties: util.Map[String,
String]): Resource = {
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (StringUtils.isBlank(settingClientMemory)) {
- properties.remove(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- } else if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
-
- new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
+ NodeResourceUtils.applyAsLoadInstanceResource(properties)
}
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/util/NodeResourceUtils.scala
similarity index 79%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
copy to
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/util/NodeResourceUtils.scala
index d89318778..be4e56805 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/GenericEngineResourceFactory.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/util/NodeResourceUtils.scala
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.common.resource
+package org.apache.linkis.manager.engineplugin.common.util
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.manager.common.entity.resource.{LoadInstanceResource,
Resource}
+import org.apache.linkis.manager.common.entity.resource.LoadInstanceResource
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import org.apache.commons.lang3.StringUtils
import java.util
+import java.util.Objects
-class GenericEngineResourceFactory extends AbstractEngineResourceFactory with
Logging {
+object NodeResourceUtils {
+
+ def appendMemoryUnitIfMissing(properties: util.Map[String, String]): Unit = {
+ Objects.requireNonNull(properties);
- override protected def getRequestResource(properties: util.Map[String,
String]): Resource = {
if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
val settingClientMemory =
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
@@ -40,6 +42,10 @@ class GenericEngineResourceFactory extends
AbstractEngineResourceFactory with Lo
)
}
}
+ }
+
+ def applyAsLoadInstanceResource(properties: util.Map[String, String]):
LoadInstanceResource = {
+ appendMemoryUnitIfMissing(properties)
new LoadInstanceResource(
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
diff --git
a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
index d20448127..7797ac1b1 100644
---
a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.scala
@@ -43,6 +43,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.Sender
@@ -188,17 +189,10 @@ class ElasticSearchEngineConnExecutor(
override def requestExpectedResource(expectedResource: NodeResource):
NodeResource = null
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
+
val resource = new CommonNodeResource
val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
resource.setUsedResource(usedResource)
diff --git
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 55b3cdd09..089eef23a 100644
---
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -41,6 +41,7 @@ import org.apache.linkis.manager.common.entity.resource.{
}
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.{
@@ -562,24 +563,11 @@ class HiveEngineConnExecutor(
}
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
index 90e0a0e02..fbc4a77d1 100644
---
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
@@ -30,6 +30,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration
import org.apache.linkis.manager.engineplugin.io.domain.FSInfo
import org.apache.linkis.manager.engineplugin.io.service.FsProxyService
@@ -243,17 +244,10 @@ class IoEngineConnExecutor(val id: Int, val outputLimit:
Int = 10)
override def requestExpectedResource(expectedResource: NodeResource):
NodeResource = null
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
+
val resource = new CommonNodeResource
val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
resource.setUsedResource(usedResource)
diff --git
a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
index 69e28d2a1..2e9378cf8 100644
---
a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
@@ -35,6 +35,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.engineplugin.jdbc.ConnectionManager
import org.apache.linkis.manager.engineplugin.jdbc.conf.JDBCConfiguration
import
org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
@@ -356,21 +357,10 @@ class JDBCEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
override def requestExpectedResource(expectedResource: NodeResource):
NodeResource = null
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (
- !settingClientMemory
- .toLowerCase()
- .endsWith(JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT)
- ) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT
- )
- }
- }
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
+
val resource = new CommonNodeResource
val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
resource.setUsedResource(usedResource)
diff --git
a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
index 332160c51..0ba364c2d 100644
---
a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
@@ -29,6 +29,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import
org.apache.linkis.manager.engineplugin.pipeline.errorcode.PopelineErrorCodeSummary._
import
org.apache.linkis.manager.engineplugin.pipeline.exception.PipeLineErrorException
import org.apache.linkis.manager.label.entity.Label
@@ -106,25 +107,11 @@ class PipelineEngineConnExecutor(val id: Int) extends
ComputationExecutor with L
}
override def getCurrentNodeResource(): NodeResource = {
- // todo refactor for duplicate
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
index f8040dc02..cdcec7519 100644
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
@@ -41,6 +41,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel,
UserCreatorLabel}
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -191,16 +192,10 @@ class PrestoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
}
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
+
val resource = new CommonNodeResource
val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
resource.setUsedResource(usedResource)
diff --git
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
index e8a0c823f..3b17fa60a 100644
---
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
@@ -31,6 +31,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import
org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -135,25 +136,11 @@ class PythonEngineConnExecutor(id: Int, pythonSession:
PythonSession, outputPrin
}
override def getCurrentNodeResource(): NodeResource = {
- // todo refactor for duplicate
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
index f5ed0820f..2a0109d12 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
@@ -50,6 +50,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse
@@ -157,23 +158,11 @@ class SeatunnelFlinkOnceCodeExecutor(
}
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
index 6795fe9a1..784ec92a4 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
@@ -50,6 +50,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse
@@ -157,23 +158,11 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
}
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
index 1e3bdd5d8..b0cf6ece7 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
@@ -37,6 +37,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse
@@ -136,23 +137,11 @@ class SeatunnelSparkOnceCodeExecutor(
}
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 88d385f82..c9f320625 100644
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -30,6 +30,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
import
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
import org.apache.linkis.manager.label.entity.Label
@@ -258,25 +259,11 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
}
override def getCurrentNodeResource(): NodeResource = {
- // todo refactor for duplicate
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
- val actualUsedResource = new LoadInstanceResource(
-
EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties),
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE
- )
val resource = new CommonNodeResource
- resource.setUsedResource(actualUsedResource)
+ resource.setUsedResource(
+ NodeResourceUtils
+
.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
+ )
resource
}
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index a1089ec62..a3a0d0d90 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -45,6 +45,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel,
UserCreatorLabel}
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -284,17 +285,10 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
}
override def getCurrentNodeResource(): NodeResource = {
- val properties = EngineConnObject.getEngineCreationContext.getOptions
- if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
- val settingClientMemory =
- properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(
- EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
- settingClientMemory + "g"
- )
- }
- }
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
+
val resource = new CommonNodeResource
val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
resource.setUsedResource(usedResource)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]