This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new acd4493  Add a metric to track the number of active threads of a 
container's thread pool (#1476)
acd4493 is described below

commit acd4493a62051b020f928bd288a3623a8c979f92
Author: Ziting <[email protected]>
AuthorDate: Fri Mar 19 16:19:19 2021 -0700

    Add a metric to track the number of active threads of a container's thread 
pool (#1476)
    
    Feature: Add a metric to track the number of active threads of a 
container's thread pool. This metric will be used to calculate the thread pool 
utilization of a job, which in turn serves as a signal for the autosizing 
controller to scale the job's thread pool size and number of containers up 
or down.
    
    Changes:
     - Add a metric to track the number of active threads of a container's 
thread pool.
     - Update the existing container thread pool size metric to report the 
current pool size read from the thread pool itself, instead of emitting the 
static job.container.thread.pool.size config value.
---
 .../documentation/versioned/operations/monitoring.md      |  2 ++
 .../scala/org/apache/samza/container/SamzaContainer.scala | 15 +++++++++------
 .../apache/samza/container/SamzaContainerMetrics.scala    |  1 +
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/docs/learn/documentation/versioned/operations/monitoring.md 
b/docs/learn/documentation/versioned/operations/monitoring.md
index de2497c..0671047 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -394,6 +394,8 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>, 
\<topic\>, are popula
 | | executor-work-factor | The work factor of the run loop. A work factor of 1 
indicates full throughput, while a work factor of less than 1 will introduce 
delays into the execution to approximate the requested work factor. The work 
factor is set by the disk space monitor in accordance with the disk quota 
policy. Given the latest percentage of available disk quota, this policy 
returns the work factor that should be applied. |
 | | physical-memory-mb | The physical memory used by the Samza container 
process (native + on heap) (in MBs). |
 | | physical-memory-utilization | The ratio between the physical memory used 
by the Samza container process (native + on heap) and the total physical memory 
of the Samza container. |
+| | container-thread-pool-size | The current size of a Samza container's 
thread pool. It may or may not be the same as job.container.thread.pool.size, 
depending on the implementation. |
+| | container-active-threads | The approximate number of threads that are 
actively in use in a Samza container's thread pool. |
 | | <TaskName\>-<StoreName\>-restore-time | Time taken to restore task stores 
(per task store). |
 
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3a4ba17..2e87f48 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@ import java.nio.file.Path
 import java.time.Duration
 import java.util
 import java.util.{Base64, Optional}
-import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, 
ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -476,7 +476,6 @@ object SamzaContainer extends Logging {
 
     val threadPoolSize = jobConfig.getThreadPoolSize
     info("Got thread pool size: " + threadPoolSize)
-    samzaContainerMetrics.containerThreadPoolSize.set(threadPoolSize)
 
     val taskThreadPool = if (threadPoolSize > 0) {
       Executors.newFixedThreadPool(threadPoolSize,
@@ -619,16 +618,20 @@ object SamzaContainer extends Logging {
 
     val containerMemoryMb : Int = new 
ClusterManagerConfig(config).getContainerMemoryMb
 
-    val memoryStatisticsMonitor : SystemStatisticsMonitor = new 
StatisticsMonitorImpl()
-    memoryStatisticsMonitor.registerListener(new 
SystemStatisticsMonitor.Listener {
+    val hostStatisticsMonitor : SystemStatisticsMonitor = new 
StatisticsMonitorImpl()
+    hostStatisticsMonitor.registerListener(new 
SystemStatisticsMonitor.Listener {
       override def onUpdate(sample: SystemMemoryStatistics): Unit = {
         val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
         val physicalMemoryMb : Float = physicalMemoryBytes / (1024.0F * 
1024.0F)
         val memoryUtilization : Float = physicalMemoryMb.toFloat / 
containerMemoryMb
+        val containerThreadPoolSize : Long = 
taskThreadPool.asInstanceOf[ThreadPoolExecutor].getPoolSize
+        val containerActiveThreads : Long = 
taskThreadPool.asInstanceOf[ThreadPoolExecutor].getActiveCount
         logger.debug("Container physical memory utilization (mb): " + 
physicalMemoryMb)
         logger.debug("Container physical memory utilization: " + 
memoryUtilization)
         samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
-        samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization);
+        samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization)
+        
samzaContainerMetrics.containerThreadPoolSize.set(containerThreadPoolSize)
+        
samzaContainerMetrics.containerActiveThreads.set(containerActiveThreads)
       }
     })
 
@@ -674,7 +677,7 @@ object SamzaContainer extends Logging {
       reporters = reporters,
       jvm = jvm,
       diskSpaceMonitor = diskSpaceMonitor,
-      hostStatisticsMonitor = memoryStatisticsMonitor,
+      hostStatisticsMonitor = hostStatisticsMonitor,
       taskThreadPool = taskThreadPool,
       timerExecutor = timerExecutor,
       containerContext = containerContext,
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 46de166..33d288f 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -49,6 +49,7 @@ class SamzaContainerMetrics(
   val physicalMemoryMb = newGauge("physical-memory-mb", 0.0F)
   val physicalMemoryUtilization = newGauge("physical-memory-utilization", 0.0F)
   val containerThreadPoolSize = newGauge("container-thread-pool-size", 0L)
+  val containerActiveThreads = newGauge("container-active-threads", 0L)
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new 
util.HashMap[TaskName, Gauge[Long]]()
 

Reply via email to