Repository: samza Updated Branches: refs/heads/master 2187d6bd9 -> 40445e2c6
SAMZA-972: holistic physical memory monitoring Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40445e2c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40445e2c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40445e2c Branch: refs/heads/master Commit: 40445e2c6ce0e3f31d36373633ee5ea215f3024b Parents: 2187d6b Author: Jagadish Venkatraman <[email protected]> Authored: Wed Jul 27 10:16:30 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Wed Jul 27 10:16:30 2016 -0700 ---------------------------------------------------------------------- build.gradle | 1 + checkstyle/import-control.xml | 1 + .../disk/PollingScanDiskSpaceMonitor.java | 12 +- .../host/PosixCommandBasedStatisticsGetter.java | 84 +++++++++ .../container/host/StatisticsMonitorImpl.java | 188 +++++++++++++++++++ .../container/host/SystemMemoryStatistics.java | 62 ++++++ .../container/host/SystemStatisticsGetter.java | 33 ++++ .../container/host/SystemStatisticsMonitor.java | 60 ++++++ .../apache/samza/container/SamzaContainer.scala | 34 +++- .../samza/container/SamzaContainerMetrics.scala | 2 + .../host/TestStatisticsMonitorImpl.java | 98 ++++++++++ 11 files changed, 571 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index ba4a9d1..1d4eb74 100644 --- a/build.gradle +++ b/build.gradle @@ -145,6 +145,7 @@ project(":samza-core_$scalaVersion") { dependencies { compile project(':samza-api') + compile "com.google.guava:guava:$guavaVersion" compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" 
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c85dc94..7e77702 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -144,6 +144,7 @@ <allow pkg="org.apache.samza.config" /> <allow pkg="org.apache.samza.container" /> <allow pkg="org.apache.samza.coordinator.stream" /> + <allow pkg="com.google.common" /> <allow pkg="org.apache.samza.util" /> <allow pkg="junit.framework" /> <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" /> http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java index 50c8500..75e461d 100644 --- a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java @@ -18,6 +18,9 @@ */ package org.apache.samza.container.disk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; @@ -45,6 +48,8 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor { private enum State { INIT, RUNNING, STOPPED } private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl(); + private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class); + // Note: we use this as a set where the value is always Boolean.TRUE. 
private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>(); @@ -184,7 +189,12 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor { private void updateSample() { long totalBytes = getSpaceUsed(watchPaths); for (Listener listener : listenerSet.keySet()) { - listener.onUpdate(totalBytes); + try { + listener.onUpdate(totalBytes); + } catch (Throwable e) { + // catch an exception thrown by one listener so that it does not impact other listeners. + log.error("Exception thrown by a listener ", e); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java new file mode 100644 index 0000000..dbcd370 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.host; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * An implementation of {@link SystemStatisticsGetter} that relies on using Posix commands like ps. + */ +public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter { + + private static final Logger log = LoggerFactory.getLogger(PosixCommandBasedStatisticsGetter.class); + + /** + * A convenience method to execute shell commands and return the first line of their output. + * + * @param cmdArray the command to run + * @return the first line of the output. + * @throws IOException + */ + private String getCommandOutput(String [] cmdArray) throws IOException { + Process executable = Runtime.getRuntime().exec(cmdArray); + BufferedReader processReader = null; + String psOutput = null; + + try { + processReader = new BufferedReader(new InputStreamReader(executable.getInputStream())); + psOutput = processReader.readLine(); + } finally { + if (processReader != null) { + processReader.close(); + } + } + return psOutput; + } + + private long getPhysicalMemory() throws IOException { + + // returns a single long value that represents the rss memory of the process. + String commandOutput = getCommandOutput(new String[]{"sh", "-c", "ps -o rss= -p $PPID"}); + + // this should never happen. 
+ if (commandOutput == null) { + throw new IOException("ps returned unexpected output: " + commandOutput); + } + + long rssMemoryKb = Long.parseLong(commandOutput.trim()); + //convert to bytes + return rssMemoryKb * 1024; + } + + + @Override + public SystemMemoryStatistics getSystemMemoryStatistics() { + try { + long memory = getPhysicalMemory(); + return new SystemMemoryStatistics(memory); + } catch (Exception e) { + log.warn("Error when running ps: ", e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java new file mode 100644 index 0000000..3dfdf36 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.host; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own + * ways of getting {@link SystemMemoryStatistics} and provide a {@link SystemStatisticsGetter} implementation. The default + * behavior is to rely on unix commands like ps to obtain {@link SystemMemoryStatistics} + * + * All callback invocations are from the same thread - hence, are guaranteed to be serialized. An exception thrown + * from a callback will suppress all subsequent callbacks to the listener that threw it; other listeners continue to receive samples. If the execution of a + * {@link org.apache.samza.container.host.SystemStatisticsMonitor.Listener} callback takes longer than the polling + * interval, subsequent callback invocations may start late but will not be invoked concurrently. + * + * This class is thread-safe. + */ +public class StatisticsMonitorImpl implements SystemStatisticsMonitor { + + private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory(); + private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class); + + /** + * Polling interval of this monitor. The monitor will report host statistics periodically via a callback + * after pollingIntervalMillis, pollingIntervalMillis *2, pollingIntervalMillis * 3 and so on. + * + */ + private final long pollingIntervalMillis; + + + // Use a private lock instead of synchronized because an instance of StatisticsMonitorImpl could be used as a + // lock else-where. + private final Object lock = new Object(); + + // Single threaded executor to handle callback invocations. 
+ private final ScheduledExecutorService schedulerService = + Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + + // Use this as a set with value always set to True + private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>(); + private final SystemStatisticsGetter statisticsGetter; + + /** + * Tracks the state of the monitor. Typical state transitions from INIT (when the monitor is created) to RUNNING (when + * the start is invoked on the monitor) to STOPPED (when stop is invoked) + */ + private enum State { INIT, RUNNING, STOPPED } + private volatile State currentState; + + + /** + * Creates a new {@link StatisticsMonitorImpl} that reports statistics every 60 seconds + * + */ + public StatisticsMonitorImpl() { + this(60000, new PosixCommandBasedStatisticsGetter()); + } + + /** + * Creates a new {@link StatisticsMonitorImpl} that reports statistics periodically + + * @param pollingIntervalMillis The polling interval to report statistics. 
+ * @param statisticsGetter the getter to gather system stats info + */ + public StatisticsMonitorImpl(long pollingIntervalMillis, SystemStatisticsGetter statisticsGetter) { + this.pollingIntervalMillis = pollingIntervalMillis; + this.statisticsGetter = statisticsGetter; + currentState = State.INIT; + } + + @Override + public void start() { + synchronized (lock) { + switch (currentState) { + case INIT: + currentState = State.RUNNING; + schedulerService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + sampleStatistics(); + } + }, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS); + break; + + case RUNNING: + return; + + case STOPPED: + throw new IllegalStateException("Attempting to start an already stopped StatisticsMonitor"); + } + } + } + + private void sampleStatistics() { + SystemMemoryStatistics statistics = null; + try { + statistics = statisticsGetter.getSystemMemoryStatistics(); + } catch (Throwable e) { + LOG.error("Error during obtaining statistics: ", e); + } + + for (Listener listener : listenerSet.keySet()) { + if (statistics != null) { + try { + // catch all exceptions to shield one listener from exceptions thrown by others. + listener.onUpdate(statistics); + } catch (Throwable e) { + // delete this listener so that it does not receive future callbacks. + listenerSet.remove(listener); + LOG.error("Listener threw an exception: ", e); + } + } + } + } + + + /** + * Stops the monitor. Once the monitor is stopped, no new samples will be delivered to the listeners. If stop is + * invoked during the period a {@link org.apache.samza.container.host.SystemStatisticsMonitor.Listener} callback is + * under execution, may cause the callback to be interrupted. 
+ */ + + @Override + public void stop() { + synchronized (lock) { + schedulerService.shutdownNow(); + listenerSet.clear(); + currentState = State.STOPPED; + } + } + + /** + * @see org.apache.samza.container.host.SystemStatisticsMonitor#registerListener(Listener) + */ + @Override + public boolean registerListener(Listener listener) { + synchronized (lock) { + if (currentState == State.STOPPED) { + LOG.error("Attempting to register a listener after monitor was stopped."); + return false; + } else { + if (listenerSet.containsKey(listener)) { + LOG.error("Attempting to register an already registered listener"); + return false; + } + listenerSet.put(listener, Boolean.TRUE); + return true; + } + } + } + + // A convenience class that provides named threads + private static class StatisticsMonitorThreadFactory implements ThreadFactory { + private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(); + private static final String PREFIX = "Samza-StatisticsMonitor-Thread-"; + + @Override + public Thread newThread(Runnable runnable) { + return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java new file mode 100644 index 0000000..3dd769f --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.host; + +/** + * A {@link SystemMemoryStatistics} object represents information about the physical process that runs the + * {@link org.apache.samza.container.SamzaContainer}. + */ +public class SystemMemoryStatistics { + + /** + * The physical memory used by the Samza container process (native + on heap) in bytes. + */ + private final long physicalMemoryBytes; + + SystemMemoryStatistics(long physicalMemoryBytes) { + this.physicalMemoryBytes = physicalMemoryBytes; + } + + @Override + public String toString() { + return "SystemStatistics{" + + "containerPhysicalMemory=" + physicalMemoryBytes + + '}'; + } + + public long getPhysicalMemoryBytes() { + return physicalMemoryBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SystemMemoryStatistics that = (SystemMemoryStatistics) o; + + return physicalMemoryBytes == that.physicalMemoryBytes; + + } + + @Override + public int hashCode() { + return (int) (physicalMemoryBytes ^ (physicalMemoryBytes >>> 32)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java new file mode 100644 index 0000000..541c2fb --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.host; + +/** + * An object that returns {@link SystemMemoryStatistics} for the {@link org.apache.samza.container.SamzaContainer}. + */ +public interface SystemStatisticsGetter { + + /** + * Returns the {@link SystemMemoryStatistics} for the current Samza container process. A 'null' value is + * returned if no statistics are available. 
+ * + * @return {@link SystemMemoryStatistics} for the Samza container + */ + SystemMemoryStatistics getSystemMemoryStatistics(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java new file mode 100644 index 0000000..84f4ec3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.host; + +/** + * An object that monitors per-process system statistics like memory utilization and reports this via a + * {@link SystemStatisticsMonitor.Listener}. + */ +public interface SystemStatisticsMonitor { + /** + * Starts the system statistics monitor. + */ + void start(); + + /** + * Stops the memory usage monitor. Once shutdown is complete listeners will no longer receive + * new samples. 
A stopped monitor cannot be restarted with {@link #start()}. + */ + void stop(); + + /** + * Registers the specified listener with this monitor. The listener will be called + * when the monitor has a new sample. The update interval is implementation specific. + * + * @param listener the listener to register + * @return {@code true} if the registration was successful and {@code false} if not. Registration + * can fail if the monitor has been stopped or if the listener was already registered. + */ + boolean registerListener(Listener listener); + + /** + * A listener that is notified when the monitor has sampled a new statistic. + * Register this listener with {@link #registerListener(Listener)} to receive updates. + */ + interface Listener { + /** + * Invoked with new samples as they become available. + * + * @param sample the currently sampled statistic. + */ + void onUpdate(SystemMemoryStatistics sample); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 90d7279..a37f353 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -20,9 +20,6 @@ package org.apache.samza.container import java.io.File -import java.lang.Thread.UncaughtExceptionHandler -import java.net.URL -import java.net.UnknownHostException import java.nio.file.Path import java.util import java.util.concurrent.ExecutorService @@ -47,6 +44,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor import org.apache.samza.container.disk.DiskSpaceMonitor.Listener import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory import 
org.apache.samza.container.disk.PollingScanDiskSpaceMonitor +import org.apache.samza.container.host.{SystemMemoryStatistics, SystemStatisticsMonitor, StatisticsMonitorImpl} import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel @@ -556,6 +554,16 @@ object SamzaContainer extends Logging { val executor = new ThrottlingExecutor( config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1))) + val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl() + memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener { + override def onUpdate(sample: SystemMemoryStatistics): Unit = { + val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes + val physicalMemoryMb : Double = physicalMemoryBytes / (1024.0 * 1024.0) + logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb) + samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb) + } + }) + val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue) samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes) @@ -606,6 +614,7 @@ object SamzaContainer extends Logging { jvm = jvm, jmxServer = jmxServer, diskSpaceMonitor = diskSpaceMonitor, + hostStatisticsMonitor = memoryStatisticsMonitor, taskThreadPool = taskThreadPool) } } @@ -619,6 +628,7 @@ class SamzaContainer( metrics: SamzaContainerMetrics, jmxServer: JmxServer, diskSpaceMonitor: DiskSpaceMonitor = null, + hostStatisticsMonitor: SystemStatisticsMonitor = null, offsetManager: OffsetManager = new OffsetManager, localityManager: LocalityManager = null, securityManager: SecurityManager = null, @@ -637,6 +647,7 @@ class SamzaContainer( startLocalityManager startStores startDiskSpaceMonitor + startHostStatisticsMonitor startProducers startTask startConsumers @@ -656,6 +667,7 @@ class SamzaContainer( shutdownTask shutdownStores shutdownDiskSpaceMonitor + 
shutdownHostStatisticsMonitor shutdownProducers shutdownLocalityManager shutdownOffsetManager @@ -673,6 +685,13 @@ class SamzaContainer( } } + def startHostStatisticsMonitor: Unit = { + if (hostStatisticsMonitor != null) { + info("Starting host statistics monitor") + hostStatisticsMonitor.start() + } + } + def startMetrics { info("Registering task instances with metrics.") @@ -867,4 +886,13 @@ class SamzaContainer( diskSpaceMonitor.stop() } } + + def shutdownHostStatisticsMonitor: Unit = { + if (hostStatisticsMonitor != null) { + info("Shutting down host statistics monitor.") + hostStatisticsMonitor.stop() + } + } + + } http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index e3891cf..1e7515e 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -43,10 +43,12 @@ class SamzaContainerMetrics( val diskUsageBytes = newGauge("disk-usage-bytes", 0L) val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue) val executorWorkFactor = newGauge("executor-work-factor", 1.0) + val physicalMemoryMb = newGauge[Double]("physical-memory-mb", 0.0F) val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() def addStoreRestorationGauge(taskName: TaskName, storeName: String) { taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) } + } http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java new file mode 100644 index 0000000..b01fdca --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.host; + +import junit.framework.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.fail; + +public class TestStatisticsMonitorImpl { + + @Test + public void testPhysicalMemoryReporting() throws Exception { + final int numSamplesToCollect = 5; + final CountDownLatch latch = new CountDownLatch(numSamplesToCollect); + + final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter()); + monitor.start(); + + boolean result = monitor.registerListener(new SystemStatisticsMonitor.Listener() { + + @Override + public void onUpdate(SystemMemoryStatistics sample) { + // assert memory is greater than 10 bytes, as a sanity check + Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10); + latch.countDown(); + } + }); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail(String.format("Timed out waiting for listener to be give %d updates", numSamplesToCollect)); + } + // assert that the registration for the listener was successful + Assert.assertTrue(result); + monitor.stop(); + + // assert that attempting to register a listener after monitor stop results in failure of registration + boolean registrationFailsAfterStop = monitor.registerListener(new SystemStatisticsMonitor.Listener() { + @Override + public void onUpdate(SystemMemoryStatistics sample) { + } + }); + Assert.assertFalse(registrationFailsAfterStop); + } + + @Test + public void testStopBehavior() throws Exception { + + final int numSamplesToCollect = 5; + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger numCallbacks = new AtomicInteger(0); + + final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter()); + + monitor.start(); + monitor.registerListener(new SystemStatisticsMonitor.Listener() { + + @Override + public 
void onUpdate(SystemMemoryStatistics sample) { + Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10); + if (numCallbacks.incrementAndGet() == numSamplesToCollect) { + //monitor.stop() is invoked from the same thread. So, there's no race between a stop() call and the + //callback invocation for the next sample. + monitor.stop(); + latch.countDown(); + } + } + }); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail(String.format("Timed out waiting for listener to be give %d updates", numSamplesToCollect)); + } + // Ensure that we only receive as many callbacks + Assert.assertEquals(numCallbacks.get(), numSamplesToCollect); + } + + +}
