This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new fe7bed3 [STATS] [DOC] Add @StatsDoc annotation for db ledger storage
stats
fe7bed3 is described below
commit fe7bed386af725e44621b534a6c516b743a15716
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Dec 13 22:49:02 2018 +0800
[STATS] [DOC] Add @StatsDoc annotation for db ledger storage stats
Descriptions of the changes in this PR:
*Motivation*
As part of [BP-36](https://github.com/apache/bookkeeper/issues/1785), this
PR is to document the db ledger storage stats.
*Changes*
- convert db ledger storage stats to use StatsDoc for documenting metrics
Master Issue: #1785
Reviewers: Jia Zhai <None>, Enrico Olivelli <[email protected]>
This closes #1874 from sijie/ldb_stats
---
.../bookie/storage/ldb/DbLedgerStorage.java | 57 +-----
.../bookie/storage/ldb/DbLedgerStorageStats.java | 204 +++++++++++++++++++++
.../bookie/storage/ldb/EntryLocationIndex.java | 22 +--
.../storage/ldb/EntryLocationIndexStats.java | 66 +++++++
.../bookie/storage/ldb/LedgerMetadataIndex.java | 22 +--
.../storage/ldb/LedgerMetadataIndexStats.java | 66 +++++++
.../ldb/SingleDirectoryDbLedgerStorage.java | 129 ++++---------
7 files changed, 385 insertions(+), 181 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 0706565..ccabaed 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -58,7 +58,6 @@ import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
@@ -87,6 +86,7 @@ public class DbLedgerStorage implements LedgerStorage {
// Keep 1 single Bookie GC thread so the the compactions from multiple
individual directories are serialized
private ScheduledExecutorService gcExecutor;
+ private DbLedgerStorageStats stats;
@Override
public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager, LedgerDirsManager ledgerDirsManager,
@@ -123,7 +123,13 @@ public class DbLedgerStorage implements LedgerStorage {
perDirectoryReadCacheSize));
}
- registerStats(statsLogger);
+ this.stats = new DbLedgerStorageStats(
+ statsLogger,
+ () ->
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum(),
+ () ->
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum(),
+ () ->
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum(),
+ () ->
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum()
+ );
}
@VisibleForTesting
@@ -136,53 +142,6 @@ public class DbLedgerStorage implements LedgerStorage {
stateManager, checkpointSource, checkpointer, statsLogger,
gcExecutor, writeCacheSize, readCacheSize);
}
- public void registerStats(StatsLogger stats) {
- stats.registerGauge("write-cache-size", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum();
- }
- });
- stats.registerGauge("write-cache-count", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum();
- }
- });
- stats.registerGauge("read-cache-size", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum();
- }
- });
- stats.registerGauge("read-cache-count", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum();
- }
- });
- }
-
@Override
public void start() {
ledgerStorageList.forEach(LedgerStorage::start);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java
new file mode 100644
index 0000000..bc99c60
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+
+import java.util.function.Supplier;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for db ledger storage stats.
+ */
+@StatsDoc(
+ name = BOOKIE_SCOPE,
+ category = CATEGORY_SERVER,
+ help = "DbLedgerStorage related stats"
+)
+@Getter
+class DbLedgerStorageStats {
+
+ private static final String ADD_ENTRY = "add-entry";
+ private static final String READ_ENTRY = "read-entry";
+ private static final String READ_CACHE_HITS = "read-cache-hits";
+ private static final String READ_CACHE_MISSES = "read-cache-misses";
+ private static final String READAHEAD_BATCH_COUNT =
"readahead-batch-count";
+ private static final String READAHEAD_BATCH_SIZE = "readahead-batch-size";
+ private static final String FLUSH = "flush";
+ private static final String FLUSH_SIZE = "flush-size";
+ private static final String THROTTLED_WRITE_REQUESTS =
"throttled-write-requests";
+ private static final String REJECTED_WRITE_REQUESTS =
"rejected-write-requests";
+ private static final String WRITE_CACHE_SIZE = "write-cache-size";
+ private static final String WRITE_CACHE_COUNT = "write-cache-count";
+ private static final String READ_CACHE_SIZE = "read-cache-size";
+ private static final String READ_CACHE_COUNT = "read-cache-count";
+
+ @StatsDoc(
+ name = ADD_ENTRY,
+ help = "operation stats of adding entries to db ledger storage",
+ parent = BOOKIE_ADD_ENTRY
+ )
+ private final OpStatsLogger addEntryStats;
+    @StatsDoc(
+        name = READ_ENTRY,
+        help = "operation stats of reading entries from db ledger storage",
+        parent = BOOKIE_READ_ENTRY
+    )
+ private final OpStatsLogger readEntryStats;
+ @StatsDoc(
+ name = READ_CACHE_HITS,
+ help = "operation stats of read cache hits",
+ parent = READ_ENTRY
+ )
+ private final OpStatsLogger readCacheHitStats;
+ @StatsDoc(
+ name = READ_CACHE_MISSES,
+ help = "operation stats of read cache misses",
+ parent = READ_ENTRY
+ )
+ private final OpStatsLogger readCacheMissStats;
+ @StatsDoc(
+ name = READAHEAD_BATCH_COUNT,
+ help = "the distribution of num of entries to read in one readahead
batch"
+ )
+ private final OpStatsLogger readAheadBatchCountStats;
+    @StatsDoc(
+        name = READAHEAD_BATCH_SIZE,
+        help = "the distribution of num of bytes to read in one readahead batch"
+    )
+ private final OpStatsLogger readAheadBatchSizeStats;
+    @StatsDoc(
+        name = FLUSH,
+        help = "operation stats of flushing write cache to entry log files"
+    )
+ private final OpStatsLogger flushStats;
+ @StatsDoc(
+ name = FLUSH_SIZE,
+ help = "the distribution of number of bytes flushed from write cache
to entry log files"
+ )
+ private final OpStatsLogger flushSizeStats;
+ @StatsDoc(
+ name = THROTTLED_WRITE_REQUESTS,
+ help = "The number of requests throttled due to write cache is full"
+ )
+ private final Counter throttledWriteRequests;
+ @StatsDoc(
+ name = REJECTED_WRITE_REQUESTS,
+ help = "The number of requests rejected due to write cache is full"
+ )
+ private final Counter rejectedWriteRequests;
+
+ @StatsDoc(
+ name = WRITE_CACHE_SIZE,
+ help = "Current number of bytes in write cache"
+ )
+ private final Gauge<Long> writeCacheSizeGauge;
+ @StatsDoc(
+ name = WRITE_CACHE_COUNT,
+ help = "Current number of entries in write cache"
+ )
+ private final Gauge<Long> writeCacheCountGauge;
+ @StatsDoc(
+ name = READ_CACHE_SIZE,
+ help = "Current number of bytes in read cache"
+ )
+ private final Gauge<Long> readCacheSizeGauge;
+ @StatsDoc(
+ name = READ_CACHE_COUNT,
+ help = "Current number of entries in read cache"
+ )
+ private final Gauge<Long> readCacheCountGauge;
+
+ DbLedgerStorageStats(StatsLogger stats,
+ Supplier<Long> writeCacheSizeSupplier,
+ Supplier<Long> writeCacheCountSupplier,
+ Supplier<Long> readCacheSizeSupplier,
+ Supplier<Long> readCacheCountSupplier) {
+ addEntryStats = stats.getOpStatsLogger(ADD_ENTRY);
+ readEntryStats = stats.getOpStatsLogger(READ_ENTRY);
+ readCacheHitStats = stats.getOpStatsLogger(READ_CACHE_HITS);
+ readCacheMissStats = stats.getOpStatsLogger(READ_CACHE_MISSES);
+ readAheadBatchCountStats =
stats.getOpStatsLogger(READAHEAD_BATCH_COUNT);
+ readAheadBatchSizeStats = stats.getOpStatsLogger(READAHEAD_BATCH_SIZE);
+ flushStats = stats.getOpStatsLogger(FLUSH);
+ flushSizeStats = stats.getOpStatsLogger(FLUSH_SIZE);
+
+ throttledWriteRequests = stats.getCounter(THROTTLED_WRITE_REQUESTS);
+ rejectedWriteRequests = stats.getCounter(REJECTED_WRITE_REQUESTS);
+
+ writeCacheSizeGauge = new Gauge<Long>() {
+ @Override
+ public Long getDefaultValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long getSample() {
+ return writeCacheSizeSupplier.get();
+ }
+ };
+ stats.registerGauge(WRITE_CACHE_SIZE, writeCacheSizeGauge);
+ writeCacheCountGauge = new Gauge<Long>() {
+ @Override
+ public Long getDefaultValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long getSample() {
+ return writeCacheCountSupplier.get();
+ }
+ };
+ stats.registerGauge(WRITE_CACHE_COUNT, writeCacheCountGauge);
+ readCacheSizeGauge = new Gauge<Long>() {
+ @Override
+ public Long getDefaultValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long getSample() {
+ return readCacheSizeSupplier.get();
+ }
+ };
+ stats.registerGauge(READ_CACHE_SIZE, readCacheSizeGauge);
+ readCacheCountGauge = new Gauge<Long>() {
+
+ @Override
+ public Long getDefaultValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long getSample() {
+ return readCacheCountSupplier.get();
+ }
+ };
+ stats.registerGauge(READ_CACHE_COUNT, readCacheCountGauge);
+ }
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index 21b87e2..5673883 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -34,7 +34,6 @@ import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
import org.slf4j.Logger;
@@ -51,33 +50,22 @@ public class EntryLocationIndex implements Closeable {
private final KeyValueStorage locationsDb;
private final ConcurrentLongHashSet deletedLedgers = new
ConcurrentLongHashSet();
- private StatsLogger stats;
+ private final EntryLocationIndexStats stats;
public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory
storageFactory, String basePath,
StatsLogger stats) throws IOException {
String locationsDbPath = FileSystems.getDefault().getPath(basePath,
"locations").toFile().toString();
locationsDb = storageFactory.newKeyValueStorage(locationsDbPath,
DbConfigType.Huge, conf);
- this.stats = stats;
- registerStats();
- }
-
- public void registerStats() {
- stats.registerGauge("entries-count", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
+ this.stats = new EntryLocationIndexStats(
+ stats,
+ () -> {
try {
return locationsDb.count();
} catch (IOException e) {
return -1L;
}
- }
- });
+ });
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexStats.java
new file mode 100644
index 0000000..dd87f7b
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+
+import java.util.function.Supplier;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for entry location index stats.
+ */
+@StatsDoc(
+ name = BOOKIE_SCOPE,
+ category = CATEGORY_SERVER,
+ help = "Entry location index stats"
+)
+@Getter
+class EntryLocationIndexStats {
+
+ private static final String ENTRIES_COUNT = "entries-count";
+
+ @StatsDoc(
+ name = ENTRIES_COUNT,
+ help = "Current number of entries"
+ )
+ private final Gauge<Long> entriesCountGauge;
+
+ EntryLocationIndexStats(StatsLogger statsLogger,
+ Supplier<Long> entriesCountSupplier) {
+ entriesCountGauge = new Gauge<Long>() {
+ @Override
+ public Long getDefaultValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long getSample() {
+ return entriesCountSupplier.get();
+ }
+ };
+ statsLogger.registerGauge(ENTRIES_COUNT, entriesCountGauge);
+ }
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
index 04bf32d..8383db4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -39,7 +39,6 @@ import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.Ledge
import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
@@ -56,7 +55,7 @@ public class LedgerMetadataIndex implements Closeable {
private final AtomicInteger ledgersCount;
private final KeyValueStorage ledgersDb;
- private StatsLogger stats;
+ private final LedgerMetadataIndexStats stats;
// Holds ledger modifications applied in memory map, and pending to be
flushed on db
private final ConcurrentLinkedQueue<Entry<Long, LedgerData>>
pendingLedgersUpdates;
@@ -89,22 +88,9 @@ public class LedgerMetadataIndex implements Closeable {
this.pendingLedgersUpdates = new ConcurrentLinkedQueue<Entry<Long,
LedgerData>>();
this.pendingDeletedLedgers = new ConcurrentLinkedQueue<Long>();
- this.stats = stats;
- registerStats();
- }
-
- public void registerStats() {
- stats.registerGauge("ledgers-count", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return (long) ledgersCount.get();
- }
- });
+ this.stats = new LedgerMetadataIndexStats(
+ stats,
+ () -> (long) ledgersCount.get());
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexStats.java
new file mode 100644
index 0000000..a46e38b
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+
+import java.util.function.Supplier;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for ledger metadata index stats.
+ */
+@StatsDoc(
+ name = BOOKIE_SCOPE,
+ category = CATEGORY_SERVER,
+ help = "Ledger metadata index stats"
+)
+@Getter
+class LedgerMetadataIndexStats {
+
+ private static final String LEDGERS_COUNT = "ledgers-count";
+
+ @StatsDoc(
+ name = LEDGERS_COUNT,
+ help = "Current number of ledgers"
+ )
+ private final Gauge<Long> ledgersCountGauge;
+
+ LedgerMetadataIndexStats(StatsLogger statsLogger,
+ Supplier<Long> ledgersCountSupplier) {
+ ledgersCountGauge = new Gauge<Long>() {
+ @Override
+ public Long getDefaultValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long getSample() {
+ return ledgersCountSupplier.get();
+ }
+ };
+ statsLogger.registerGauge(LEDGERS_COUNT, ledgersCountGauge);
+ }
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 58504ab..197014a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -64,8 +64,6 @@ import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
@@ -124,19 +122,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
private final long maxThrottleTimeNanos;
- private final StatsLogger stats;
-
- private final OpStatsLogger addEntryStats;
- private final OpStatsLogger readEntryStats;
- private final OpStatsLogger readCacheHitStats;
- private final OpStatsLogger readCacheMissStats;
- private final OpStatsLogger readAheadBatchCountStats;
- private final OpStatsLogger readAheadBatchSizeStats;
- private final OpStatsLogger flushStats;
- private final OpStatsLogger flushSizeStats;
-
- private final Counter throttledWriteRequests;
- private final Counter rejectedWriteRequests;
+ private final DbLedgerStorageStats dbLedgerStorageStats;
static final String READ_AHEAD_CACHE_BATCH_SIZE =
"dbStorage_readAheadCacheBatchSize";
private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
@@ -169,10 +155,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
readCache = new ReadCache(readCacheMaxSize);
- this.stats = statsLogger;
-
- ledgerIndex = new LedgerMetadataIndex(conf,
KeyValueStorageRocksDB.factory, baseDir, stats);
- entryLocationIndex = new EntryLocationIndex(conf,
KeyValueStorageRocksDB.factory, baseDir, stats);
+ ledgerIndex = new LedgerMetadataIndex(conf,
KeyValueStorageRocksDB.factory, baseDir, statsLogger);
+ entryLocationIndex = new EntryLocationIndex(conf,
KeyValueStorageRocksDB.factory, baseDir, statsLogger);
transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
Runtime.getRuntime().availableProcessors() * 2);
@@ -183,62 +167,13 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
entryLogger = new EntryLogger(conf, ledgerDirsManager, null,
statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this,
statsLogger);
- stats.registerGauge("write-cache-size", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return writeCache.size() + writeCacheBeingFlushed.size();
- }
- });
- stats.registerGauge("write-cache-count", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return writeCache.count() + writeCacheBeingFlushed.count();
- }
- });
- stats.registerGauge("read-cache-size", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return readCache.size();
- }
- });
- stats.registerGauge("read-cache-count", new Gauge<Long>() {
- @Override
- public Long getDefaultValue() {
- return 0L;
- }
-
- @Override
- public Long getSample() {
- return readCache.count();
- }
- });
-
- addEntryStats = stats.getOpStatsLogger("add-entry");
- readEntryStats = stats.getOpStatsLogger("read-entry");
- readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
- readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
- readAheadBatchCountStats =
stats.getOpStatsLogger("readahead-batch-count");
- readAheadBatchSizeStats =
stats.getOpStatsLogger("readahead-batch-size");
- flushStats = stats.getOpStatsLogger("flush");
- flushSizeStats = stats.getOpStatsLogger("flush-size");
-
- throttledWriteRequests = stats.getCounter("throttled-write-requests");
- rejectedWriteRequests = stats.getCounter("rejected-write-requests");
+ dbLedgerStorageStats = new DbLedgerStorageStats(
+ statsLogger,
+ () -> writeCache.size() + writeCacheBeingFlushed.size(),
+ () -> writeCache.count() + writeCacheBeingFlushed.count(),
+ () -> readCache.size(),
+ () -> readCache.count()
+ );
}
@Override
@@ -393,7 +328,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
// after successfully insert the entry, update LAC and notify the
watchers
updateCachedLacIfNeeded(ledgerId, lac);
- recordSuccessfulEvent(addEntryStats, startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getAddEntryStats(),
startTime);
return entryId;
}
@@ -414,7 +349,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
});
}
- throttledWriteRequests.inc();
+ dbLedgerStorageStats.getThrottledWriteRequests().inc();
long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
while (System.nanoTime() < absoluteTimeoutNanos) {
@@ -438,7 +373,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
// Timeout expired and we weren't able to insert in write cache
- rejectedWriteRequests.inc();
+ dbLedgerStorageStats.getRejectedWriteRequests().inc();
throw new OperationRejectedException();
}
@@ -473,24 +408,24 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
// First try to read from the write cache of recent entries
ByteBuf entry = localWriteCache.get(ledgerId, entryId);
if (entry != null) {
- recordSuccessfulEvent(readCacheHitStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(),
startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(),
startTime);
return entry;
}
// If there's a flush going on, the entry might be in the flush buffer
entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
if (entry != null) {
- recordSuccessfulEvent(readCacheHitStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(),
startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(),
startTime);
return entry;
}
// Try reading from read-ahead cache
entry = readCache.get(ledgerId, entryId);
if (entry != null) {
- recordSuccessfulEvent(readCacheHitStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(),
startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(),
startTime);
return entry;
}
@@ -503,7 +438,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
} catch (NoEntryException e) {
- recordFailedEvent(readEntryStats, startTime);
+ recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(),
startTime);
throw e;
}
@@ -513,8 +448,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
long nextEntryLocation = entryLocation + 4 /* size header */ +
entry.readableBytes();
fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
- recordSuccessfulEvent(readCacheMissStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(),
startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(),
startTime);
return entry;
}
@@ -551,8 +486,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
}
- readAheadBatchCountStats.registerSuccessfulValue(count);
- readAheadBatchSizeStats.registerSuccessfulValue(size);
+
dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count);
+
dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size);
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Exception during read ahead for ledger: {}: e",
orginalLedgerId, e);
@@ -578,8 +513,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
}
- recordSuccessfulEvent(readCacheHitStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+
recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
}
@@ -595,8 +530,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
}
- recordSuccessfulEvent(readCacheHitStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+
recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
}
} finally {
@@ -612,8 +547,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
long entryLocation = entryLocationIndex.getLocation(ledgerId,
lastEntryId);
ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId,
entryLocation);
- recordSuccessfulEvent(readCacheMissStats, startTime);
- recordSuccessfulEvent(readEntryStats, startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(),
startTime);
+ recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(),
startTime);
return content;
}
@@ -702,8 +637,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
log.debug("Flushing done time {} s -- Written {} MB/s",
flushTimeSeconds, flushThroughput);
}
- recordSuccessfulEvent(flushStats, startTime);
- flushSizeStats.registerSuccessfulValue(sizeToFlush);
+ recordSuccessfulEvent(dbLedgerStorageStats.getFlushStats(),
startTime);
+
dbLedgerStorageStats.getFlushSizeStats().registerSuccessfulValue(sizeToFlush);
} catch (IOException e) {
// Leave IOExecption as it is
throw e;