DL-107: Add gauge unregistration for distributedlog-core, distributedlog-benchmark and distributedlog-service
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c6ea1755 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c6ea1755 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c6ea1755 Branch: refs/heads/master Commit: c6ea17555adfc325c0ec995d59c8d14a9dac2ad1 Parents: ab0868c Author: Phillip Su <[email protected]> Authored: Mon Aug 22 17:47:28 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Dec 27 16:49:28 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/benchmark/ReaderWorker.java | 11 ++- .../BKDistributedLogNamespace.java | 4 + .../twitter/distributedlog/ZooKeeperClient.java | 2 + .../readahead/ReadAheadTracker.java | 32 ++++++-- .../readahead/ReadAheadWorker.java | 2 + .../util/LimitedPermitManager.java | 14 +++- .../MonitoredScheduledThreadPoolExecutor.java | 42 +++++++---- .../distributedlog/util/OrderedScheduler.java | 2 + .../util/SimplePermitLimiter.java | 16 +++- .../distributedlog/zk/ZKWatcherManager.java | 21 +++++- .../TestDistributedLogConfiguration.java | 5 +- .../distributedlog/zk/TestZKWatcherManager.java | 3 +- .../service/DistributedLogServiceImpl.java | 77 +++++++++++++------- .../distributedlog/service/MonitorService.java | 35 ++++++--- .../service/stream/StreamImpl.java | 32 +++++--- 15 files changed, 220 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java index 3e82e30..9817d94 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java @@ -106,12 +106,13 @@ public class ReaderWorker implements Worker { final String streamName; DLSN prevDLSN = null; long prevSequenceId = Long.MIN_VALUE; + private static final String gaugeLabel = "sequence_id"; StreamReader(int idx, StatsLogger statsLogger) { this.streamIdx = idx; int streamId = startStreamId + streamIdx; streamName = String.format("%s_%d", streamPrefix, streamId); - statsLogger.scope(streamName).registerGauge("sequence_id", this); + statsLogger.scope(streamName).registerGauge(gaugeLabel, this); } @Override @@ -218,6 +219,10 @@ public class ReaderWorker implements Worker { public synchronized Number getSample() { return prevSequenceId; } + + void unregisterGauge() { + statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this); + } } public ReaderWorker(DistributedLogConfiguration conf, @@ -446,6 +451,10 @@ public class ReaderWorker implements Worker { for (DLZkServerSet serverSet: serverSets) { serverSet.close(); } + // Unregister gauges to prevent GC spirals + for(StreamReader sr : streamReaders) { + sr.unregisterGauge(); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index cae6f6a..0df2f1c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -1079,6 +1079,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { LOG.info("Ledger Allocator stopped."); } + // Unregister gauge to avoid GC spiral + ((LimitedPermitManager)this.logSegmentRollingPermitManager).unregisterGauge(); + ((SimplePermitLimiter)this.writeLimiter).unregisterGauge(); + // Shutdown log segment metadata stores Utils.close(writerSegmentMetadataStore); Utils.close(readerSegmentMetadataStore); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java index 9ea9e37..74cd6cf 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java @@ -395,6 +395,8 @@ public class ZooKeeperClient { } LOG.info("Close zookeeper client {}.", name); closeInternal(); + // unregister gauges to prevent GC spiral + this.watcherManager.unregisterGauges(); closed = true; } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java index 5c0fd4b..39a627f 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java +++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java @@ -32,13 +32,22 @@ public class ReadAheadTracker { final AtomicLong ticks = new AtomicLong(0); // which phase that the worker is in. ReadAheadPhase phase; + private final StatsLogger statsLogger; + // Gauges and their labels + private static final String phaseGaugeLabel = "phase"; + private final Gauge<Number> phaseGauge; + private static final String ticksGaugeLabel = "ticks"; + private final Gauge<Number> ticksGauge; + private static final String cachEntriesGaugeLabel = "cache_entries"; + private final Gauge<Number> cacheEntriesGauge; ReadAheadTracker(String streamName, final ReadAheadCache cache, ReadAheadPhase initialPhase, StatsLogger statsLogger) { + this.statsLogger = statsLogger; this.phase = initialPhase; - statsLogger.registerGauge("phase", new Gauge<Number>() { + phaseGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return ReadAheadPhase.SCHEDULE_READAHEAD.getCode(); @@ -48,8 +57,10 @@ public class ReadAheadTracker { public Number getSample() { return phase.getCode(); } - }); - statsLogger.registerGauge("ticks", new Gauge<Number>() { + }; + this.statsLogger.registerGauge(phaseGaugeLabel, phaseGauge); + + ticksGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -59,8 +70,10 @@ public class ReadAheadTracker { public Number getSample() { return ticks.get(); } - }); - statsLogger.registerGauge("cache_entries", new Gauge<Number>() { + }; + this.statsLogger.registerGauge(ticksGaugeLabel, ticksGauge); + + cacheEntriesGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -70,7 +83,8 @@ public class ReadAheadTracker { public Number getSample() { return cache.getNumCachedRecords(); } - }); + }; + this.statsLogger.registerGauge(cachEntriesGaugeLabel, cacheEntriesGauge); } ReadAheadPhase getPhase() { @@ -81,4 +95,10 @@ public class ReadAheadTracker { this.ticks.incrementAndGet(); 
this.phase = readAheadPhase; } + + public void unregisterGauge() { + this.statsLogger.unregisterGauge(phaseGaugeLabel, phaseGauge); + this.statsLogger.unregisterGauge(ticksGaugeLabel, ticksGauge); + this.statsLogger.unregisterGauge(cachEntriesGaugeLabel, cacheEntriesGauge); + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index 9ba8ca4..83a34a3 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -356,6 +356,8 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea public Future<Void> asyncClose() { LOG.info("Stopping Readahead worker for {}", fullyQualifiedName); running = false; + // Unregister associated gauages to prevent GC spiral + this.tracker.unregisterGauge(); // Aside from unfortunate naming of variables, this allows // the currently active long poll to be interrupted and completed http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java index 7357410..4b917b2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java +++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java @@ -68,6 +68,8 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher { final TimeUnit timeUnit; final ScheduledExecutorService executorService; final AtomicInteger epoch = new AtomicInteger(0); + private StatsLogger statsLogger = null; + private Gauge<Number> outstandingGauge = null; public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit, ScheduledExecutorService executorService) { @@ -84,7 +86,8 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher { this.period = period; this.timeUnit = timeUnit; this.executorService = executorService; - statsLogger.scope("permits").registerGauge("outstanding", new Gauge<Number>() { + this.statsLogger = statsLogger; + this.outstandingGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -94,7 +97,8 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher { public Number getSample() { return null == semaphore ? 
0 : concurrency - semaphore.availablePermits(); } - }); + }; + this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge); } @Override @@ -176,4 +180,10 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher { } } } + + public void unregisterGauge() { + if(this.statsLogger != null && this.outstandingGauge != null) { + this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java index 2bc7f82..512a456 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java @@ -110,6 +110,14 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe protected final boolean traceTaskExecution; protected final OpStatsLogger taskExecutionStats; protected final OpStatsLogger taskPendingStats; + protected final StatsLogger statsLogger; + // Gauges and their labels + private static final String pendingTasksGaugeLabel = "pending_tasks"; + private final Gauge<Number> pendingTasksGauge; + private static final String completedTasksGaugeLabel = "completed_tasks"; + protected final Gauge<Number> completedTasksGauge; + private static final String totalTasksGaugeLabel = "total_tasks"; + protected final Gauge<Number> totalTasksGauge; public MonitoredScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, @@ 
-117,11 +125,10 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe boolean traceTaskExecution) { super(corePoolSize, threadFactory); this.traceTaskExecution = traceTaskExecution; - - this.taskPendingStats = statsLogger.getOpStatsLogger("task_pending_time"); - this.taskExecutionStats = statsLogger.getOpStatsLogger("task_execution_time"); - // outstanding tasks - statsLogger.registerGauge("pending_tasks", new Gauge<Number>() { + this.statsLogger = statsLogger; + this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time"); + this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time"); + this.pendingTasksGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -131,9 +138,8 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe public Number getSample() { return getQueue().size(); } - }); - // completed tasks - statsLogger.registerGauge("completed_tasks", new Gauge<Number>() { + }; + this.completedTasksGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -143,9 +149,8 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe public Number getSample() { return getCompletedTaskCount(); } - }); - // total tasks - statsLogger.registerGauge("total_tasks", new Gauge<Number>() { + }; + this.totalTasksGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -155,7 +160,14 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe public Number getSample() { return getTaskCount(); } - }); + }; + + // outstanding tasks + this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge); + // completed tasks + this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge); + // total tasks + this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge); } private Runnable timedRunnable(Runnable r) { @@ 
-236,6 +248,10 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe return null; } - + void unregisterGauges() { + this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge); + this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge); + this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge); + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java index d3385a6..9f34902 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java @@ -310,6 +310,8 @@ public class OrderedScheduler implements ScheduledExecutorService { @Override public void shutdown() { for (MonitoredScheduledThreadPoolExecutor executor : executors) { + // Unregister gauges + executor.unregisterGauges(); executor.shutdown(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java index 298a5ff..2482ece 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java @@ -47,6 +47,9 @@ public class 
SimplePermitLimiter implements PermitLimiter { final int permitsMax; final boolean darkmode; final Feature disableWriteLimitFeature; + private StatsLogger statsLogger = null; + private Gauge<Number> permitsGauge = null; + private String permitsGaugeLabel = ""; public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger statsLogger, boolean singleton, Feature disableWriteLimitFeature) { @@ -57,7 +60,8 @@ public class SimplePermitLimiter implements PermitLimiter { // stats if (singleton) { - statsLogger.registerGauge("num_permits", new Gauge<Number>() { + this.statsLogger = statsLogger; + this.permitsGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -66,7 +70,9 @@ public class SimplePermitLimiter implements PermitLimiter { public Number getSample() { return permits.get(); } - }); + }; + this.permitsGaugeLabel = "permits"; + statsLogger.registerGauge(permitsGaugeLabel, permitsGauge); } acquireFailureCounter = statsLogger.getCounter("acquireFailure"); permitsMetric = statsLogger.getOpStatsLogger("permits"); @@ -97,4 +103,10 @@ public class SimplePermitLimiter implements PermitLimiter { public int getPermits() { return permits.get(); } + + public void unregisterGauge() { + if (this.statsLogger != null && this.permitsGauge != null) { + this.statsLogger.unregisterGauge(permitsGaugeLabel, permitsGauge); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java index 03b2841..8ef33ea 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java +++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java @@ -78,6 +78,11 @@ public class ZKWatcherManager implements Watcher { private final String name; private final ZooKeeperClient zkc; private final StatsLogger statsLogger; + // Gauges and their labels + private final Gauge<Number> totalWatchesGauge; + private static final String totalWatchesGauageLabel = "total_watches"; + private final Gauge<Number> numChildWatchesGauge; + private static final String numChildWatchesGauageLabel = "num_child_watches"; protected final ConcurrentMap<String, Set<Watcher>> childWatches; protected final AtomicInteger allWatchesGauge; @@ -94,7 +99,7 @@ public class ZKWatcherManager implements Watcher { this.allWatchesGauge = new AtomicInteger(0); // stats - this.statsLogger.registerGauge("total_watches", new Gauge<Number>() { + totalWatchesGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -104,9 +109,10 @@ public class ZKWatcherManager implements Watcher { public Number getSample() { return allWatchesGauge.get(); } - }); + }; + this.statsLogger.registerGauge(totalWatchesGauageLabel, totalWatchesGauge); - this.statsLogger.registerGauge("num_child_watches", new Gauge<Number>() { + numChildWatchesGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -116,7 +122,9 @@ public class ZKWatcherManager implements Watcher { public Number getSample() { return childWatches.size(); } - }); + }; + + this.statsLogger.registerGauge(numChildWatchesGauageLabel, numChildWatchesGauge); } public Watcher registerChildWatcher(String path, Watcher watcher) { @@ -178,6 +186,11 @@ public class ZKWatcherManager implements Watcher { } } + public void unregisterGauges() { + this.statsLogger.unregisterGauge(totalWatchesGauageLabel, totalWatchesGauge); + this.statsLogger.unregisterGauge(numChildWatchesGauageLabel, numChildWatchesGauge); + } + @Override public void process(WatchedEvent event) { switch 
(event.getType()) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java index 8dcb053..19b9863 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java @@ -109,14 +109,15 @@ public class TestDistributedLogConfiguration { DistributedLogConfiguration conf = new DistributedLogConfiguration(); // validate default configuration conf.validate(); - // test invalid timeout, should throw exception + // test equal, should not throw exception conf.setReadLACLongPollTimeout(conf.getBKClientReadTimeout() * 1000); try { conf.validate(); } catch (IllegalArgumentException e){ exceptionThrown=true; } - assertTrue(exceptionThrown); + assertFalse(exceptionThrown); + // test invalid case, should throw exception exceptionThrown=false; conf.setReadLACLongPollTimeout(conf.getBKClientReadTimeout() * 1000 * 2); try { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java index 3ad181d..b702d4c 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java +++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java @@ -73,7 +73,8 @@ public class TestZKWatcherManager { // unregister watcher watcherManager.unregisterChildWatcher(path, watcher, true); - + // unregister gauges + watcherManager.unregisterGauges(); assertEquals(0, watcherManager.childWatches.size()); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 74da34f..677ade5 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -150,6 +150,11 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters = new ConcurrentHashMap<StatusCode, Counter>(); private final Counter statusCodeTotal; + private final Gauge<Number> proxyStatusGauge; + private final Gauge<Number> movingAvgRpsGauge; + private final Gauge<Number> movingAvgBpsGauge; + private final Gauge<Number> streamAcquiredGauge; + private final Gauge<Number> streamCachedGauge; DistributedLogServiceImpl(ServerConfiguration serverConf, DistributedLogConfiguration dlConf, @@ -260,9 +265,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // Stats this.statsLogger = statsLogger; - // Stats on server - // Gauge for server status/health - statsLogger.registerGauge("proxy_status", new Gauge<Number>() { + // Gauges for server status/health + this.proxyStatusGauge = 
new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -271,11 +275,10 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI @Override public Number getSample() { return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() ? - 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2)); + 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2)); } - }); - // Global moving average rps - statsLogger.registerGauge("moving_avg_rps", new Gauge<Number>() { + }; + this.movingAvgRpsGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -285,9 +288,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI public Number getSample() { return windowedRps.get(); } - }); - // Global moving average bps - statsLogger.registerGauge("moving_avg_bps", new Gauge<Number>() { + }; + this.movingAvgBpsGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -297,19 +299,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI public Number getSample() { return windowedBps.get(); } - }); - - // Stats on requests - this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending"); - this.writePendingStat = streamOpStats.requestPendingCounter("writePending"); - this.redirects = streamOpStats.requestCounter("redirect"); - this.statusCodeStatLogger = streamOpStats.requestScope("statuscode"); - this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count"); - this.receivedRecordCounter = streamOpStats.recordsCounter("received"); - - // Stats on streams - StatsLogger streamsStatsLogger = statsLogger.scope("streams"); - streamsStatsLogger.registerGauge("acquired", new Gauge<Number>() { + }; + // Gauges for streams + this.streamAcquiredGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -319,8 +311,8 @@ public class 
DistributedLogServiceImpl implements DistributedLogService.ServiceI public Number getSample() { return streamManager.numAcquired(); } - }); - streamsStatsLogger.registerGauge("cached", new Gauge<Number>() { + }; + this.streamCachedGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -330,7 +322,26 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI public Number getSample() { return streamManager.numCached(); } - }); + }; + + // Stats on server + statsLogger.registerGauge("proxy_status", proxyStatusGauge); + // Global moving average rps + statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge); + // Global moving average bps + statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge); + // Stats on requests + this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending"); + this.writePendingStat = streamOpStats.requestPendingCounter("writePending"); + this.redirects = streamOpStats.requestCounter("redirect"); + this.statusCodeStatLogger = streamOpStats.requestScope("statuscode"); + this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count"); + this.receivedRecordCounter = streamOpStats.recordsCounter("received"); + + // Stats for streams + StatsLogger streamsStatsLogger = statsLogger.scope("streams"); + streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge); + streamsStatsLogger.registerGauge("cached", this.streamCachedGauge); // Setup complete logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}, dlsn version {}.", @@ -669,6 +680,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // Stop the timer. timer.stop(); + // clean up gauge + unregisterGauge(); + // shutdown the executor after requesting closing streams. 
SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS); } catch (Exception ex) { @@ -704,6 +718,17 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } } + /** + * clean up the gauge before we close to help GC + */ + private void unregisterGauge(){ + this.statsLogger.unregisterGauge("proxy_status",this.proxyStatusGauge); + this.statsLogger.unregisterGauge("moving_avg_rps",this.movingAvgRpsGauge); + this.statsLogger.unregisterGauge("moving_avg_bps",this.movingAvgBpsGauge); + this.statsLogger.unregisterGauge("acquired",this.streamAcquiredGauge); + this.statsLogger.unregisterGauge("cached",this.streamCachedGauge); + } + @VisibleForTesting Stream newStream(String name) throws IOException { return streamManager.getOrCreateStream(name, false); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java index 2c6cccc..7edb778 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java @@ -41,6 +41,8 @@ import com.twitter.finagle.stats.StatsReceiver; import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Duration; import com.twitter.util.FutureEventListener; + +import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang.StringUtils; @@ -107,6 +109,7 @@ public class MonitorService implements NamespaceListener { private final StatsReceiver monitorReceiver; private final Stat 
successStat; private final Stat failureStat; + private final Gauge<Number> numOfStreamsGauge; // Hash Function private final HashFunction hashFunction = Hashing.md5(); @@ -254,6 +257,17 @@ public class MonitorService implements NamespaceListener { this.successStat = monitorReceiver.stat0("success"); this.failureStat = monitorReceiver.stat0("failure"); this.statsProvider = statsProvider; + this.numOfStreamsGauge = new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return knownStreams.size(); + } + }; } public void runServer() throws IllegalArgumentException, IOException { @@ -394,17 +408,7 @@ public class MonitorService implements NamespaceListener { void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException { // stats - statsProvider.getStatsLogger("monitor").registerGauge("num_streams", new org.apache.bookkeeper.stats.Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return knownStreams.size(); - } - }); + statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge); logger.info("Construct dl namespace @ {}", dlUri); dlNamespace = DistributedLogNamespaceBuilder.newBuilder() .conf(conf) @@ -442,6 +446,8 @@ public class MonitorService implements NamespaceListener { logger.error("Interrupted on waiting shutting down monitor executor service : ", e); } if (null != statsProvider) { + // clean up the gauges + unregisterGauge(); statsProvider.stop(); } keepAliveLatch.countDown(); @@ -452,4 +458,11 @@ public class MonitorService implements NamespaceListener { keepAliveLatch.await(); } + /** + * clean up the gauge before we close to help GC + */ + private void unregisterGauge(){ + statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge); + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index e74ebbe..9f049c8 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -150,6 +150,7 @@ public class StreamImpl implements Stream { private final StatsLogger exceptionStatLogger; private final ConcurrentHashMap<String, Counter> exceptionCounters = new ConcurrentHashMap<String, Counter>(); + private final Gauge<Number> streamStatusGauge; // Since we may create and discard streams at initialization if there's a race, // must not do any expensive initialization here (particularly any locking or @@ -206,6 +207,17 @@ public class StreamImpl implements Stream { this.exceptionStatLogger = streamOpStats.requestScope("exceptions"); this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close"); this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts"); + // Gauges + this.streamStatusGauge = new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return StreamStatus.UNINITIALIZED.getCode(); + } + @Override + public Number getSample() { + return status.getCode(); + } + }; } @Override @@ -242,16 +254,7 @@ public class StreamImpl implements Stream { // Better to avoid registering the gauge multiple times, so do this in init // which only gets called once. 
- streamLogger.registerGauge("stream_status", new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return StreamStatus.UNINITIALIZED.getCode(); - } - @Override - public Number getSample() { - return status.getCode(); - } - }); + streamLogger.registerGauge("stream_status", this.streamStatusGauge); // Signal initialization is complete, should be last in this method. status = StreamStatus.INITIALIZING; @@ -761,6 +764,7 @@ public class StreamImpl implements Stream { // after the async writer is closed. so we could clear up the lock before redirect // them. close(abort); + unregisterGauge(); if (uncache) { final long probationTimeoutMs; if (null != owner) { @@ -827,6 +831,7 @@ public class StreamImpl implements Stream { } else { closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs); } + FutureUtils.stats( closeWriterFuture, writerCloseStatLogger, @@ -869,6 +874,13 @@ public class StreamImpl implements Stream { logger.info("Closed stream {}.", name); } + /** + * clean up the gauge to help GC + */ + private void unregisterGauge(){ + streamLogger.unregisterGauge("stream_status", this.streamStatusGauge); + } + // Test-only apis @VisibleForTesting
