This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a671d92 DbLedgerStorage -- Make some entrylogger methods accessible
a671d92 is described below
commit a671d92441c222052f9a5cdcb396a343d60882e5
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 14 12:35:02 2017 -0800
DbLedgerStorage -- Make some entrylogger methods accessible
These changes are needed for the `DbLedgerStorage` implementation to access some
of the non-public (package-private and protected) methods of the EntryLogger.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #844 from merlimat/db-ledger-storage-itemized and squashes the
following commits:
b4d886ea [Matteo Merli] Removed timeout and fixed test
46652dd4 [Matteo Merli] DbLedgerStorage -- Make entrylogger read methods
accessible
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 5 ++
.../org/apache/bookkeeper/bookie/EntryLogger.java | 53 +++++++++++++++++++---
.../bookie/InterleavedLedgerStorage.java | 4 +-
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 35 +++++++++++++-
4 files changed, 89 insertions(+), 8 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index f4b7183..2c526db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1437,6 +1437,11 @@ public class Bookie extends BookieCriticalThread {
return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
}
+ @VisibleForTesting
+ public LedgerStorage getLedgerStorage() {
+ return ledgerStorage;
+ }
+
// The rest of the code is test stuff
static class CounterCallback implements WriteCallback {
int count;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 589e43a..a1a9254 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -26,6 +26,7 @@ import static
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPAC
import static
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -40,6 +41,7 @@ import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
@@ -52,6 +54,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -203,7 +206,7 @@ public class EntryLogger {
/**
* Scan entries in a entry log file.
*/
- interface EntryLogScanner {
+ public interface EntryLogScanner {
/**
* Tests whether or not the entries belongs to the specified ledger
* should be processed.
@@ -837,7 +840,7 @@ public class EntryLogger {
leastUnflushedLogId = flushedLogId + 1;
}
- void flush() throws IOException {
+ public void flush() throws IOException {
flushRotatedLogs();
flushCurrentLog();
}
@@ -867,7 +870,7 @@ public class EntryLogger {
}
};
- synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog)
throws IOException {
+ public synchronized long addEntry(long ledger, ByteBuf entry, boolean
rollLog) throws IOException {
int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to
prepend the size
boolean reachEntryLogLimit =
rollLog ? reachEntryLogLimit(entrySize) :
readEntryLogHardLimit(entrySize);
@@ -963,7 +966,6 @@ public class EntryLogger {
}
}
-
private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws
IOException {
if (!doRegularFlushes) {
return;
@@ -986,7 +988,8 @@ public class EntryLogger {
return logChannel.position() + size > Integer.MAX_VALUE;
}
- ByteBuf readEntry(long ledgerId, long entryId, long location) throws
IOException, Bookie.NoEntryException {
+ public ByteBuf internalReadEntry(long ledgerId, long entryId, long
location)
+ throws IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
long pos = location & 0xffffffffL;
ByteBuf sizeBuff = sizeBuffer.get();
@@ -1035,6 +1038,15 @@ public class EntryLogger {
+ pos + "(" + rc + "!=" +
entrySize + ")", ledgerId, entryId);
}
data.writerIndex(entrySize);
+
+ return data;
+ }
+
+ public ByteBuf readEntry(long ledgerId, long entryId, long location)
throws IOException, Bookie.NoEntryException {
+ long entryLogId = logIdForOffset(location);
+ long pos = location & 0xffffffffL;
+
+ ByteBuf data = internalReadEntry(ledgerId, entryId, location);
long thisLedgerId = data.getLong(0);
if (thisLedgerId != ledgerId) {
data.release();
@@ -1112,6 +1124,35 @@ public class EntryLogger {
return false;
}
+ /**
+ * Returns a set with the ids of all the entry log files.
+ *
+ * @throws IOException
+ */
+ public Set<Long> getEntryLogsSet() throws IOException {
+ Set<Long> entryLogs = Sets.newTreeSet();
+
+ final FilenameFilter logFileFilter = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.endsWith(".log");
+ }
+ };
+
+ for (File d : ledgerDirsManager.getAllLedgerDirs()) {
+ File[] files = d.listFiles(logFileFilter);
+ if (files == null) {
+ throw new IOException("Failed to get list of files in
directory " + d);
+ }
+
+ for (File f : files) {
+ Long entryLogId = Long.parseLong(f.getName().split(".log")[0],
16);
+ entryLogs.add(entryLogId);
+ }
+ }
+ return entryLogs;
+ }
+
private File findFile(long logId) throws FileNotFoundException {
for (File d : ledgerDirsManager.getAllLedgerDirs()) {
File f = new File(d, Long.toHexString(logId) + ".log");
@@ -1129,7 +1170,7 @@ public class EntryLogger {
* @param scanner Entry Log Scanner
* @throws IOException
*/
- protected void scanEntryLog(long entryLogId, EntryLogScanner scanner)
throws IOException {
+ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws
IOException {
// Buffer where to read the entrySize (4 bytes) and the ledgerId (8
bytes)
ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
BufferedReadChannel bc;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 2b357ee..b5c9513 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -25,6 +25,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
@@ -83,7 +84,8 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
private OpStatsLogger getOffsetStats;
private OpStatsLogger getEntryStats;
- InterleavedLedgerStorage() {
+ @VisibleForTesting
+ public InterleavedLedgerStorage() {
activeLedgers = new SnapshotMap<Long, Boolean>();
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 527762b..f402a94 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -27,14 +27,18 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.collect.Sets;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.DiskChecker;
@@ -46,7 +50,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Tests for EntryLog.
*/
@@ -351,4 +354,34 @@ public class EntryLogTest {
}
+ /**
+ * Test the getEntryLogsSet() method.
+ */
+ @Test
+ public void testGetEntryLogsSet() throws Exception {
+ File tmpDir = createTempDir("bkTest", ".dir");
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ int gcWaitTime = 1000;
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime);
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ Bookie bookie = new Bookie(conf);
+
+ // create some entries
+ EntryLogger logger = ((InterleavedLedgerStorage)
bookie.ledgerStorage).entryLogger;
+
+ assertEquals(Sets.newHashSet(0L, 1L), logger.getEntryLogsSet());
+
+ logger.rollLog();
+ logger.flushRotatedLogs();
+
+ assertEquals(Sets.newHashSet(0L, 1L, 2L), logger.getEntryLogsSet());
+
+ logger.rollLog();
+ logger.flushRotatedLogs();
+
+ assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L),
logger.getEntryLogsSet());
+ }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].