Repository: samza Updated Branches: refs/heads/master 57aae364b -> dc67d1560
SAMZA-924: Add disk space monitoring Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dc67d156 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dc67d156 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dc67d156 Branch: refs/heads/master Commit: dc67d1560c0ae76fe75e6b767b857ceb492cb7a8 Parents: 57aae36 Author: Chris Pettitt <[email protected]> Authored: Sat Apr 9 21:19:38 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Sat Apr 9 21:19:38 2016 -0700 ---------------------------------------------------------------------- .../samza/container/disk/DiskSpaceMonitor.java | 59 +++++ .../disk/PollingScanDiskSpaceMonitor.java | 199 +++++++++++++++ .../apache/samza/container/SamzaContainer.scala | 53 +++- .../samza/container/SamzaContainerMetrics.scala | 13 +- .../disk/TestPollingScanDiskSpaceMonitor.java | 252 +++++++++++++++++++ 5 files changed, 569 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java new file mode 100644 index 0000000..2a565be --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.disk; + +/** + * An object that monitors the amount of disk space used and reports this usage via a + * {@link DiskSpaceMonitor.Listener}. + */ +public interface DiskSpaceMonitor { + /** + * Starts the disk space monitor. + */ + void start(); + + /** + * Stops the disk space monitor. Once shutdown is complete, listeners will no longer receive + * new samples. A stopped monitor cannot be restarted with {@link #start()}. + */ + void stop(); + + /** + * Registers the specified listener with this monitor. The listener will be called + * when the monitor has a new sample. The update interval is implementation specific. + * + * @param listener the listener to register + * @return {@code true} if the registration was successful and {@code false} if not. Registration + * can fail if the monitor has been stopped or if the listener was already registered. + */ + boolean registerListener(Listener listener); + + /** + * A listener that is notified when the disk space monitor has sampled a new disk usage value. + * Register this listener with {@link #registerListener(Listener)} to receive updates. + */ + interface Listener { + /** + * Invoked with new samples as they become available. + * + * @param diskUsageSample the measured disk usage size in bytes.
+ */ + void onUpdate(long diskUsageSample); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java new file mode 100644 index 0000000..50c8500 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.container.disk; + +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified + * polling interval. + * <p> + * This class is thread-safe. + */ +public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor { + private enum State { INIT, RUNNING, STOPPED } + + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl(); + // Note: we use this as a set where the value is always Boolean.TRUE. + private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>(); + + // Used to guard write access to state and listenerSet. + private final Object lock = new Object(); + + private final ScheduledExecutorService schedulerService = + Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + private final Set<Path> watchPaths; + private final long pollingIntervalMillis; + + private State state = State.INIT; + + /** + * Returns the total size in bytes used by the specified paths. This function guarantees that it + * will not double count overlapping portions of the path set. For example, with a trivial + * overlap of /A and /A, it will count /A only once. It also handles other types of overlaps + * similarly, such as counting /A/B only once given the paths /A and /A/B.
+ * <p> + * This function is exposed as package private to simplify testing various cases without involving + * an executor. Alternatively this could have been pulled out to a utility class, but it would + * unnecessarily pollute the global namespace. + */ + static long getSpaceUsed(Set<Path> paths) { + ArrayDeque<Path> pathStack = new ArrayDeque<>(); + + for (Path path : paths) { + pathStack.push(path); + } + + // Track the real paths (directories and files) we've visited to ensure we're not double counting. It would be + // preferable to resolve overlap once at startup, but the problem is that the filesystem may + // change over time and, in fact, at startup I found that the rocks DB store directory was not + // created by the time the disk space monitor was started. + Set<Path> visited = new HashSet<>(); + long totalBytes = 0; + while (!pathStack.isEmpty()) { + try { + // We need to resolve to the real path to ensure that we don't inadvertently double count + // due to different paths to the same directory (e.g. /A and /A/../A). + Path current = pathStack.pop().toRealPath(); + + if (visited.contains(current)) { + continue; + } + visited.add(current); + + BasicFileAttributes currentAttrs = Files.readAttributes(current, + BasicFileAttributes.class); + if (currentAttrs.isDirectory()) { + try (DirectoryStream<Path> directoryListing = Files.newDirectoryStream(current)) { + for (Path child : directoryListing) { + pathStack.push(child); + } + } + } else if (currentAttrs.isRegularFile()) { + totalBytes += currentAttrs.size(); + } + } catch (IOException | java.nio.file.DirectoryIteratorException e) { + // If we can't stat or list a path, just ignore it (directory-listing failures arrive as the + // unchecked DirectoryIteratorException). This can happen, for example, if by the time we get + // to stat'ing a file it has been deleted (e.g. due to compaction, rotation, etc.). + } + } + + return totalBytes; + } + + /** + * Creates a new disk space monitor that uses a periodic polling mechanism.
+ * + * @param watchPaths the set of paths to watch + * @param pollingIntervalMillis the polling interval in milliseconds + */ + public PollingScanDiskSpaceMonitor(Set<Path> watchPaths, long pollingIntervalMillis) { + this.watchPaths = Collections.unmodifiableSet(new HashSet<>(watchPaths)); + this.pollingIntervalMillis = pollingIntervalMillis; + } + + @Override + public void start() { + synchronized (lock) { + switch (state) { + case INIT: + schedulerService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + updateSample(); + } + }, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS); + + state = State.RUNNING; + break; + + case RUNNING: + // start is idempotent + return; + + case STOPPED: + throw new IllegalStateException("PollingScanDiskSpaceMonitor was stopped and cannot be restarted."); + } + } + } + + @Override + public void stop() { + synchronized (lock) { + // We could also wait for full termination of the scheduler service, but it is overkill for + // our use case. + schedulerService.shutdownNow(); + + listenerSet.clear(); + state = State.STOPPED; + } + } + + @Override + public boolean registerListener(Listener listener) { + synchronized (lock) { + if (state != State.STOPPED) { + return listenerSet.putIfAbsent(listener, Boolean.TRUE) == null; // null => not previously registered + } + } + return false; + } + + /** + * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout + * and false otherwise. + * <p> + * This is currently exposed at the package private level for tests only.
+ */ + boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return schedulerService.awaitTermination(timeout, unit); + } + + private void updateSample() { // NOTE: an exception escaping here makes the executor suppress all later samples + long totalBytes = getSpaceUsed(watchPaths); + for (Listener listener : listenerSet.keySet()) { + listener.onUpdate(totalBytes); + } + } + + private static class ThreadFactoryImpl implements ThreadFactory { + private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-"; + private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(); + + public Thread newThread(Runnable runnable) { + return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index bcbc90a..5462208 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -20,6 +20,8 @@ package org.apache.samza.container import java.io.File +import java.nio.file.Path +import java.util import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.MetricsConfig.Config2Metrics @@ -29,6 +31,8 @@ import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.container.disk.DiskSpaceMonitor.Listener +import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, DiskSpaceMonitor} import
org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.JvmMetrics @@ -391,6 +395,14 @@ object SamzaContainer extends Logging { .toSet val containerContext = new SamzaContainerContext(containerId, config, taskNames) + // TODO not sure how we should make this config based, or not. Kind of + // strange, since it has some dynamic directories when used with YARN. + val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") + info("Got default storage engine base directory: %s" format defaultStoreBaseDir) + + val storeWatchPaths = new util.HashSet[Path]() + storeWatchPaths.add(defaultStoreBaseDir.toPath) + val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { debug("Setting up task instance: %s" format taskModel) @@ -414,11 +426,6 @@ object SamzaContainer extends Logging { info("Got store consumers: %s" format storeConsumers) - // TODO not sure how we should make this config based, or not. Kind of - // strange, since it has some dynamic directories when used with YARN.
- val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") - info("Got default storage engine base directory: %s" format defaultStoreBaseDir) - var loggedStorageBaseDir: File = null if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { val jobNameAndId = Util.getJobNameAndId(config) @@ -430,6 +437,8 @@ object SamzaContainer extends Logging { loggedStorageBaseDir = defaultStoreBaseDir } + storeWatchPaths.add(loggedStorageBaseDir.toPath) // may equal defaultStoreBaseDir; the HashSet then keeps a single entry + info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) val taskStores = storageEngineFactories @@ -504,6 +513,20 @@ object SamzaContainer extends Logging { (taskName, taskInstance) }).toMap + val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0) // NOTE(review): only 0 means "disabled"; a negative value is passed on to scheduleAtFixedRate, which rejects periods <= 0 - confirm intended handling + var diskSpaceMonitor: DiskSpaceMonitor = null + if (diskPollMillis != 0) { // 0 (the default) disables disk space monitoring + val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge() + + diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, diskPollMillis) + diskSpaceMonitor.registerListener(new Listener { // publish each sample to the container's disk-usage gauge + override def onUpdate(diskUsageSample: Long): Unit = + diskUsage.set(diskUsageSample) + }) + + info("Initialized disk space monitor watch paths to: %s" format storeWatchPaths) + } + val runLoop = new RunLoop( taskInstances = taskInstances, consumerMultiplexer = consumerMultiplexer, @@ -525,7 +548,8 @@ object SamzaContainer extends Logging { metrics = samzaContainerMetrics, reporters = reporters, jvm = jvm, - jmxServer = jmxServer) + jmxServer = jmxServer, + diskSpaceMonitor = diskSpaceMonitor) } } @@ -537,6 +561,7 @@ class SamzaContainer( producerMultiplexer: SystemProducers, metrics: SamzaContainerMetrics, jmxServer: JmxServer, + diskSpaceMonitor: DiskSpaceMonitor = null, // null when disk space monitoring is disabled offsetManager: OffsetManager = new OffsetManager, localityManager: LocalityManager = null, reporters: Map[String, MetricsReporter] = Map(), @@ -550,6 +575,7 @@ class SamzaContainer( startOffsetManager startLocalityManager startStores + startDiskSpaceMonitor
startProducers startTask startConsumers @@ -566,6 +592,7 @@ class SamzaContainer( shutdownConsumers shutdownTask shutdownStores + shutdownDiskSpaceMonitor shutdownProducers shutdownLocalityManager shutdownOffsetManager @@ -575,6 +602,13 @@ class SamzaContainer( } } + def startDiskSpaceMonitor: Unit = { + if (diskSpaceMonitor != null) { + info("Starting disk space monitor") + diskSpaceMonitor.start() + } + } + def startMetrics { info("Registering task instances with metrics.") @@ -713,4 +747,11 @@ class SamzaContainer( jvm.stop } } + + def shutdownDiskSpaceMonitor: Unit = { + if (diskSpaceMonitor != null) { + info("Shutting down disk space monitor.") + diskSpaceMonitor.stop() + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index 6fae650..9e6641c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -37,11 +37,22 @@ class SamzaContainerMetrics( val windowNs = newTimer("window-ns") val processNs = newTimer("process-ns") val commitNs = newTimer("commit-ns") - val utilization = newGauge("event-loop-utilization", 0.0F); + val utilization = newGauge("event-loop-utilization", 0.0F) val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() def addStoreRestorationGauge(taskName: TaskName, storeName: String) { taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) } + + /** + * Creates or gets the disk usage gauge for the container and returns it.
+ */ + def createOrGetDiskUsageGauge(): Gauge[Long] = { + // Despite its name, newGauge appears to be idempotent. A more defensive approach would be + // to ensure idempotency at this level, e.g. via a CAS operation. Unfortunately, it appears that + // the mechanism to register a Gauge is hidden. An alternative would be to use a mutex to + // ensure the gauge is created once. + newGauge("disk-usage", 0L) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java b/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java new file mode 100644 index 0000000..2576437 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.container.disk; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +public class TestPollingScanDiskSpaceMonitor { + private Path testDir; + private ArrayDeque<Path> filesToDelete; + + @Before + public void setUp() throws IOException { + filesToDelete = new ArrayDeque<>(); + testDir = Files.createTempDirectory("samza-polling-scan-disk-monitor-test"); + filesToDelete.push(testDir); + } + + @After + public void tearDown() throws IOException { + while (!filesToDelete.isEmpty()) { + Path path = filesToDelete.pop(); + + try { + Files.delete(path); + } catch (IOException e) { + // Continue with best effort, this is just test code. 
+ } + } + } + + @Test + public void testSizeOfSingleFile() throws IOException { + writeFile(testDir, "single-file", new byte[1024]); + assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir))); + } + + @Test + public void testSizeOfDisjointDirectoriesFromRoot() throws IOException { + Path child1Dir = createDirectory(testDir, "child1"); + writeFile(child1Dir, "foo", new byte[1024]); + + Path child2Dir = createDirectory(testDir, "child2"); + writeFile(child2Dir, "bar", new byte[4096]); + + assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir))); + } + + @Test + public void testSizeOfDisjointDirectoriesFromChildDirs() throws IOException { + Path child1Dir = createDirectory(testDir, "child1"); + writeFile(child1Dir, "foo", new byte[1024]); + + Path child2Dir = createDirectory(testDir, "child2"); + writeFile(child2Dir, "bar", new byte[4096]); + + Set<Path> pathSet = new HashSet<>(Arrays.asList(child1Dir, child2Dir)); + assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet)); + } + + @Test + public void testSizeOfOverlappedDirectories() throws IOException { + Path childDir = createDirectory(testDir, "child"); + writeFile(childDir, "foo", new byte[1024]); + + Path grandchildDir = createDirectory(childDir, "grandchild"); + writeFile(grandchildDir, "bar", new byte[4096]); + + // If getSpaceUsed were not handling overlapping directories we would expect to count + // grandchild twice, which would give us the erroneous total `1024 + 4096 * 2`. 
+ Set<Path> pathSet = new HashSet<>(Arrays.asList(childDir, grandchildDir)); + assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet)); + } + + @Test + public void testSizeOfDirectoryAccessedWithDifferentPaths() throws IOException { + Path childDir = createDirectory(testDir, "child1"); + writeFile(childDir, "foo", new byte[1024]); + + Path otherPath = childDir.resolve("..").resolve(childDir.getFileName()); + Set<Path> pathSet = new HashSet<>(Arrays.asList(childDir, otherPath)); + + // This test actually verifies that !childDir.equals(otherPath) and ensures that we properly + // handle duplicate paths to the same directory. + assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet)); + } + + @Test + public void testSizeOfAlreadyCountedSymlinkedFile() throws IOException { + writeFile(testDir, "regular-file", new byte[1024]); + Files.createSymbolicLink(testDir.resolve("symlink"), testDir.resolve("regular-file")); + + // We should not double count a symlinked file + assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir))); + } + + @Test + public void testSizeOfUncountedSymlinkedFile() throws IOException { + Path childDir = createDirectory(testDir, "child"); + writeFile(testDir, "regular-file", new byte[1024]); + Files.createSymbolicLink(childDir.resolve("symlink"), testDir.resolve("regular-file")); + + // We should count the space of the symlinked file even thought it is outside of the root + // from which we started. 
+ assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir))); + } + + @Test + public void testFollowSymlinkedDirectory() throws IOException { + Path childDir = createDirectory(testDir, "child"); + writeFile(childDir, "regular-file", new byte[1024]); + + Path dirSymlink = testDir.resolve("symlink"); + Files.createSymbolicLink(dirSymlink, childDir); + + // We should follow the symlink and read the symlinked file + assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(dirSymlink))); + } + + @Test + public void testHandleCyclicalSymlink() throws IOException { + Path childDir = createDirectory(testDir, "child"); + writeFile(childDir, "regular-file", new byte[1024]); + Files.createSymbolicLink(childDir.resolve("symlink"), testDir); + + // We have testDir/childDir/symlink -> testDir, which effectively creates a cycle. + assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(childDir))); + } + + @Test + public void testMissingDirectory() throws IOException { + Set<Path> pathSet = Collections.singleton(testDir.resolve("non-existant-child")); + assertEquals(0, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet)); + } + + @Test + public void testGetSamplesFromListener() throws IOException, InterruptedException { + writeFile(testDir, "single-file", new byte[1024]); + + final AtomicLong sample = new AtomicLong(); + final CountDownLatch sampleReady = new CountDownLatch(1); + final PollingScanDiskSpaceMonitor monitor = new PollingScanDiskSpaceMonitor(Collections.singleton(testDir), 50); + monitor.registerListener(new DiskSpaceMonitor.Listener() { + @Override + public void onUpdate(long diskUsageSample) { + sample.set(diskUsageSample); + sampleReady.countDown(); + } + }); + + monitor.start(); + + try { + if (!sampleReady.await(5, TimeUnit.SECONDS)) { + fail("Timed out waiting for listener to be provide disk usage sample"); + } + + assertEquals(1024, sample.get()); + } finally { + monitor.stop(); 
+ } + } + + @Test + public void testStartStop() throws IOException, InterruptedException { + writeFile(testDir, "single-file", new byte[1024]); + + final int numSamplesToCollect = 5; + + final AtomicInteger numCallbackInvocations = new AtomicInteger(); + final CountDownLatch doneLatch = new CountDownLatch(1); + final PollingScanDiskSpaceMonitor monitor = new PollingScanDiskSpaceMonitor(Collections.singleton(testDir), 50); + monitor.registerListener(new DiskSpaceMonitor.Listener() { + @Override + public void onUpdate(long diskUsageSample) { + if (numCallbackInvocations.incrementAndGet() == numSamplesToCollect) { + monitor.stop(); + doneLatch.countDown(); + } + } + }); + + monitor.start(); + + try { + if (!doneLatch.await(5, TimeUnit.SECONDS)) { + fail(String.format("Timed out waiting for listener to be give %d updates", numSamplesToCollect)); + } + if (!monitor.awaitTermination(5, TimeUnit.SECONDS)) { + fail("Timed out waiting for monitor to terminate"); + } + + // A number larger than numSamplesToCollect indicates that we got a callback after we stopped + // the monitor. We should safely be able to assert this will not happen as we stopped the + // monitor in the the thread on which it is delivering notifications. + assertEquals(numSamplesToCollect, numCallbackInvocations.get()); + } finally { + monitor.stop(); + } + } + + private Path createDirectory(Path parentDir, String name) throws IOException { + name = name + "-"; + Path path = Files.createTempDirectory(parentDir, name); + filesToDelete.push(path); + return path; + } + + private Path createFile(Path parentDir, String name) throws IOException { + name = name + "-"; + Path path = Files.createTempFile(parentDir, name, null); + filesToDelete.push(path); + return path; + } + + private void writeFile(Path parentDir, String name, byte[] contents) throws IOException { + Path path = createFile(parentDir, name); + Files.write(path, contents); + } +}
