This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new acd4493 Add a metric to track the number of active threads of a
container's thread pool (#1476)
acd4493 is described below
commit acd4493a62051b020f928bd288a3623a8c979f92
Author: Ziting <[email protected]>
AuthorDate: Fri Mar 19 16:19:19 2021 -0700
Add a metric to track the number of active threads of a container's thread
pool (#1476)
Feature: Add a metric to track the number of active threads of a
container's thread pool. This will be used to calculate the thread pool
utilization of a job, which will be used as a signal for the autosizing
controller to scale up or down the job thread pool size and number of
containers.
Changes:
- Add a metric to track the number of active threads of a container's
thread pool.
- Update the existing container thread pool size metric to reflect the
pool size from the thread pool instead of emitting the static
job.container.thread.pool.size
---
.../documentation/versioned/operations/monitoring.md | 2 ++
.../scala/org/apache/samza/container/SamzaContainer.scala | 15 +++++++++------
.../apache/samza/container/SamzaContainerMetrics.scala | 1 +
3 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md
b/docs/learn/documentation/versioned/operations/monitoring.md
index de2497c..0671047 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -394,6 +394,8 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>,
\<topic\>, are popula
| | executor-work-factor | The work factor of the run loop. A work factor of 1
indicates full throughput, while a work factor of less than 1 will introduce
delays into the execution to approximate the requested work factor. The work
factor is set by the disk space monitor in accordance with the disk quota
policy. Given the latest percentage of available disk quota, this policy
returns the work factor that should be applied. |
| | physical-memory-mb | The physical memory used by the Samza container
process (native + on heap) (in MBs). |
| | physical-memory-utilization | The ratio between the physical memory used
by the Samza container process (native + on heap) and the total physical memory
of the Samza container. |
+| | container-thread-pool-size | The current size of a Samza container's
thread pool. It may or may not be the same as job.container.thread.pool.size,
depending on the implementation. |
+| | container-active-threads | The approximate actively used threads in a
Samza container's thread pool. |
| | <TaskName\>-<StoreName\>-restore-time | Time taken to restore task stores
(per task store). |
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3a4ba17..2e87f48 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@ import java.nio.file.Path
import java.time.Duration
import java.util
import java.util.{Base64, Optional}
-import java.util.concurrent.{CountDownLatch, ExecutorService, Executors,
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors,
ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -476,7 +476,6 @@ object SamzaContainer extends Logging {
val threadPoolSize = jobConfig.getThreadPoolSize
info("Got thread pool size: " + threadPoolSize)
- samzaContainerMetrics.containerThreadPoolSize.set(threadPoolSize)
val taskThreadPool = if (threadPoolSize > 0) {
Executors.newFixedThreadPool(threadPoolSize,
@@ -619,16 +618,20 @@ object SamzaContainer extends Logging {
val containerMemoryMb : Int = new
ClusterManagerConfig(config).getContainerMemoryMb
- val memoryStatisticsMonitor : SystemStatisticsMonitor = new
StatisticsMonitorImpl()
- memoryStatisticsMonitor.registerListener(new
SystemStatisticsMonitor.Listener {
+ val hostStatisticsMonitor : SystemStatisticsMonitor = new
StatisticsMonitorImpl()
+ hostStatisticsMonitor.registerListener(new
SystemStatisticsMonitor.Listener {
override def onUpdate(sample: SystemMemoryStatistics): Unit = {
val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
val physicalMemoryMb : Float = physicalMemoryBytes / (1024.0F *
1024.0F)
val memoryUtilization : Float = physicalMemoryMb.toFloat /
containerMemoryMb
+ val containerThreadPoolSize : Long =
taskThreadPool.asInstanceOf[ThreadPoolExecutor].getPoolSize
+ val containerActiveThreads : Long =
taskThreadPool.asInstanceOf[ThreadPoolExecutor].getActiveCount
logger.debug("Container physical memory utilization (mb): " +
physicalMemoryMb)
logger.debug("Container physical memory utilization: " +
memoryUtilization)
samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
- samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization);
+ samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization)
+
samzaContainerMetrics.containerThreadPoolSize.set(containerThreadPoolSize)
+
samzaContainerMetrics.containerActiveThreads.set(containerActiveThreads)
}
})
@@ -674,7 +677,7 @@ object SamzaContainer extends Logging {
reporters = reporters,
jvm = jvm,
diskSpaceMonitor = diskSpaceMonitor,
- hostStatisticsMonitor = memoryStatisticsMonitor,
+ hostStatisticsMonitor = hostStatisticsMonitor,
taskThreadPool = taskThreadPool,
timerExecutor = timerExecutor,
containerContext = containerContext,
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 46de166..33d288f 100644
---
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -49,6 +49,7 @@ class SamzaContainerMetrics(
val physicalMemoryMb = newGauge("physical-memory-mb", 0.0F)
val physicalMemoryUtilization = newGauge("physical-memory-utilization", 0.0F)
val containerThreadPoolSize = newGauge("container-thread-pool-size", 0L)
+ val containerActiveThreads = newGauge("container-active-threads", 0L)
val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new
util.HashMap[TaskName, Gauge[Long]]()