This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 95a0c3334 add the metric "container-active-threads" back (#1638) 95a0c3334 is described below commit 95a0c3334fbb701145e2cca922422f46af1544ae Author: Alan Zhang <shuai....@gmail.com> AuthorDate: Thu Nov 3 15:09:14 2022 -0700 add the metric "container-active-threads" back (#1638) Symptom No data was emitted for the metric container-active-threads Cause This PR(#1501 ) removed the logic to emit data for metric accidently: https://github.com/apache/samza/pull/1501/files#diff-f79781ad4c55ae7860829b06fd9dfd15e8069c37e64f8854d8f27ca2cd1f3ee5L637 Changes Add the deleted logic back in a new classSamzaContainerMonitorListener --- .../container/SamzaContainerMonitorListener.java | 63 +++++++++++++++++++ .../apache/samza/container/SamzaContainer.scala | 19 ++---- .../TestSamzaContainerMonitorListener.java | 71 ++++++++++++++++++++++ 3 files changed, 138 insertions(+), 15 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java new file mode 100644 index 000000000..b498b799e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container; + +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; +import org.apache.samza.container.host.SystemMemoryStatistics; +import org.apache.samza.container.host.SystemStatisticsMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SamzaContainerMonitorListener implements SystemStatisticsMonitor.Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerMonitorListener.class); + + private final SamzaContainerMetrics containerMetrics; + private final ExecutorService taskThreadPool; + private final int containerMemoryMb; + + public SamzaContainerMonitorListener(Config config, SamzaContainerMetrics containerMetrics, + ExecutorService taskThreadPool) { + this.containerMetrics = containerMetrics; + this.taskThreadPool = taskThreadPool; + this.containerMemoryMb = new ClusterManagerConfig(config).getContainerMemoryMb(); + } + + @Override + public void onUpdate(SystemMemoryStatistics sample) { + // update memory metric + long physicalMemoryBytes = sample.getPhysicalMemoryBytes(); + float physicalMemoryMb = physicalMemoryBytes / (1024.0F * 1024.0F); + float memoryUtilization = physicalMemoryMb / containerMemoryMb; + LOGGER.debug("Container physical memory utilization (mb): " + physicalMemoryMb); + LOGGER.debug("Container physical memory utilization: " + memoryUtilization); + containerMetrics.physicalMemoryMb().set(physicalMemoryMb); + containerMetrics.physicalMemoryUtilization().set(memoryUtilization); + + // update thread related metrics + if (Objects.nonNull(taskThreadPool) && taskThreadPool instanceof ThreadPoolExecutor) { + int containerActiveThreads = ((ThreadPoolExecutor) taskThreadPool).getActiveCount(); + LOGGER.debug("Container active threads count: " + containerActiveThreads); + containerMetrics.containerActiveThreads().set(containerActiveThreads); + } + } +} diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 69c7d4e5a..fa0843ab3 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -639,20 +639,9 @@ object SamzaContainer extends Logging { appConfig.getRunId, isHighLevelApiJob) - val containerMemoryMb : Int = new ClusterManagerConfig(config).getContainerMemoryMb - - val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl() - memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener { - override def onUpdate(sample: SystemMemoryStatistics): Unit = { - val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes - val physicalMemoryMb : Float = physicalMemoryBytes / (1024.0F * 1024.0F) - val memoryUtilization : Float = physicalMemoryMb.toFloat / containerMemoryMb - logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb) - logger.debug("Container physical memory utilization: " + memoryUtilization) - samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb) - samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization); - } - }) + val systemStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl() + systemStatisticsMonitor.registerListener( + new SamzaContainerMonitorListener(config, samzaContainerMetrics, taskThreadPool)) val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue) samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes) @@ -697,7 +686,7 @@ object SamzaContainer extends Logging { reporters = reporters, jvm = jvm, diskSpaceMonitor = diskSpaceMonitor, - hostStatisticsMonitor = memoryStatisticsMonitor, + hostStatisticsMonitor = systemStatisticsMonitor, taskThreadPool = taskThreadPool, commitThreadPool = commitThreadPool, timerExecutor = timerExecutor, diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java new file mode 100644 index 000000000..b38377b2e --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container; + +import java.util.Collections; +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.host.SystemMemoryStatistics; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +public class TestSamzaContainerMonitorListener { + + @Mock + private SystemMemoryStatistics sample; + @Mock + private ThreadPoolExecutor taskThreadPool; + + private final int containerMemoryMb = 2048; + private final long physicalMemoryBytes = 1024000L; + private final int activeThreadCount = 2; + + private final Config config = + new MapConfig(Collections.singletonMap("cluster-manager.container.memory.mb", String.valueOf(containerMemoryMb))); + private final SamzaContainerMetrics containerMetrics = + new SamzaContainerMetrics("container", new MetricsRegistryMap(), ""); + + private SamzaContainerMonitorListener samzaContainerMonitorListener; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + when(sample.getPhysicalMemoryBytes()).thenReturn(physicalMemoryBytes); + when(taskThreadPool.getActiveCount()).thenReturn(activeThreadCount); + + samzaContainerMonitorListener = new SamzaContainerMonitorListener(config, containerMetrics, taskThreadPool); + } + + @Test + public void testOnUpdate() { + samzaContainerMonitorListener.onUpdate(sample); + float physicalMemoryMb = physicalMemoryBytes / 1024.0F / 1024.0F; + assertEquals(physicalMemoryMb, containerMetrics.physicalMemoryMb().getValue()); + assertEquals(physicalMemoryMb / containerMemoryMb, containerMetrics.physicalMemoryUtilization().getValue()); + assertEquals(activeThreadCount, containerMetrics.containerActiveThreads().getValue()); + } +}