This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 913a437ce Configure Hot Loading (#3841)
913a437ce is described below
commit 913a437ce6402dc864c8c03b80d9921879994e14
Author: 成彬彬 <[email protected]>
AuthorDate: Thu Nov 17 21:52:46 2022 +0800
Configure Hot Loading (#3841)
* linkis-common - add hot-load support for CommonVars
---
.../linkis/common/conf/BDPConfiguration.scala | 117 ++++++++++++++++++---
.../org/apache/linkis/common/conf/CommonVars.scala | 23 +++-
.../linkis/common/commonvars/CommonVarsTest.java | 93 ++++++++++++++++
.../persistence/QueryPersistenceEngine.java | 4 +-
.../entrance/restful/EntranceRestfulApi.java | 4 +-
.../linkis/entrance/EntranceWebSocketService.scala | 6 +-
.../cli/heartbeat/CliHeartbeatMonitor.scala | 3 +-
.../linkis/entrance/cs/CSEntranceHelper.scala | 10 +-
.../linkis/entrance/interceptor/impl/Explain.scala | 6 +-
.../linkis/entrance/log/CacheLogManager.scala | 2 +-
.../EntranceUserParallelOrchestratorPlugin.scala | 6 +-
.../entrance/parser/CommonEntranceParser.scala | 13 +--
.../engineplugin/io/service/FsProxyService.scala | 13 ---
13 files changed, 245 insertions(+), 55 deletions(-)
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
index 928b7fb64..c74e8393b 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
@@ -24,8 +24,11 @@ import org.apache.commons.lang3.StringUtils
import java.io.{File, FileInputStream, InputStream, IOException}
import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
private[conf] object BDPConfiguration extends Logging {
@@ -33,11 +36,17 @@ private[conf] object BDPConfiguration extends Logging {
val DEFAULT_SERVER_CONF_FILE_NAME = "linkis-server.properties"
+ val DEFAULT_CONFIG_HOT_LOAD_DELAY_MILLS = 3 * 60 * 1000L
+
private val extractConfig = new Properties
private val config = new Properties
private val sysProps = sys.props
private val env = sys.env
+ private val configList = new ArrayBuffer[String]
+ private val configReload = new Properties
+ private val lock = new ReentrantReadWriteLock()
+
private def init: Unit = {
// load pub linkis conf
@@ -48,6 +57,7 @@ private[conf] object BDPConfiguration extends Logging {
s"******************* Notice: The Linkis configuration file is
$propertyFile ! *******************"
)
initConfig(config, configFileURL.getPath)
+ configList.append(configFileURL.getPath)
} else {
logger.warn(
s"************ Notice: The Linkis configuration file $propertyFile is
not exists! *******************"
@@ -62,6 +72,7 @@ private[conf] object BDPConfiguration extends Logging {
s"*********************** Notice: The Linkis serverConf file is
$serverConf ! ******************"
)
initConfig(config, serverConfFileURL.getPath)
+ configList.append(serverConfFileURL.getPath)
} else {
logger.warn(
s"**************** Notice: The Linkis serverConf file $serverConf is
not exists! *******************"
@@ -79,6 +90,7 @@ private[conf] object BDPConfiguration extends Logging {
s"************** Notice: The Linkis server.confs is file
$propertyF ****************"
)
initConfig(config, configFileURL.getPath)
+ configList.append(configFileURL.getPath)
} else {
logger.warn(
s"********** Notice: The Linkis server.confs file $propertyF is
not exists! **************"
@@ -87,6 +99,38 @@ private[conf] object BDPConfiguration extends Logging {
}
}
+ // init hot-load config task
+ val hotLoadTask = new Runnable {
+ override def run(): Unit = {
+ var tmpConfigPath = ""
+ var tmpConfig = new Properties()
+ Utils.tryCatch {
+ // refresh configuration
+ configList.foreach(configPath => {
+ if (logger.isDebugEnabled()) {
+ logger.debug(s"reload config file : ${configPath}")
+ }
+ tmpConfigPath = configPath
+ initConfig(tmpConfig, configPath)
+ })
+ } { case e: Exception =>
+ logger.error(s"reload config file : ${tmpConfigPath} failed, because
: ${e.getMessage}")
+ logger.warn("Will reset config to origin config.")
+ tmpConfig = config
+ }
+ lock.writeLock().lock()
+ configReload.clear()
+ configReload.putAll(tmpConfig)
+ lock.writeLock().unlock()
+ }
+ }
+ Utils.defaultScheduler.scheduleWithFixedDelay(
+ hotLoadTask,
+ 3000L,
+ DEFAULT_CONFIG_HOT_LOAD_DELAY_MILLS,
+ TimeUnit.MILLISECONDS
+ )
+ logger.info("hotload config task inited.")
}
Utils.tryCatch {
@@ -110,11 +154,18 @@ private[conf] object BDPConfiguration extends Logging {
}
}
- def getOption(key: String): Option[String] = {
+ def getOption(key: String, hotload: Boolean = false): Option[String] = {
if (extractConfig.containsKey(key)) {
return Some(extractConfig.getProperty(key))
}
- val value = config.getProperty(key)
+ var value = ""
+ if (hotload) {
+ lock.readLock().lock()
+ value = configReload.getProperty(key)
+ lock.readLock().unlock()
+ } else {
+ value = config.getProperty(key)
+ }
if (StringUtils.isNotEmpty(value)) {
return Some(value)
}
@@ -134,16 +185,45 @@ private[conf] object BDPConfiguration extends Logging {
props
}
+ def hotProperties(): Properties = {
+ val props = new Properties
+ mergePropertiesFromMap(props, env)
+ mergePropertiesFromMap(props, sysProps.toMap)
+ lock.readLock().lock()
+ mergePropertiesFromMap(props, configReload.asScala.toMap)
+ lock.readLock().unlock()
+ mergePropertiesFromMap(props, extractConfig.asScala.toMap)
+ props
+ }
+
def mergePropertiesFromMap(props: Properties, mapProps: Map[String,
String]): Unit = {
mapProps.foreach { case (k, v) => props.put(k, v) }
}
- def getOption[T](commonVars: CommonVars[T]): Option[T] = if
(commonVars.value != null) {
- Option(commonVars.value)
- } else {
- val value = BDPConfiguration.getOption(commonVars.key)
- if (value.isEmpty) Option(commonVars.defaultValue)
- else formatValue(commonVars.defaultValue, value)
+ def getOption[T](commonVars: CommonVars[T], hotload: Boolean): Option[T] = {
+ if (hotload) {
+ val value = BDPConfiguration.getOption(commonVars.key, hotload = true)
+ if (value.isEmpty) Option(commonVars.defaultValue)
+ else formatValue(commonVars.defaultValue, value)
+ } else {
+ if (commonVars.value != null) {
+ Option(commonVars.value)
+ } else {
+ val value = BDPConfiguration.getOption(commonVars.key)
+ if (value.isEmpty) Option(commonVars.defaultValue)
+ else formatValue(commonVars.defaultValue, value)
+ }
+ }
+ }
+
+ def getOption[T](commonVars: CommonVars[T]): Option[T] = {
+ if (commonVars.value != null) {
+ Option(commonVars.value)
+ } else {
+ val value = BDPConfiguration.getOption(commonVars.key)
+ if (value.isEmpty) Option(commonVars.defaultValue)
+ else formatValue(commonVars.defaultValue, value)
+ }
}
private[common] def formatValue[T](defaultValue: T, value: Option[String]):
Option[T] = {
@@ -170,19 +250,28 @@ private[conf] object BDPConfiguration extends Logging {
def setIfNotExists(key: String, value: String): Any =
if (!config.containsKey(key)) set(key, value)
- def getBoolean(key: String, default: Boolean): Boolean =
- getOption(key).map(_.toBoolean).getOrElse(default)
+ def getBoolean(key: String, default: Boolean, hotload: Boolean = false):
Boolean =
+ getOption(key, hotload).map(_.toBoolean).getOrElse(default)
def getBoolean(commonVars: CommonVars[Boolean]): Option[Boolean] =
getOption(commonVars)
- def get(key: String, default: String): String =
getOption(key).getOrElse(default)
+ def get(key: String, default: String): String =
+ getOption(key, false).getOrElse(default)
+
+ def get(key: String, default: String, hotload: Boolean): String = {
+ getOption(key, hotload).getOrElse(default)
+ }
+
def get(commonVars: CommonVars[String]): Option[String] =
getOption(commonVars)
- def get(key: String): String = getOption(key).getOrElse(throw new
NoSuchElementException(key))
+ def get(key: String, hotload: Boolean = false): String =
+ getOption(key, hotload).getOrElse(throw new NoSuchElementException(key))
+
+ def getInt(key: String, default: Int, hotload: Boolean = false): Int =
+ getOption(key, hotload).map(_.toInt).getOrElse(default)
- def getInt(key: String, default: Int): Int =
getOption(key).map(_.toInt).getOrElse(default)
def getInt(commonVars: CommonVars[Int]): Option[Int] = getOption(commonVars)
- def contains(key: String): Boolean = getOption(key).isDefined
+ def contains(key: String, hotload: Boolean = false): Boolean =
getOption(key, hotload).isDefined
}
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/CommonVars.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/CommonVars.scala
index 3fc4c49c6..5475448c6 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/CommonVars.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/CommonVars.scala
@@ -24,14 +24,29 @@ import scala.collection.JavaConverters._
case class CommonVars[T](key: String, defaultValue: T, value: T, description:
String = null) {
val getValue: T = BDPConfiguration.getOption(this).getOrElse(defaultValue)
+ def getHotValue(): T = BDPConfiguration.getOption(this,
true).getOrElse(defaultValue)
+
def getValue(properties: java.util.Map[String, String]): T = {
if (properties == null || !properties.containsKey(key) ||
properties.get(key) == null) {
getValue
} else BDPConfiguration.formatValue(defaultValue,
Option(properties.get(key))).get
}
- def getValue(properties: Map[String, String]): T =
getValue(properties.asJava)
- def acquireNew: T = BDPConfiguration.getOption(this).getOrElse(defaultValue)
+ def getValue(properties: Map[String, String], hotload: Boolean = false): T =
getValue(
+ properties.asJava
+ )
+
+ def getValue(properties: java.util.Map[String, String], hotload: Boolean): T
= {
+ if (properties == null || !properties.containsKey(key) ||
properties.get(key) == null) {
+ if (hotload) {
+ getHotValue()
+ } else {
+ getValue
+ }
+ } else BDPConfiguration.formatValue(defaultValue,
Option(properties.get(key))).get
+ }
+
+ def acquireNew: T = getHotValue()
}
object CommonVars {
@@ -42,8 +57,10 @@ object CommonVars {
implicit def apply[T](key: String, defaultValue: T): CommonVars[T] =
new CommonVars(key, defaultValue, null.asInstanceOf[T], null)
- implicit def apply[T](key: String): CommonVars[T] = apply(key,
null.asInstanceOf[T])
+ implicit def apply[T](key: String): CommonVars[T] =
+ apply(key, null.asInstanceOf[T], null.asInstanceOf[T], null)
def properties: Properties = BDPConfiguration.properties
+ def hotProperties: Properties = BDPConfiguration.hotProperties
}
diff --git
a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/commonvars/CommonVarsTest.java
b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/commonvars/CommonVarsTest.java
new file mode 100644
index 000000000..5b502abea
--- /dev/null
+++
b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/commonvars/CommonVarsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.commonvars;
+
+import org.apache.linkis.common.conf.BDPConfiguration;
+import org.apache.linkis.common.conf.CommonVars;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CommonVarsTest {
+
+ @Test
+ public void testGetVars() {
+ {
+ String testKeyNotHotload = "wds.linkis.test___test___test.key1";
+ String defaultValueNotHotload = "defaultValueNotHotload";
+ CommonVars<String> strVar1 = CommonVars.apply(testKeyNotHotload,
defaultValueNotHotload);
+ assertEquals(defaultValueNotHotload, strVar1.defaultValue());
+ assertEquals(defaultValueNotHotload, strVar1.getValue());
+ }
+
+ {
+ String testKeyNotHotloadSet = "wds.linkis.test___test___test.key2";
+ String defaultValueNotHotloadSet1 = "defaultValueNotHotloadSet1";
+ String defaultValueNotHotloadSet2 = "defaultValueNotHotloadSet2";
+ String valueNotHotloadSet1 = "valueNotHotloadSet1";
+ String valueNotHotloadSet2 = "valueNotHotloadSet2";
+ CommonVars<String> strVar2 =
+ CommonVars.apply(testKeyNotHotloadSet, defaultValueNotHotloadSet1);
+ assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
+ assertEquals(defaultValueNotHotloadSet1, strVar2.getValue());
+
+ BDPConfiguration.setIfNotExists(testKeyNotHotloadSet,
valueNotHotloadSet1);
+ assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
+ // assertEquals(valueNotHotloadSet1, strVar2.getValue());
+ BDPConfiguration.setIfNotExists(testKeyNotHotloadSet,
valueNotHotloadSet2);
+ // assertEquals(valueNotHotloadSet1, strVar2.getValue());
+
+ BDPConfiguration.set(testKeyNotHotloadSet, valueNotHotloadSet2);
+ assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
+ // assertEquals(valueNotHotloadSet2, strVar2.getValue());
+ }
+ }
+
+ @Test
+ public void testGetHotloadVars() {
+ {
+ String testKeyHotload = "wds.linkis.test___test___test.key1";
+ String defaultValueHotload = "defaultValueHotload";
+ CommonVars<String> strVar1 = CommonVars.apply(testKeyHotload,
defaultValueHotload);
+ assertEquals(defaultValueHotload, strVar1.defaultValue());
+ assertEquals(defaultValueHotload, strVar1.getValue());
+ }
+
+ {
+ String testKeyHotloadSet = "wds.linkis.test___test___test.hotload.key2";
+ String defaultValueNotHotloadSet1 = "defaultValueNotHotloadSet1";
+ String defaultValueNotHotloadSet2 = "defaultValueNotHotloadSet2";
+ String valueNotHotloadSet1 = "valueNotHotloadSet1";
+ String valueNotHotloadSet2 = "valueNotHotloadSet2";
+ CommonVars<String> strVar2 = CommonVars.apply(testKeyHotloadSet,
defaultValueNotHotloadSet1);
+ assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
+ assertEquals(defaultValueNotHotloadSet1, strVar2.getValue());
+
+ BDPConfiguration.setIfNotExists(testKeyHotloadSet, valueNotHotloadSet1);
+ assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
+ // assertEquals(valueNotHotloadSet1, strVar2.getValue());
+ BDPConfiguration.setIfNotExists(testKeyHotloadSet, valueNotHotloadSet2);
+ // assertEquals(valueNotHotloadSet1, strVar2.getValue());
+
+ BDPConfiguration.set(testKeyHotloadSet, valueNotHotloadSet2);
+ assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
+ // assertEquals(valueNotHotloadSet2, strVar2.getValue());
+ }
+ }
+}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
index 8e2dc0d80..1625141c1 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
+++
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
@@ -59,7 +59,7 @@ public class QueryPersistenceEngine extends
AbstractPersistenceEngine {
private static final int MAX_DESC_LEN =
GovernanceCommonConf.ERROR_CODE_DESC_LEN();
private static final int RETRY_NUMBER =
- EntranceConfiguration.JOBINFO_UPDATE_RETRY_MAX_TIME().getValue();
+ EntranceConfiguration.JOBINFO_UPDATE_RETRY_MAX_TIME().getHotValue();
public QueryPersistenceEngine() {
/*
@@ -97,7 +97,7 @@ public class QueryPersistenceEngine extends
AbstractPersistenceEngine {
}
if (retry) {
try {
-
Thread.sleep(EntranceConfiguration.JOBINFO_UPDATE_RETRY_INTERVAL().getValue());
+
Thread.sleep(EntranceConfiguration.JOBINFO_UPDATE_RETRY_INTERVAL().getHotValue());
} catch (Exception ex) {
logger.warn(ex.getMessage());
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 660e868be..f69baac58 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -348,11 +348,11 @@ public class EntranceRestfulApi implements
EntranceRestfulRemote {
corePercent =
cores.get().floatValue()
/
EntranceConfiguration.YARN_QUEUE_CORES_MAX()
- .getValue();
+ .getHotValue();
memoryPercent =
memory.get().floatValue()
/
(EntranceConfiguration.YARN_QUEUE_MEMORY_MAX()
- .getValue()
+ .getHotValue()
.longValue()
* 1024
* 1024
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
index b3abc3d1d..06b6520d1 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
@@ -423,7 +423,7 @@ class EntranceWebSocketService
val sparkLogSpecial: String =
EntranceConfiguration.SPARK_SPECIAL_LOG_INCLUDE.getValue
val hiveCreateTableLog: String =
EntranceConfiguration.HIVE_CREATE_TABLE_LOG.getValue
if (singleLog.contains(hiveLogSpecial) &&
singleLog.contains(hiveCreateTableLog)) {
- val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getValue
+ val threadName =
EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
val printInfo =
EntranceConfiguration.HIVE_PRINT_INFO_LOG.getValue
val start = singleLog.indexOf(threadName)
val end = singleLog.indexOf(printInfo) + printInfo.length
@@ -437,8 +437,8 @@ class EntranceWebSocketService
singleLog.contains(hiveLogSpecial) &&
singleLog.contains("map") && singleLog
.contains("reduce")
) {
- val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getValue
- val stageName = EntranceConfiguration.HIVE_STAGE_NAME.getValue
+ val threadName =
EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
+ val stageName =
EntranceConfiguration.HIVE_STAGE_NAME.getHotValue()
val start = singleLog.indexOf(threadName)
val end = singleLog.indexOf(stageName)
if (start > 0 && end > 0) {
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
index 7142a4f6f..b6a6ac30f 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
@@ -140,7 +140,8 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler)
extends Logging {
logger.debug("ClientHeartbeatMonitor ends scanning for one iteration")
}
- private val monitorCreators =
EntranceConfiguration.CLIENT_MONITOR_CREATOR.getValue.split(",")
+ private val monitorCreators =
+ EntranceConfiguration.CLIENT_MONITOR_CREATOR.getValue.split(",")
private def isCliJob(job: EntranceJob): Boolean = {
monitorCreators.exists(job.getCreator.equalsIgnoreCase)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
index dff122612..813315e7d 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
@@ -151,14 +151,16 @@ object CSEntranceHelper extends Logging {
case contextID: LinkisWorkflowContextID =>
if
(CSCommonUtils.CONTEXT_ENV_PROD.equalsIgnoreCase(contextID.getEnv)) {
logger.info(
- s"reset creator from ${userCreatorLabel.getCreator} to " +
EntranceConfiguration.SCHEDULER_CREATOR.getValue
+ s"reset creator from ${userCreatorLabel.getCreator} to " +
EntranceConfiguration.SCHEDULER_CREATOR
+ .getHotValue()
)
-
userCreatorLabel.setCreator(EntranceConfiguration.SCHEDULER_CREATOR.getValue)
+
userCreatorLabel.setCreator(EntranceConfiguration.SCHEDULER_CREATOR.getHotValue())
} else {
logger.info(
- s"reset creator from ${userCreatorLabel.getCreator} to " +
EntranceConfiguration.FLOW_EXECUTION_CREATOR.getValue
+ s"reset creator from ${userCreatorLabel.getCreator} to " +
EntranceConfiguration.FLOW_EXECUTION_CREATOR
+ .getHotValue()
)
-
userCreatorLabel.setCreator(EntranceConfiguration.FLOW_EXECUTION_CREATOR.getValue)
+
userCreatorLabel.setCreator(EntranceConfiguration.FLOW_EXECUTION_CREATOR.getHotValue())
}
case _ =>
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/Explain.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/Explain.scala
index 10502f56d..c16c1a7dd 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/Explain.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/Explain.scala
@@ -60,7 +60,7 @@ object SparkExplain extends Explain {
private val LOG: Logger = LoggerFactory.getLogger(getClass)
override def authPass(code: String, error: StringBuilder): Boolean = {
- if (EntranceConfiguration.SKIP_AUTH.getValue) {
+ if (EntranceConfiguration.SKIP_AUTH.getHotValue()) {
return true
}
if (scStop.matcher(code).find()) {
@@ -346,7 +346,7 @@ object PythonExplain extends Explain {
private val SC_STOP = """sc\.stop""".r.unanchored
override def authPass(code: String, error: StringBuilder): Boolean = {
- if (EntranceConfiguration.SKIP_AUTH.getValue) {
+ if (EntranceConfiguration.SKIP_AUTH.getHotValue()) {
return true
}
@@ -408,7 +408,7 @@ object ScalaExplain extends Explain {
private val LOG: Logger = LoggerFactory.getLogger(getClass)
override def authPass(code: String, error: StringBuilder): Boolean = {
- if (EntranceConfiguration.SKIP_AUTH.getValue) {
+ if (EntranceConfiguration.SKIP_AUTH.getHotValue()) {
return true
}
code match {
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
index 8b851e402..54491ad0b 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
@@ -74,7 +74,7 @@ class CacheLogManager extends LogManager with Logging {
}
job match {
case entranceExecutionJob: EntranceExecutionJob =>
- val cache: Cache =
Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getValue)
+ val cache: Cache =
Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getHotValue())
val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
val fsLogPath = new FsPath(logPath)
val cacheLogWriter: LogWriter =
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala
index f1c7378c3..5799473cd 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala
@@ -43,7 +43,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader,
LoadingCache}
class EntranceUserParallelOrchestratorPlugin extends
UserParallelOrchestratorPlugin with Logging {
- private val DEFAULT_MAX_RUNNING =
EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue
+ private val DEFAULT_MAX_RUNNING =
EntranceConfiguration.WDS_LINKIS_INSTANCE.getHotValue()
private val SPLIT = ","
@@ -75,11 +75,11 @@ class EntranceUserParallelOrchestratorPlugin extends
UserParallelOrchestratorPlu
) {
logger.error(
s"cannot found user configuration
key:${EntranceConfiguration.WDS_LINKIS_INSTANCE.key}," +
- s"will use default value
${EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue}。All config map:
${BDPJettyServerHelper.gson
+ s"will use default value
${EntranceConfiguration.WDS_LINKIS_INSTANCE.getHotValue()}。All config map:
${BDPJettyServerHelper.gson
.toJson(keyAndValue)}"
)
}
- val maxRunningJobs =
EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
+ val maxRunningJobs =
EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue, true)
maxRunningJobs
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index e2881a9c7..146eb2c2c 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -183,7 +183,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
.asInstanceOf[UserCreatorLabel]
if (null == userCreatorLabel) {
userCreatorLabel =
labelBuilderFactory.createLabel(classOf[UserCreatorLabel])
- val creator =
EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue
+ val creator =
EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getHotValue()
userCreatorLabel.setUser(executeUser)
userCreatorLabel.setCreator(creator)
labels.put(userCreatorLabel.getLabelKey, userCreatorLabel)
@@ -221,13 +221,13 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
val executeApplicationName =
params.get(TaskConstant.EXECUTEAPPLICATIONNAME).asInstanceOf[String]
if (StringUtils.isBlank(creator)) {
- creator = EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue
+ creator =
EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getHotValue()
}
// When the execution type is IDE, executioncode and scriptpath cannot be
empty at the same time
if (
- EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue.equals(
- creator
- ) && StringUtils.isEmpty(source.get(TaskConstant.SCRIPTPATH)) &&
+ EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME
+ .getHotValue()
+ .equals(creator) &&
StringUtils.isEmpty(source.get(TaskConstant.SCRIPTPATH)) &&
StringUtils.isEmpty(executionCode)
) {
throw new EntranceIllegalParamException(
@@ -238,7 +238,8 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
var runType: String = null
if (StringUtils.isNotEmpty(executionCode)) {
runType = params.get(TaskConstant.RUNTYPE).asInstanceOf[String]
- if (StringUtils.isEmpty(runType)) runType =
EntranceConfiguration.DEFAULT_RUN_TYPE.getValue
+ if (StringUtils.isEmpty(runType))
+ runType = EntranceConfiguration.DEFAULT_RUN_TYPE.getHotValue()
// If formatCode is not empty, we need to format it(如果formatCode
不为空的话,我们需要将其进行格式化)
if (formatCode) executionCode = format(executionCode)
jobReq.setExecutionCode(executionCode)
diff --git
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/service/FsProxyService.scala
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/service/FsProxyService.scala
index d530df778..53b7eb585 100644
---
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/service/FsProxyService.scala
+++
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/service/FsProxyService.scala
@@ -29,18 +29,5 @@ class FsProxyService extends Logging {
case StorageConfiguration.HDFS_ROOT_USER.getValue => StorageUtils.HDFS
== fsType
case _ => true // creatorUser.equals(proxyUser)
}
- /* if(creatorUser.equals(proxyUser)) {
- return true
- }
- if(creatorUser == StorageConfiguration.STORAGE_ROOT_USER.getValue ) return
t
- if(StorageUtils.FILE == fsType && creatorUser ==
StorageConfiguration.LOCAL_ROOT_USER.getValue) {
- return true
- }
- if(StorageUtils.HDFS == fsType && creatorUser ==
StorageConfiguration.HDFS_ROOT_USER.getValue) {
- return true
- }
- info(s"$creatorUser Failed to proxy user:$proxyUser of FsType:$fsType") {
- true
- } */
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]