This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.3 by this push:
new 12c8facfa linkis-entranc - add support for specified username to use
specified groupCapacity (#2160)
12c8facfa is described below
commit 12c8facfa23a5e16ccdf8eb91d8875c063ac28bf
Author: Alexyang <[email protected]>
AuthorDate: Mon May 23 19:01:48 2022 +0800
linkis-entranc - add support for specified username to use specified
groupCapacity (#2160)
---
.../entrance/scheduler/EntranceGroupFactory.scala | 20 ++++++++++++++++++--
1 file changed, 18 insertions(+), 2 deletions(-)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
index 1db867b3f..9c8694f3c 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
@@ -36,6 +36,7 @@ import org.apache.linkis.scheduler.queue.{Group,
GroupFactory, SchedulerEvent}
import java.util
import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern
import scala.collection.JavaConversions._
@@ -46,9 +47,15 @@ class EntranceGroupFactory extends GroupFactory with Logging
{
.maximumSize(EntranceConfiguration.GRORUP_CACHE_MAX.getValue).build()
private val GROUP_MAX_CAPACITY =
CommonVars("wds.linkis.entrance.max.capacity", 2000)
+ private val SPECIFIED_USERNAME_REGEX =
CommonVars("wds.linkis.entrance.specified.username.regex", "hduser.*")
+ private val GROUP_SPECIFIED_USER_MAX_CAPACITY =
CommonVars("wds.linkis.entrance.specified.max.capacity", 5000)
private val GROUP_INIT_CAPACITY =
CommonVars("wds.linkis.entrance.init.capacity", 100)
-
+ private val specifiedUsernameRegexPattern: Pattern = if
(StringUtils.isNotBlank(SPECIFIED_USERNAME_REGEX.getValue)) {
+ Pattern.compile(SPECIFIED_USERNAME_REGEX.getValue)
+ } else {
+ null
+ }
override def getOrCreateGroup(event: SchedulerEvent): Group = {
val (labels, params) = event match {
@@ -79,7 +86,16 @@ class EntranceGroupFactory extends GroupFactory with Logging
{
}("Get user configurations from configuration server failed! Next use
the default value to continue.")
val maxRunningJobs =
EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
val initCapacity = GROUP_INIT_CAPACITY.getValue(keyAndValue)
- val maxCapacity = GROUP_MAX_CAPACITY.getValue(keyAndValue)
+ val maxCapacity = if (null != specifiedUsernameRegexPattern) {
+ if
(specifiedUsernameRegexPattern.matcher(userCreatorLabel.getUser).find()) {
+ logger.info(s"Set maxCapacity of user ${userCreatorLabel.getUser} to
specifiedMaxCapacity :
${GROUP_SPECIFIED_USER_MAX_CAPACITY.getValue(keyAndValue)}")
+ GROUP_SPECIFIED_USER_MAX_CAPACITY.getValue(keyAndValue)
+ } else {
+ GROUP_MAX_CAPACITY.getValue(keyAndValue)
+ }
+ } else {
+ GROUP_MAX_CAPACITY.getValue(keyAndValue)
+ }
logger.info(s"Got user configurations: groupName=$groupName,
maxRunningJobs=$maxRunningJobs, initCapacity=$initCapacity,
maxCapacity=$maxCapacity.")
val group = new ParallelGroup(groupName, initCapacity, maxCapacity)
group.setMaxRunningJobs(maxRunningJobs)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]