This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 09f526a585 1.8.0 release feature (#5225)
09f526a585 is described below
commit 09f526a58509345383d21a4ea688f44b13e930b0
Author: GLeXios <[email protected]>
AuthorDate: Mon Mar 31 19:06:36 2025 +0800
1.8.0 release feature (#5225)
Co-authored-by: gelxiogong <[email protected]>
---
.../org/apache/linkis/common/utils/RSAUtils.scala | 2 +-
.../linkis/hadoop/common/conf/HadoopConf.scala | 3 +
.../linkis/hadoop/common/utils/HDFSUtils.scala | 5 +-
.../linkis/server/security/SecurityFilter.scala | 2 +-
.../linkis/storage/conf/LinkisStorageConf.scala | 2 +-
.../computation/client/LinkisJobBuilder.scala | 2 +-
.../client/once/simple/SimpleOnceJobBuilder.scala | 3 +-
.../common/constant/job/JobRequestConstants.scala | 2 +
.../executor/hook/PythonModuleLoad.scala | 161 -----------------
.../hook/PythonModuleLoadEngineConnHook.scala | 199 ++++++++++++++++++++-
.../executor/hook/PythonSparkEngineHook.scala | 45 -----
.../common/conf/EngineConnPluginConf.scala | 3 +
.../linkis/manager/am/conf/AMConfiguration.java | 2 +-
.../linkis/manager/rm/restful/RMMonitorRest.scala | 3 +
.../hive/executor/HiveEngineConnExecutor.scala | 60 ++++---
.../nebula/conf/NebulaConfiguration.java | 3 +
.../nebula/executor/NebulaEngineConnExecutor.java | 5 +-
.../errorcode/LinkisPythonErrorCodeSummary.java | 3 +-
.../python/executor/PythonEngineConnExecutor.scala | 15 +-
.../python/executor/PythonSession.scala | 3 +-
.../python/hook/PythonVersionEngineHook.scala | 3 +-
.../spark/executor/SparkEngineConnExecutor.scala | 5 +-
.../spark/executor/SparkPythonExecutor.scala | 5 +-
.../impl/ComputationEngineConnExecutor.scala | 2 +
.../linkis/configuration/constant/Constants.scala | 3 +-
.../metadata/service/impl/MdqServiceImpl.java | 56 +++---
.../cache/impl/DefaultQueryCacheManager.java | 32 ++--
.../linkis/jobhistory/dao/JobHistoryMapper.java | 7 +-
.../resources/mapper/mysql/JobHistoryMapper.xml | 13 +-
.../service/impl/JobHistoryQueryServiceImpl.scala | 10 +-
.../jobhistory/dao/JobHistoryMapperTest.java | 2 +-
.../errorcode/client/ClientConfiguration.java | 3 +-
.../apache/linkis/bml/conf/BmlConfiguration.scala | 3 +-
.../linkis/cs/client/utils/ContextClientConf.scala | 3 +-
.../client/config/DatasourceClientConfig.scala | 3 +-
.../filesystem/conf/WorkspaceClientConf.scala | 3 +-
.../gateway/config/GatewayConfiguration.scala | 5 +-
pom.xml | 2 +-
tool/dependencies/known-dependencies.txt | 2 +-
39 files changed, 380 insertions(+), 305 deletions(-)
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
index 746b3600a6..bded200e24 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
@@ -26,7 +26,7 @@ import java.nio.charset.StandardCharsets
import java.security.{KeyPair, KeyPairGenerator, PrivateKey, PublicKey}
object RSAUtils {
- private implicit val keyPair = genKeyPair(1024)
+ private implicit val keyPair = genKeyPair(2048)
def genKeyPair(keyLength: Int): KeyPair = {
val keyPair = KeyPairGenerator.getInstance("RSA")
diff --git
a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index c550b3f517..02e1762e2e 100644
---
a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++
b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -64,6 +64,9 @@ object HadoopConf {
val HDFS_ENABLE_CACHE_CLOSE =
CommonVars("linkis.hadoop.hdfs.cache.close.enable", true).getValue
+ val HDFS_ENABLE_NOT_CLOSE_USERS =
+ CommonVars("linkis.hadoop.hdfs.cache.not.close.users", "").getValue
+
val HDFS_ENABLE_CACHE_IDLE_TIME =
CommonVars("wds.linkis.hadoop.hdfs.cache.idle.time", 3 * 60 *
1000).getValue
diff --git
a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index e18837fd5f..d4b6af555a 100644
---
a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++
b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -68,7 +68,10 @@ object HDFSUtils extends Logging {
.foreach { hdfsFileSystemContainer =>
val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
locker.intern() synchronized {
- if (hdfsFileSystemContainer.canRemove()) {
+ if (
+ hdfsFileSystemContainer.canRemove() &&
!HadoopConf.HDFS_ENABLE_NOT_CLOSE_USERS
+ .contains(hdfsFileSystemContainer.getUser)
+ ) {
fileSystemCache.remove(
hdfsFileSystemContainer.getUser + JOINT +
hdfsFileSystemContainer.getLabel
)
diff --git
a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala
b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala
index 5cc796d23e..b372ead651 100644
---
a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala
+++
b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala
@@ -83,7 +83,7 @@ class SecurityFilter extends Filter {
ServerConfiguration.BDP_SERVER_RESTFUL_PASS_AUTH_REQUEST_URI
.exists(r => !r.equals("") && request.getRequestURI.startsWith(r))
) {
- logger.info("pass auth uri: " + request.getRequestURI)
+ logger.debug("pass auth uri: " + request.getRequestURI)
true
} else {
val userName = Utils.tryCatch(SecurityFilter.getLoginUser(request)) {
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
index 5330983dd6..ace8509d4a 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
@@ -33,7 +33,7 @@ object LinkisStorageConf {
CommonVars
.apply(
"wds.linkis.hdfs.rest.errs",
- ".*Filesystem closed.*|.*Failed to find any Kerberos tgt.*"
+ ".*Filesystem closed.*|.*Failed to find any Kerberos tgt.*|.*The
client is stopped.*"
)
.getValue
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
index eff8411603..36f4ee8fc4 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
@@ -176,7 +176,7 @@ object LinkisJobBuilder {
var authTokenValue: String = CommonVars[String](
"wds.linkis.client.test.common.tokenValue",
- "LINKIS_CLI_TEST"
+ Configuration.LINKIS_TOKEN.getValue
).getValue // This is the default authToken, we usually suggest set
different ones for users.
def setDefaultClientConfig(clientConfig: DWSClientConfig): Unit =
this.clientConfig = clientConfig
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
index d7c4746188..510aabf7f4 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.computation.client.once.simple
import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig
@@ -171,7 +172,7 @@ object SimpleOnceJobBuilder {
) // We think 90s is enough, if SocketTimeoutException is throw,
just set a new clientConfig to modify it.
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
.setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY)
- .setAuthTokenValue(LinkisJobBuilder.authTokenValue)
+ .setAuthTokenValue(LINKIS_TOKEN.getValue)
.setDWSVersion(clientConfig.getDWSVersion)
.build()
bmlClient = BmlClientFactory.createBmlClient(newClientConfig)
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
index 110b02b8fe..5aab48388a 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
@@ -36,4 +36,6 @@ object JobRequestConstants {
val ENABLE_DIRECT_PUSH = "enableDirectPush"
val DIRECT_PUSH_FETCH_SIZE = "direct_push_fetch_size"
+ val LINKIS_HIVE_EC_READ_RESULT_BY_OBJECT = "readResByObject"
+
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala
deleted file mode 100644
index 34928d8525..0000000000
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineconn.computation.executor.hook
-
-import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV
-import org.apache.linkis.common.utils.{Logging, Utils}
-import
org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ComputationExecutor,
- EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.engineconn.EngineConnManager
-import org.apache.linkis.engineconn.core.executor.ExecutorManager
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.RunType.RunType
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.udf.UDFClientConfiguration
-import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol,
ResponsePythonModuleProtocol}
-import org.apache.linkis.udf.entity.PythonModuleInfoVO
-
-import org.apache.commons.lang3.StringUtils
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * The PythonModuleLoad class is designed to load Python modules into the
execution environment
- * dynamically. This class is not an extension of UDFLoad, but shares a
similar philosophy of
- * handling dynamic module loading based on user preferences and system
configurations.
- */
-abstract class PythonModuleLoad extends Logging {
-
- /** Abstract properties to be defined by the subclass */
- protected val engineType: String
- protected val runType: RunType
-
- protected def getEngineType(): String = engineType
-
- protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String
-
- private def queryPythonModuleRpc(
- userName: String,
- engineType: String
- ): java.util.List[PythonModuleInfoVO] = {
- val infoList = Sender
- .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
- .ask(RequestPythonModuleProtocol(userName, engineType))
- .asInstanceOf[ResponsePythonModuleProtocol]
- .getModulesInfo()
- infoList
- }
-
- protected def getLoadPythonModuleCode: Array[String] = {
- val engineCreationContext =
-
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
- val user = engineCreationContext.getUser
-
- var infoList: util.List[PythonModuleInfoVO] =
- Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType()))
- if (infoList == null) {
- logger.info("rpc get info is empty.")
- infoList = new util.ArrayList[PythonModuleInfoVO]()
- }
-
- // 替换Viewfs
- if (IS_VIEW_FS_ENV.getValue) {
- infoList.asScala.foreach { info =>
- val path = info.getPath
- logger.info(s"python path: ${path}")
- if (path.startsWith("hdfs") || path.startsWith("viewfs")) {
- info.setPath(path.replace("hdfs://", "viewfs://"))
- } else {
- info.setPath("viewfs://" + path)
- }
- }
- } else {
-
- infoList.asScala.foreach { info =>
- val path = info.getPath
- logger.info(s"hdfs python path: ${path}")
- if (!path.startsWith("hdfs")) {
- info.setPath("hdfs://" + path)
- }
- }
- }
-
- logger.info(s"${user} load python modules: ")
- infoList.asScala.foreach(l => logger.info(s"module name:${l.getName},
path:${l.getPath}\n"))
-
- // 创建加载code
- val codes: mutable.Buffer[String] = infoList.asScala
- .filter { info => StringUtils.isNotEmpty(info.getPath) }
- .map(constructCode)
- // 打印codes
- val str: String = codes.mkString("\n")
- logger.info(s"python codes: $str")
- codes.toArray
- }
-
- private def executeFunctionCode(codes: Array[String], executor:
ComputationExecutor): Unit = {
- if (null == codes || null == executor) {
- return
- }
- codes.foreach { code =>
- logger.info("Submit function registration to engine, code: " + code)
- Utils.tryCatch(executor.executeLine(new
EngineExecutionContext(executor), code)) {
- t: Throwable =>
- logger.error("Failed to load python module", t)
- null
- }
- }
- }
-
- /**
- * Generate and execute the code necessary for loading Python modules.
- *
- * @param executor
- * An object capable of executing code in the current engine context.
- */
- protected def loadPythonModules(labels: Array[Label[_]]): Unit = {
-
- val codes = getLoadPythonModuleCode
- logger.info(s"codes length: ${codes.length}")
- if (null != codes && codes.nonEmpty) {
- val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
- if (executor != null) {
- val className = executor.getClass.getName
- logger.info(s"executor class: ${className}")
- } else {
- logger.error(s"Failed to load python, executor is null")
- }
-
- executor match {
- case computationExecutor: ComputationExecutor =>
- executeFunctionCode(codes, computationExecutor)
- case _ =>
- }
- }
- logger.info(s"Successful to load python, engineType : ${engineType}")
- }
-
-}
-
-// Note: The actual implementation of methods like `executeFunctionCode` and
`construct
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala
index 80eaa888b8..386cd8f0b5 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala
@@ -17,12 +17,151 @@
package org.apache.linkis.engineconn.computation.executor.hook
+import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
+import org.apache.linkis.engineconn.computation.executor.execute.{
+ ComputationExecutor,
+ EngineExecutionContext
+}
+import org.apache.linkis.engineconn.core.engineconn.EngineConnManager
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.hadoop.common.conf.HadoopConf
+import org.apache.linkis.hadoop.common.utils.HDFSUtils
import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
+import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel,
RunType}
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+import org.apache.linkis.rpc.Sender
+import org.apache.linkis.udf.UDFClientConfiguration
+import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol,
ResponsePythonModuleProtocol}
+import org.apache.linkis.udf.entity.PythonModuleInfoVO
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import java.util
+import java.util.{Collections, Comparator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+abstract class PythonModuleLoad extends Logging {
+
+ /** Abstract properties to be defined by the subclass */
+ protected val engineType: String
+ protected val runType: RunType
+ protected def getEngineType(): String = engineType
+ protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String
+
+ private def queryPythonModuleRpc(
+ userName: String,
+ engineType: String
+ ): java.util.List[PythonModuleInfoVO] = {
+ val infoList = Sender
+ .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
+ .ask(RequestPythonModuleProtocol(userName, engineType))
+ .asInstanceOf[ResponsePythonModuleProtocol]
+ .getModulesInfo()
+ // 使用Collections.sort()和Comparator进行排序
+ if (infoList != null && !infoList.isEmpty) {
+ Collections.sort(
+ infoList,
+ new Comparator[PythonModuleInfoVO]() {
+ override def compare(o1: PythonModuleInfoVO, o2:
PythonModuleInfoVO): Int =
+ Integer.compare(o1.getId.toInt, o1.getId.toInt)
+ }
+ )
+ }
+ infoList
+ }
+
+ protected def getLoadPythonModuleCode: Array[String] = {
+ val engineCreationContext =
+
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
+ val user = engineCreationContext.getUser
+ var infoList: util.List[PythonModuleInfoVO] =
+ Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType()))
+ if (infoList == null) {
+ logger.info("rpc get info is empty.")
+ infoList = new util.ArrayList[PythonModuleInfoVO]()
+ }
+ // 替换Viewfs
+ if (IS_VIEW_FS_ENV.getValue) {
+ infoList.asScala.foreach { info =>
+ val path = info.getPath
+ logger.info(s"python path: ${path}")
+ if (path.startsWith("hdfs") || path.startsWith("viewfs")) {
+ info.setPath(path.replace("hdfs://", "viewfs://"))
+ } else {
+ info.setPath("viewfs://" + path)
+ }
+ }
+ } else {
+ infoList.asScala.foreach { info =>
+ val path = info.getPath
+ logger.info(s"hdfs python path: ${path}")
+ if (!path.startsWith("hdfs")) {
+ info.setPath("hdfs://" + path)
+ }
+ }
+ }
+ logger.info(s"${user} load python modules: ")
+ infoList.asScala.foreach(l => logger.info(s"module name:${l.getName},
path:${l.getPath}\n"))
+ // 创建加载code
+ val codes: mutable.Buffer[String] = infoList.asScala
+ .filter { info => StringUtils.isNotEmpty(info.getPath) }
+ .map(constructCode)
+ // 打印codes
+ val str: String = codes.mkString("\n")
+ logger.info(s"python codes: $str")
+ codes.toArray
+ }
+
+ private def executeFunctionCode(codes: Array[String], executor:
ComputationExecutor): Unit = {
+ if (null == codes || null == executor) {
+ return
+ }
+ codes.foreach { code =>
+ logger.info("Submit function registration to engine, code: " + code)
+ Utils.tryCatch(executor.executeLine(new
EngineExecutionContext(executor), code)) {
+ t: Throwable =>
+ logger.error("Failed to load python module", t)
+ null
+ }
+ }
+ }
+
+ /**
+ * Generate and execute the code necessary for loading Python modules.
+ *
+ * @param executor
+ * An object capable of executing code in the current engine context.
+ */
+ protected def loadPythonModules(labels: Array[Label[_]]): Unit = {
+ val codes = getLoadPythonModuleCode
+ logger.info(s"codes length: ${codes.length}")
+ if (null != codes && codes.nonEmpty) {
+ val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
+ if (executor != null) {
+ val className = executor.getClass.getName
+ logger.info(s"executor class: ${className}")
+ } else {
+ logger.error(s"Failed to load python, executor is null")
+ }
+ executor match {
+ case computationExecutor: ComputationExecutor =>
+ executeFunctionCode(codes, computationExecutor)
+ case _ =>
+ }
+ }
+ logger.info(s"Successful to load python, engineType : ${engineType}")
+ }
+
+}
abstract class PythonModuleLoadEngineConnHook
extends PythonModuleLoad
@@ -40,7 +179,6 @@ abstract class PythonModuleLoadEngineConnHook
val labels = Array[Label[_]](codeLanguageLabel)
loadPythonModules(labels)
}(s"Failed to load Python Modules: ${engineType}")
-
}
override def afterEngineServerStartFailed(
@@ -62,3 +200,60 @@ abstract class PythonModuleLoadEngineConnHook
}
}
+
+// 加载PySpark的Python模块
+class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook {
+ // 设置engineType属性为"spark",表示此挂钩适用于Spark数据处理引擎
+ override val engineType: String = "spark"
+ // 设置runType属性为RunType.PYSPARK,表示此挂钩将执行PySpark类型的代码
+ override protected val runType: RunType = RunType.PYSPARK
+
+ // 重写constructCode方法,用于根据Python模块信息构造加载模块的代码
+ override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO):
String = {
+ // 使用pythonModuleInfo的path属性,构造SparkContext.addPyFile的命令字符串
+ // 这个命令在PySpark环境中将模块文件添加到所有worker上,以便在代码中可以使用
+ val path: String = pythonModuleInfo.getPath
+ val loadCode = s"sc.addPyFile('${path}')"
+ logger.info(s"pythonLoadCode: ${loadCode}")
+ loadCode
+ }
+
+}
+
+// 加载Python的Python模块
+class PythonEngineHook extends PythonModuleLoadEngineConnHook {
+ // 设置engineType属性为"python",表示此挂钩适用于python引擎
+ override val engineType: String = "python"
+ // 设置runType属性为RunType.PYTHON,表示此挂钩将执行python类型的代码
+ override protected val runType: RunType = RunType.PYTHON
+
+ // 重写constructCode方法,用于根据Python模块信息构造加载模块的代码
+ override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO):
String = {
+ // 处理文件
+ val path: String = pythonModuleInfo.getPath
+ val engineCreationContext: EngineCreationContext =
+
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
+ val user: String = engineCreationContext.getUser
+ var loadCode: String = null
+ logger.info(s"gen code in constructCode")
+ Utils.tryAndWarn({
+ // 获取引擎临时目录
+ var tmpDir: String = EngineConnConf.getEngineTmpDir
+ if (!tmpDir.endsWith("/")) {
+ tmpDir += "/"
+ }
+ val fileName: String = new java.io.File(path).getName
+ val destPath: String = tmpDir + fileName
+ val config: Configuration =
HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
+ val fs: FileSystem = HDFSUtils.getHDFSUserFileSystem(user, null, config)
+ fs.copyToLocalFile(new Path(path), new Path("file://" + destPath))
+ if (fileName.endsWith("zip")) {
+ tmpDir += fileName
+ }
+ loadCode = s"import sys; sys.path.append('${tmpDir}')"
+ logger.info(s"5 load local python code: ${loadCode} in path: $destPath")
+ })
+ loadCode
+ }
+
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala
deleted file mode 100644
index 0fe554f93d..0000000000
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineconn.computation.executor.hook
-
-import org.apache.linkis.manager.label.entity.engine.RunType
-import org.apache.linkis.manager.label.entity.engine.RunType.RunType
-import org.apache.linkis.udf.entity.PythonModuleInfoVO
-
-/**
- * 定义一个用于Spark引擎的Python模块加载与执行挂钩的类
- */
-class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook {
-
- // 设置engineType属性为"spark",表示此挂钩适用于Spark数据处理引擎
- override val engineType: String = "spark"
-
- // 设置runType属性为RunType.PYSPARK,表示此挂钩将执行PySpark类型的代码
- override protected val runType: RunType = RunType.PYSPARK
-
- // 重写constructCode方法,用于根据Python模块信息构造加载模块的代码
- override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO):
String = {
- // 使用pythonModuleInfo的path属性,构造SparkContext.addPyFile的命令字符串
- // 这个命令在PySpark环境中将模块文件添加到所有worker上,以便在代码中可以使用
- val path: String = pythonModuleInfo.getPath
- val loadCode = s"sc.addPyFile('${path}')"
- logger.info(s"pythonLoadCode: ${loadCode}")
- loadCode
- }
-
-}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala
index 704204577e..c36d2a3b1d 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala
@@ -35,4 +35,7 @@ object EngineConnPluginConf {
"org.apache.linkis.engineconn.launch.EngineConnServer"
)
+ val PYTHON_VERSION_KEY: String = "python.version"
+ val SPARK_PYTHON_VERSION_KEY: String = "spark.python.version"
+
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
index 5164542445..72b72d8ebf 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
@@ -101,7 +101,7 @@ public class AMConfiguration {
CommonVars.apply("wds.linkis.allow.batch.kill.engine.types",
"spark,hive,python");
public static final CommonVars<String> UNALLOW_BATCH_KILL_ENGINE_TYPES =
- CommonVars.apply("wds.linkis.allow.batch.kill.engine.types",
"trino,appconn,io_file");
+ CommonVars.apply("wds.linkis.allow.batch.kill.engine.types",
"trino,appconn,io_file,nebula");
public static final CommonVars<String> MULTI_USER_ENGINE_USER =
CommonVars.apply("wds.linkis.multi.user.engine.user",
getDefaultMultiEngineUser());
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
index 8b47daf87b..b39eb91ac9 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
@@ -340,8 +340,11 @@ class RMMonitorRest extends Logging {
record.put("engineInstance", node.getServiceInstance.getInstance)
}
+ // return labels
+ val labels: util.List[Label[_]] = node.getLabels
record.put("creator", userCreatorLabel.getCreator)
record.put("engineType", engineTypeLabel.getEngineType)
+ record.put("labels", labels)
if (node.getNodeResource != null) {
if (node.getNodeResource.getLockedResource != null) {
record.put("preUsedResource", node.getNodeResource.getLockedResource)
diff --git
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 788d9fbef8..03036d30b7 100644
---
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -52,6 +52,7 @@ import org.apache.linkis.storage.domain.{Column, DataType}
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
+import org.apache.commons.collections.MapUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hive.common.HiveInterruptUtils
import org.apache.hadoop.hive.conf.HiveConf
@@ -124,6 +125,8 @@ class HiveEngineConnExecutor(
private val splitter = "_"
+ private var readResByObject = false
+
override def init(): Unit = {
LOG.info(s"Ready to change engine state!")
if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
@@ -137,6 +140,17 @@ class HiveEngineConnExecutor(
engineExecutorContext: EngineExecutionContext,
code: String
): ExecuteResponse = {
+ readResByObject = MapUtils.getBooleanValue(
+ engineExecutorContext.getProperties,
+ JobRequestConstants.LINKIS_HIVE_EC_READ_RESULT_BY_OBJECT,
+ false
+ )
+ if (readResByObject) {
+ hiveConf.set(
+ "list.sink.output.formatter",
+ "org.apache.hadoop.hive.serde2.thrift.ThriftFormatter"
+ )
+ }
this.engineExecutorContext = engineExecutorContext
CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
singleSqlProgressMap.clear()
@@ -354,30 +368,36 @@ class HiveEngineConnExecutor(
val resultSetWriter =
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
resultSetWriter.addMetaData(metaData)
val colLength = metaData.columns.length
- val result = new util.ArrayList[String]()
+ val result = new util.ArrayList[Object]()
var rows = 0
while (driver.getResults(result)) {
- val scalaResult: mutable.Buffer[String] = result.asScala
+ val scalaResult: mutable.Buffer[Object] = result.asScala
scalaResult foreach { s =>
- val arr: Array[String] = s.split("\t")
- val arrAny: ArrayBuffer[Any] = new ArrayBuffer[Any]()
- if (arr.length > colLength) {
- logger.error(
- s"""There is a \t tab in the result of hive code query, hive
cannot cut it, please use spark to
execute(查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)"""
- )
- throw new ErrorException(
- 60078,
- """There is a \t tab in the result of your query, hive cannot cut
it, please use spark to execute(您查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)"""
- )
- }
- if (arr.length == colLength) arr foreach arrAny.asJava.add
- else if (arr.length == 0) for (i <- 1 to colLength) arrAny.asJava add
""
- else {
- val i = colLength - arr.length
- arr foreach arrAny.asJava.add
- for (i <- 1 to i) arrAny.asJava add ""
+ if (!readResByObject) {
+ val arr: Array[String] = s.asInstanceOf[String].split("\t")
+ val arrAny: ArrayBuffer[Any] = new ArrayBuffer[Any]()
+ if (arr.length > colLength) {
+ logger.error(
+ s"""There is a \t tab in the result of hive code query, hive
cannot cut it, please use spark to
execute(查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)"""
+ )
+ throw new ErrorException(
+ 60078,
+ """There is a \t tab in the result of your query, hive cannot
cut it, please use spark to execute(您查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)"""
+ )
+ }
+ if (arr.length == colLength) {
+ arrAny.appendAll(arr)
+ } else if (arr.length == 0) for (i <- 1 to colLength) arrAny.asJava
add ""
+ else {
+ val i = colLength - arr.length
+ arr foreach arrAny.asJava.add
+ for (i <- 1 to i) arrAny.asJava add ""
+ }
+ resultSetWriter.addRecord(new TableRecord(arrAny.toArray))
+ } else {
+ resultSetWriter.addRecord(new
TableRecord(s.asInstanceOf[Array[Any]]))
}
- resultSetWriter.addRecord(new TableRecord(arrAny.toArray))
+
}
rows += result.size
result.clear()
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
index dfbb7a8b13..8015216a34 100644
---
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
@@ -42,6 +42,9 @@ public class NebulaConfiguration {
public static final CommonVars<String> NEBULA_PASSWORD =
CommonVars.apply("linkis.nebula.password", "nebula");
+ public static final CommonVars<String> NEBULA_SPACE =
+ CommonVars.apply("linkis.nebula.space", "nebula");
+
public static final CommonVars<Boolean> NEBULA_RECONNECT_ENABLED =
CommonVars.apply(
"linkis.nebula.reconnect.enabled",
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
index 20605a9cb6..a853313ae0 100644
---
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
@@ -282,9 +282,12 @@ public class NebulaEngineConnExecutor extends
ConcurrentComputationExecutor {
String username =
NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap);
String password =
NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap);
Boolean reconnect =
NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap);
-
+ String space = NebulaConfiguration.NEBULA_SPACE.getValue(configMap);
try {
session = nebulaPool.getSession(username, password, reconnect);
+ if (StringUtils.isNotBlank(space)) {
+ session.execute("use " + space);
+ }
} catch (Exception e) {
logger.error("Nebula Session initialization failed.");
throw new NebulaClientException(
diff --git
a/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java
b/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java
index 9158ae6a8a..36ee945e2e 100644
---
a/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java
+++
b/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java
@@ -21,8 +21,7 @@ import org.apache.linkis.common.errorcode.LinkisErrorCode;
public enum LinkisPythonErrorCodeSummary implements LinkisErrorCode {
PYTHON_EXECUTE_ERROR(60002, ""),
- PYSPARK_PROCESSS_STOPPED(
- 60003, "Pyspark process has stopped, query failed!(Pyspark
进程已停止,查询失败!)"),
+ PYSPARK_PROCESSS_STOPPED(60003, "python process has stopped, query
failed!(Python 进程已停止,查询失败!)"),
INVALID_PYTHON_SESSION(400201, "Invalid python session.(无效的 python 会话.)");
/** 错误码 */
private final int errorCode;
diff --git
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
index 3b17fa60a4..57943ca329 100644
---
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.manager.engineplugin.python.executor
+import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.computation.executor.execute.{
ComputationExecutor,
@@ -31,6 +32,7 @@ import org.apache.linkis.manager.common.entity.resource.{
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import
org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.PYTHON_VERSION_KEY
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import
org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration
import org.apache.linkis.manager.label.entity.Label
@@ -61,7 +63,7 @@ class PythonEngineConnExecutor(id: Int, pythonSession:
PythonSession, outputPrin
private def getPyVersion(): String = {
if (null != EngineConnServer.getEngineCreationContext.getOptions) {
EngineConnServer.getEngineCreationContext.getOptions
- .getOrDefault("python.version", "python")
+ .getOrDefault(PYTHON_VERSION_KEY, "python")
} else {
PythonEngineConfiguration.PYTHON_VERSION.getValue
}
@@ -72,13 +74,13 @@ class PythonEngineConnExecutor(id: Int, pythonSession:
PythonSession, outputPrin
code: String
): ExecuteResponse = {
val pythonVersion = engineExecutionContext.getProperties
- .getOrDefault("python.version", pythonDefaultVersion)
+ .getOrDefault(PYTHON_VERSION_KEY, pythonDefaultVersion)
.toString
.toLowerCase()
logger.info(s" EngineExecutionContext user python.version = >
${pythonVersion}")
- System.getProperties.put("python.version", pythonVersion)
+ System.getProperties.put(PYTHON_VERSION_KEY, pythonVersion)
logger.info(
- s" System getProperties python.version = >
${System.getProperties.getProperty("python.version")}"
+ s" System getProperties python.version = >
${System.getProperties.getProperty(PYTHON_VERSION_KEY)}"
)
// System.getProperties.put("python.application.pyFiles",
engineExecutionContext.getProperties.getOrDefault("python.application.pyFiles",
"file:///mnt/bdap/test/test/test.zip").toString)
pythonSession.lazyInitGateway()
@@ -89,6 +91,11 @@ class PythonEngineConnExecutor(id: Int, pythonSession:
PythonSession, outputPrin
logger.info("Python executor reset new engineExecutorContext!")
}
engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")
+ if (this.engineExecutionContext.getCurrentParagraph == 1) {
+ engineExecutionContext.appendStdout(
+ LogUtils.generateInfo(s"Your Python Version is:\n$pythonVersion")
+ )
+ }
pythonSession.execute(code)
// lineOutputStream.flush()
SuccessExecuteResponse()
diff --git
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
index 9ef07b3eae..7479e94f15 100644
---
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
+++
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
@@ -22,6 +22,7 @@ import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecution
import org.apache.linkis.engineconn.computation.executor.rs.RsOutputStream
import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.governance.common.utils.GovernanceUtils
+import
org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.PYTHON_VERSION_KEY
import
org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration
import
org.apache.linkis.manager.engineplugin.python.errorcode.LinkisPythonErrorCodeSummary._
import org.apache.linkis.manager.engineplugin.python.exception.{
@@ -69,7 +70,7 @@ class PythonSession extends Logging {
private def getPyVersion(): String = {
if (null != EngineConnServer.getEngineCreationContext.getOptions) {
EngineConnServer.getEngineCreationContext.getOptions
- .getOrDefault("python.version", "python")
+ .getOrDefault(PYTHON_VERSION_KEY, "python")
} else {
PythonEngineConfiguration.PYTHON_VERSION.getValue
}
diff --git
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala
index deec7ebcaa..e9f3e2f0ef 100644
---
a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala
+++
b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala
@@ -21,6 +21,7 @@ import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
+import
org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.PYTHON_VERSION_KEY
import
org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration
import org.apache.linkis.manager.engineplugin.python.executor.PythonSession
@@ -36,7 +37,7 @@ class PythonVersionEngineHook extends EngineConnHook with
Logging {
val params =
if (engineCreationContext.getOptions == null) new util.HashMap[String,
String]()
else engineCreationContext.getOptions
- _pythonVersion = params.getOrDefault("python.version", "python3")
+ _pythonVersion = params.getOrDefault(PYTHON_VERSION_KEY, "python3")
_pythonExtraPackage = params
.getOrDefault("python.application.pyFiles",
"file:///mnt/bdap/test/test/test.zip")
.toString
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 7fed0f436d..388cc4f27e 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -179,7 +179,9 @@ abstract class SparkEngineConnExecutor(val sc:
SparkContext, id: Long)
// with unit if set configuration with unit
// if not set sc get will get the value of
spark.yarn.executor.memoryOverhead such as 512(without unit)
val memoryOverhead = sc.getConf.get("spark.executor.memoryOverhead",
"1G")
-
+ val pythonVersion = SparkConfiguration.SPARK_PYTHON_VERSION.getValue(
+ EngineConnObject.getEngineCreationContext.getOptions
+ )
val sb = new StringBuilder
sb.append(s"spark.executor.instances=$executorNum\n")
sb.append(s"spark.executor.memory=${executorMem}G\n")
@@ -188,6 +190,7 @@ abstract class SparkEngineConnExecutor(val sc:
SparkContext, id: Long)
sb.append(s"spark.driver.cores=$sparkDriverCores\n")
sb.append(s"spark.yarn.queue=$queue\n")
sb.append(s"spark.executor.memoryOverhead=${memoryOverhead}\n")
+ sb.append(s"spark.python.version=$pythonVersion\n")
sb.append("\n")
engineExecutionContext.appendStdout(
LogUtils.generateInfo(s" Your spark job exec with
configs:\n${sb.toString()}")
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index f947db9338..9ebda85ef6 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -33,6 +33,7 @@ import
org.apache.linkis.engineplugin.spark.imexport.CsvRelation
import org.apache.linkis.engineplugin.spark.utils.EngineUtils
import org.apache.linkis.governance.common.paser.PythonCodeParser
import org.apache.linkis.governance.common.utils.GovernanceUtils
+import
org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.SPARK_PYTHON_VERSION_KEY
import org.apache.linkis.scheduler.executer.{ExecuteResponse,
SuccessExecuteResponse}
import org.apache.linkis.storage.resultset.ResultSetWriter
@@ -154,10 +155,10 @@ class SparkPythonExecutor(val sparkEngineSession:
SparkEngineSession, val id: In
private def initGateway = {
// If the python version set by the user is obtained from the front end
as python3, the environment variable of python3 is taken; otherwise, the
default is python2
logger.info(
- s"spark.python.version =>
${engineCreationContext.getOptions.get("spark.python.version")}"
+ s"spark.python.version =>
${engineCreationContext.getOptions.get(SPARK_PYTHON_VERSION_KEY)}"
)
val userDefinePythonVersion = engineCreationContext.getOptions
- .getOrDefault("spark.python.version", "python")
+ .getOrDefault(SPARK_PYTHON_VERSION_KEY, "python")
.toLowerCase()
val sparkPythonVersion =
if (
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
index cb7998e5c4..a93c25d508 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
@@ -44,6 +44,8 @@ class ComputationEngineConnExecutor(engineNode: EngineNode)
extends AbstractEngi
override def getServiceInstance: ServiceInstance =
engineNode.getServiceInstance
+ def getEngineNode: EngineNode = engineNode
+
private def getEngineConnSender: Sender =
Sender.getSender(getServiceInstance)
override def getTicketId: String = engineNode.getTicketId
diff --git
a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala
b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala
index 1743edac30..a0b163756e 100644
---
a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala
+++
b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.configuration.constant
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
object Constants {
@@ -28,7 +29,7 @@ object Constants {
CommonVars[String]("linkis.configuration.linkisclient.auth.token.key",
"Validation-Code")
val AUTH_TOKEN_VALUE: CommonVars[String] =
- CommonVars[String]("linkis.configuration.linkisclient.auth.token.value",
"LINKIS-AUTH")
+ CommonVars[String]("linkis.configuration.linkisclient.auth.token.value",
LINKIS_TOKEN.getValue)
val CONNECTION_MAX_SIZE: CommonVars[Int] =
CommonVars[Int]("linkis.configuration.linkisclient.connection.max.size",
10)
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
index 8a09a0e726..4fa9f49175 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
@@ -18,8 +18,8 @@
package org.apache.linkis.metadata.service.impl;
import org.apache.linkis.common.utils.ByteTimeUtils;
-import org.apache.linkis.hadoop.common.conf.HadoopConf;
import org.apache.linkis.hadoop.common.utils.HDFSUtils;
+import org.apache.linkis.hadoop.common.utils.KerberosUtils;
import org.apache.linkis.metadata.dao.MdqDao;
import org.apache.linkis.metadata.domain.mdq.DomainCoversionUtils;
import org.apache.linkis.metadata.domain.mdq.Tunple;
@@ -110,6 +110,11 @@ public class MdqServiceImpl implements MdqService {
mdqFieldList.remove(collect.get(1));
}
}
+ if (!table.getPartitionTable()) {
+ mdqFieldList.stream()
+ .filter(MdqField::getPartitionField)
+ .forEach(mdqField -> mdqField.setPartitionField(false));
+ }
mdqDao.insertFields(mdqFieldList);
if (mdqTableBO.getImportInfo() != null) {
MdqTableImportInfoBO importInfo = mdqTableBO.getImportInfo();
@@ -151,7 +156,7 @@ public class MdqServiceImpl implements MdqService {
if (isImport
&& (importType == MdqImportType.Csv.ordinal()
|| importType == MdqImportType.Excel.ordinal())) {
- String destination =
mdqTableBO.getImportInfo().getArgs().get("destination");
+ String destination = mdqTableBO.getImportInfo().getDestination();
HashMap hashMap = new Gson().fromJson(destination, HashMap.class);
if (Boolean.valueOf(hashMap.get("importData").toString())) {
logger.info(
@@ -210,10 +215,10 @@ public class MdqServiceImpl implements MdqService {
.parallelStream()
.filter(f -> queryParam.getTableName().equals(f.get("NAME")))
.findFirst();
- Map<String, Object> talbe =
+ Map<String, Object> table =
tableOptional.orElseThrow(() -> new
IllegalArgumentException("table不存在"));
MdqTableBaseInfoVO mdqTableBaseInfoVO =
- DomainCoversionUtils.mapToMdqTableBaseInfoVO(talbe,
queryParam.getDbName());
+ DomainCoversionUtils.mapToMdqTableBaseInfoVO(table,
queryParam.getDbName());
String tableComment =
hiveMetaDao.getTableComment(queryParam.getDbName(),
queryParam.getTableName());
mdqTableBaseInfoVO.getBase().setComment(tableComment);
@@ -379,14 +384,27 @@ public class MdqServiceImpl implements MdqService {
}
private String getTableSize(String tableLocation) throws IOException {
- String tableSize = "0B";
- if (StringUtils.isNotBlank(tableLocation)) {
- FileStatus tableFile = getFileStatus(tableLocation);
- tableSize =
- ByteTimeUtils.bytesToString(
-
getRootHdfs().getContentSummary(tableFile.getPath()).getLength());
+ try {
+ String tableSize = "0B";
+ if (StringUtils.isNotBlank(tableLocation) && getRootHdfs().exists(new
Path(tableLocation))) {
+ FileStatus tableFile = getFileStatus(tableLocation);
+ tableSize =
+ ByteTimeUtils.bytesToString(
+
getRootHdfs().getContentSummary(tableFile.getPath()).getLength());
+ }
+ return tableSize;
+ } catch (IOException e) {
+ String message = e.getMessage();
+ String rootCauseMessage = ExceptionUtils.getRootCauseMessage(e);
+ if (message != null &&
message.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)
+ || rootCauseMessage.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)) {
+ logger.info("Failed to get tableSize, retry", e);
+ resetRootHdfs();
+ return getTableSize(tableLocation);
+ } else {
+ throw e;
+ }
}
- return tableSize;
}
private static volatile FileSystem rootHdfs = null;
@@ -397,9 +415,8 @@ public class MdqServiceImpl implements MdqService {
} catch (IOException e) {
String message = e.getMessage();
String rootCauseMessage = ExceptionUtils.getRootCauseMessage(e);
- if ((message != null &&
message.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS))
- || (rootCauseMessage != null
- &&
rootCauseMessage.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS))) {
+ if (message != null &&
message.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)
+ || rootCauseMessage.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)) {
logger.info("Failed to getFileStatus, retry", e);
resetRootHdfs();
return getFileStatus(location);
@@ -410,11 +427,6 @@ public class MdqServiceImpl implements MdqService {
}
private void resetRootHdfs() {
- if (HadoopConf.HDFS_ENABLE_CACHE()) {
- HDFSUtils.closeHDFSFIleSystem(
- HDFSUtils.getHDFSRootUserFileSystem(),
HadoopConf.HADOOP_ROOT_USER().getValue(), true);
- return;
- }
if (rootHdfs != null) {
synchronized (this) {
if (rootHdfs != null) {
@@ -427,15 +439,11 @@ public class MdqServiceImpl implements MdqService {
}
private FileSystem getRootHdfs() {
-
- if (HadoopConf.HDFS_ENABLE_CACHE()) {
- return HDFSUtils.getHDFSRootUserFileSystem();
- }
-
if (rootHdfs == null) {
synchronized (this) {
if (rootHdfs == null) {
rootHdfs = HDFSUtils.getHDFSRootUserFileSystem();
+ KerberosUtils.startKerberosRefreshThread();
}
}
}
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java
b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java
index 7ff5aeb32d..695c38b703 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java
+++
b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java
@@ -20,7 +20,6 @@ package org.apache.linkis.jobhistory.cache.impl;
import org.apache.linkis.jobhistory.cache.QueryCacheManager;
import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration;
import org.apache.linkis.jobhistory.dao.JobHistoryMapper;
-import org.apache.linkis.jobhistory.entity.JobHistory;
import org.apache.linkis.jobhistory.util.QueryConfig;
import org.apache.commons.lang3.time.DateUtils;
@@ -32,14 +31,9 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.function.Consumer;
-import com.github.pagehelper.PageHelper;
-import com.github.pagehelper.PageInfo;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
@@ -187,24 +181,24 @@ public class DefaultQueryCacheManager implements
QueryCacheManager, Initializing
@Override
public void refreshUndoneTask() {
- PageHelper.startPage(1, 10);
- List<JobHistory> queryTasks = null;
+ List<Integer> queryTasks = null;
+ Date eDate = new Date(System.currentTimeMillis());
+ Date sDate = DateUtils.addDays(eDate, -1);
try {
-
- Date eDate = new Date(System.currentTimeMillis());
- Date sDate = DateUtils.addDays(eDate, -1);
queryTasks =
jobHistoryMapper.searchWithIdOrderAsc(
sDate, eDate, undoneTaskMinId, Arrays.asList("Running",
"Inited", "Scheduled"));
- } finally {
- PageHelper.clearPage();
+ } catch (Exception e) {
+ logger.warn("Failed to refresh undone tasks", e);
}
-
- PageInfo<JobHistory> pageInfo = new PageInfo<>(queryTasks);
- List<JobHistory> list = pageInfo.getList();
- if (!list.isEmpty()) {
- undoneTaskMinId = list.get(0).getId();
+ if (null != queryTasks && !queryTasks.isEmpty()) {
+ undoneTaskMinId = queryTasks.get(0).longValue();
logger.info("Refreshing undone tasks, minimum id: {}", undoneTaskMinId);
+ } else {
+ Integer maxID = jobHistoryMapper.maxID(sDate, eDate, undoneTaskMinId);
+ if (null != maxID && maxID > undoneTaskMinId) {
+ undoneTaskMinId = maxID.longValue();
+ }
}
}
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
index 34f8933176..5783e86a91 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
+++
b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
@@ -45,12 +45,17 @@ public interface JobHistoryMapper {
backoff = @Backoff(delay = 10000))
void updateJobHistory(JobHistory jobReq);
- List<JobHistory> searchWithIdOrderAsc(
+ List<Integer> searchWithIdOrderAsc(
@Param("startDate") Date startDate,
@Param("endDate") Date endDate,
@Param("startId") Long startId,
@Param("status") List<String> status);
+ Integer maxID(
+ @Param("startDate") Date startDate,
+ @Param("endDate") Date endDate,
+ @Param("startId") Long startId);
+
List<JobHistory> search(
@Param("id") Long id,
@Param("umUser") String username,
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml
b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml
index 74cf9057f5..4122fdcf2f 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml
+++
b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml
@@ -118,13 +118,22 @@
ORDER BY job.created_time DESC
</select>
- <select id="searchWithIdOrderAsc" useCache="true"
resultMap="jobHistoryMap" >
- /*slave*/ SELECT * FROM linkis_ps_job_history_group_history
+ <select id="searchWithIdOrderAsc" useCache="true"
resultType="java.lang.Integer" >
+ /*slave*/ SELECT id FROM linkis_ps_job_history_group_history
<where>
<if test="startDate != null">and created_time >= #{startDate} AND
created_time <![CDATA[<=]]> #{endDate}</if>
<if test="startId != null">and id >= #{startId}</if>
<if test="status != null">and <foreach collection="status"
item="element" close=")" separator="," open="status in
(">#{element}</foreach></if>
</where>
+ limit 2
+ </select>
+
+ <select id="maxID" useCache="true" resultType="java.lang.Integer" >
+ /*slave*/ SELECT id FROM linkis_ps_job_history_group_history
+ <where>
+ <if test="startDate != null">and created_time >= #{startDate} AND
created_time <![CDATA[<=]]> #{endDate}</if>
+ <if test="startId != null">and id >= #{startId}</if>
+ </where>
ORDER BY linkis_ps_job_history_group_history.created_time
</select>
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index 76fe090ce6..68441b9e85 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -384,7 +384,13 @@ class JobHistoryQueryServiceImpl extends
JobHistoryQueryService with Logging {
cacheKey,
new Callable[Integer] {
override def call(): Integer = {
- getCountUndoneTasks(username, creator, sDate, eDate, engineType,
startJobId)
+ try {
+ getCountUndoneTasks(username, creator, sDate, eDate, engineType,
startJobId)
+ } catch {
+ case e: Exception =>
+ logger.error("Failed to get count undone tasks", e)
+ 0
+ }
}
}
)
@@ -399,7 +405,7 @@ class JobHistoryQueryServiceImpl extends
JobHistoryQueryService with Logging {
engineType: String,
startJobId: lang.Long
): Integer = {
- logger.info("Get count undone Tasks {}, {}, {}", username, creator,
engineType)
+ logger.info("Get count undone Tasks {}, {}, {}, {}", username, creator,
engineType, startJobId)
val statusList: util.List[String] = new util.ArrayList[String]()
statusList.add(TaskStatus.Running.toString)
statusList.add(TaskStatus.Inited.toString)
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java
b/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java
index b27d7e7d15..d987144c6c 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java
+++
b/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java
@@ -102,7 +102,7 @@ public class JobHistoryMapperTest extends BaseDaoTest {
status.add("Succeed");
Date eDate = new Date(System.currentTimeMillis());
Date sDate = DateUtils.addDays(eDate, -1);
- List<JobHistory> histories = jobHistoryMapper.searchWithIdOrderAsc(sDate,
eDate, 1L, status);
+ List<Integer> histories = jobHistoryMapper.searchWithIdOrderAsc(sDate,
eDate, 1L, status);
Assertions.assertTrue(histories.isEmpty());
}
diff --git
a/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java
b/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java
index e3b3cebba7..27062ad597 100644
---
a/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java
+++
b/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.linkis.errorcode.client;
import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.conf.Configuration;
public class ClientConfiguration {
@@ -37,7 +38,7 @@ public class ClientConfiguration {
CommonVars.apply("wds.linkis.errorcode.read.timeout", 10 * 60 * 1000L);
public static final CommonVars<String> AUTH_TOKEN_VALUE =
- CommonVars.apply("wds.linkis.errorcode.auth.token", "LINKIS-AUTH");
+ CommonVars.apply("wds.linkis.errorcode.auth.token",
Configuration.LINKIS_TOKEN().getValue());
public static final CommonVars<Long> FUTURE_TIME_OUT =
CommonVars.apply("wds.linkis.errorcode.future.timeout", 2000L);
diff --git
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala
index 00f1d95559..cff372440a 100644
---
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala
+++
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.bml.conf
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
object BmlConfiguration {
@@ -35,7 +36,7 @@ object BmlConfiguration {
CommonVars[String]("wds.linkis.bml.auth.token.key", "Validation-Code")
val AUTH_TOKEN_VALUE: CommonVars[String] =
- CommonVars[String]("wds.linkis.bml.auth.token.value", "LINKIS-AUTH")
+ CommonVars[String]("wds.linkis.bml.auth.token.value",
LINKIS_TOKEN.getValue)
val CONNECTION_MAX_SIZE: CommonVars[Int] =
CommonVars[Int]("wds.linkis.bml.connection.max.size", 10)
diff --git
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala
index ee1b6e02ad..120271d38a 100644
---
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala
+++
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.cs.client.utils
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
object ContextClientConf {
@@ -25,7 +26,7 @@ object ContextClientConf {
CommonVars[String]("wds.linkis.context.client.auth.key", "Token-Code")
val CONTEXT_CLIENT_AUTH_VALUE: CommonVars[String] =
- CommonVars[String]("wds.linkis.context.client.auth.value", "LINKIS-AUTH")
+ CommonVars[String]("wds.linkis.context.client.auth.value",
LINKIS_TOKEN.getValue)
val URL_PREFIX: CommonVars[String] =
CommonVars[String](
diff --git
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala
index eff380f60b..426c5aee1d 100644
---
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala
+++
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.datasource.client.config
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
object DatasourceClientConfig {
@@ -34,7 +35,7 @@ object DatasourceClientConfig {
CommonVars[String]("wds.linkis.server.dsm.auth.token.key", "Token-Code")
val AUTH_TOKEN_VALUE: CommonVars[String] =
- CommonVars[String]("wds.linkis.server.dsm.auth.token.value", "LINKIS-AUTH")
+ CommonVars[String]("wds.linkis.server.dsm.auth.token.value",
LINKIS_TOKEN.getValue)
val DATA_SOURCE_SERVICE_CLIENT_NAME: CommonVars[String] =
CommonVars[String]("wds.linkis.server.dsm.client.name",
"DataSource-Client")
diff --git
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala
index e5e1c963e0..b37dd785b3 100644
---
a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala
+++
b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.filesystem.conf
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
object WorkspaceClientConf {
@@ -37,7 +38,7 @@ object WorkspaceClientConf {
CommonVars[String]("wds.linkis.filesystem.token.key",
"Validation-Code").getValue
val tokenValue: String =
- CommonVars[String]("wds.linkis.filesystem.token.value",
"LINKIS-AUTH").getValue
+ CommonVars[String]("wds.linkis.filesystem.token.value",
LINKIS_TOKEN.getValue).getValue
val scriptFromBMLUrl: String = prefix + scriptFromBML
}
diff --git
a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
index 61a9750ce5..5fc80d7afc 100644
---
a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
+++
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.gateway.config
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN
object GatewayConfiguration {
@@ -69,7 +70,9 @@ object GatewayConfiguration {
val ENABLE_GATEWAY_AUTH = CommonVars("wds.linkis.enable.gateway.auth", false)
val AUTH_IP_FILE = CommonVars("wds.linkis.gateway.auth.file", "auth.txt")
- val DEFAULT_GATEWAY_ACCESS_TOKEN =
CommonVars("wds.linkis.gateway.access.token", "LINKIS-AUTH")
+
+ val DEFAULT_GATEWAY_ACCESS_TOKEN =
+ CommonVars("wds.linkis.gateway.access.token", LINKIS_TOKEN.getValue)
val CONTROL_WORKSPACE_ID_LIST =
CommonVars("wds.linkis.gateway.control.workspace.ids", "224")
diff --git a/pom.xml b/pom.xml
index 88cf0be8b3..28aba1ef3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,7 +154,7 @@
<jettison.version>1.5.4</jettison.version>
<xstream.core.version>1.4.21</xstream.core.version>
<woodstox.version>6.4.0</woodstox.version>
- <snakeyaml.version>1.33</snakeyaml.version>
+ <snakeyaml.version>2.0</snakeyaml.version>
<protobuf.version>3.25.5</protobuf.version>
diff --git a/tool/dependencies/known-dependencies.txt
b/tool/dependencies/known-dependencies.txt
index 6654575af3..c1f7b7ecfb 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -698,7 +698,7 @@ sketches-core-0.9.0.jar
slf4j-api-1.7.30.jar
slf4j-reload4j-1.7.36.jar
slice-0.38.jar
-snakeyaml-1.33.jar
+snakeyaml-2.0.jar
snappy-java-1.1.4.jar
snappy-java-1.1.7.7.jar
snappy-java-1.1.8.2.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]