This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit dd782c7d4e4e54c26bf6d4680ff844e78c274f86 Author: peacewong <[email protected]> AuthorDate: Tue Mar 19 22:01:03 2024 +0800 Relieve memory usage --- .../linkis/scheduler/conf/SchedulerConfiguration.scala | 4 ++-- .../main/scala/org/apache/linkis/scheduler/queue/Job.scala | 13 ++++++++++--- .../linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala | 2 ++ .../queue/parallelqueue/ParallelConsumerManager.scala | 4 ++-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala index 8fd6f1c6f0..c2829052ac 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala @@ -25,10 +25,10 @@ object SchedulerConfiguration { CommonVars("wds.linkis.fifo.consumer.auto.clear.enabled", true) val FIFO_CONSUMER_MAX_IDLE_TIME = - CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("1h")).getValue.toLong + CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("10m")).getValue.toLong val FIFO_CONSUMER_IDLE_SCAN_INTERVAL = - CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("2h")) + CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("30m")) val FIFO_CONSUMER_IDLE_SCAN_INIT_TIME = CommonVars("wds.linkis.fifo.consumer.idle.scan.init.time", new TimeType("1s")) diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala index 2087153813..7534f74071 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala @@ -210,9 +210,6 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg case _ => jobDaemon.foreach(_.kill()) jobListener.foreach(_.onJobCompleted(this)) -// if(getJobInfo != null) logListener.foreach(_.onLogUpdate(this, getJobInfo.getMetric)) - logListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("job is completed."))) - // TODO job end event } protected def transitionCompleted(executeCompleted: CompletedExecuteResponse): Unit = { @@ -351,6 +348,16 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg } override def toString: String = if (StringUtils.isNotBlank(getName)) getName else getId + + /** + * clear job memory + */ + def clear(): Unit = { + logger.info(s" clear job base info $getId") + this.executor = null + this.jobDaemon = null + } + } /** diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala index fcab44a731..df296f8329 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala @@ -232,6 +232,8 @@ class FIFOUserConsumer( case _ => } } + // clear cache + queue.clearAll() this.runningJobs.foreach { job => if (job != null && !job.isCompleted) { diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala index 396b6fb315..1e753ea866 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala @@ -126,8 +126,8 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String) override def destroyConsumer(groupName: String): Unit = consumerGroupMap.get(groupName).foreach { tmpConsumer => - tmpConsumer.shutdown() - consumerGroupMap.remove(groupName) + Utils.tryAndWarn(tmpConsumer.shutdown()) + Utils.tryAndWarn(consumerGroupMap.remove(groupName)) consumerListener.foreach(_.onConsumerDestroyed(tmpConsumer)) logger.warn(s"Consumer of group ($groupName) in $schedulerName is destroyed.") } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
