This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b69f1e4 ISSUE #568: Refactor EntryLoggerAllocator to enable
preallocation
b69f1e4 is described below
commit b69f1e4d63e3f7adf04a772bd5cef002c81bc311
Author: Arvin <[email protected]>
AuthorDate: Wed Dec 6 00:30:44 2017 -0800
ISSUE #568: Refactor EntryLoggerAllocator to enable preallocation
Descriptions of the changes in this PR:
Enable 'entryLogFilePreallocationEnabled', which was previously never used.
Author: Arvin <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #814 from ArvinDevel/issue568, closes #568
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 52 +++++++++----
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 88 ++++++++++++++++------
2 files changed, 103 insertions(+), 37 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 056f298..c520b3d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -128,6 +128,7 @@ public class EntryLogger {
private List<BufferedLogChannel> logChannelsToFlush;
private volatile BufferedLogChannel logChannel;
private volatile BufferedLogChannel compactionLogChannel;
+
private final EntryLoggerAllocator entryLoggerAllocator;
private final boolean entryLogPreAllocationEnabled;
private final CopyOnWriteArrayList<EntryLogListener> listeners = new
CopyOnWriteArrayList<EntryLogListener>();
@@ -494,6 +495,12 @@ public class EntryLogger {
}
/**
+ * get EntryLoggerAllocator, Just for tests.
+ */
+ EntryLoggerAllocator getEntryLoggerAllocator() {
+ return entryLoggerAllocator;
+ }
+ /**
* Append the ledger map at the end of the entry log.
* Updates the entry log file header with the offset and size of the map.
*/
@@ -561,28 +568,34 @@ public class EntryLogger {
BufferedLogChannel createNewLog() throws IOException {
synchronized (createEntryLogLock) {
BufferedLogChannel bc;
- if (!entryLogPreAllocationEnabled || null == preallocation) {
- // initialization time to create a new log
+ if (!entryLogPreAllocationEnabled){
+ // create a new log directly
bc = allocateNewLog();
return bc;
} else {
- // has a preallocated entry log
- try {
- bc = preallocation.get();
- } catch (ExecutionException ee) {
- if (ee.getCause() instanceof IOException) {
- throw (IOException) (ee.getCause());
- } else {
- throw new IOException("Error to execute entry log
allocation.", ee);
+ // allocate directly to response request
+ if (null == preallocation){
+ bc = allocateNewLog();
+ } else {
+ // has a preallocated entry log
+ try {
+ bc = preallocation.get();
+ } catch (ExecutionException ee) {
+ if (ee.getCause() instanceof IOException) {
+ throw (IOException) (ee.getCause());
+ } else {
+ throw new IOException("Error to execute entry
log allocation.", ee);
+ }
+ } catch (CancellationException ce) {
+ throw new IOException("Task to allocate a new
entry log is cancelled.", ce);
+ } catch (InterruptedException ie) {
+ throw new IOException("Intrrupted when waiting a
new entry log to be allocated.", ie);
}
- } catch (CancellationException ce) {
- throw new IOException("Task to allocate a new entry
log is cancelled.", ce);
- } catch (InterruptedException ie) {
- throw new IOException("Intrrupted when waiting a new
entry log to be allocated.", ie);
}
+ // preallocate a new log in background upon every call
+ preallocation = allocatorExecutor.submit(() ->
allocateNewLog());
+ return bc;
}
- preallocation = allocatorExecutor.submit(() ->
allocateNewLog());
- return bc;
}
}
@@ -642,6 +655,13 @@ public class EntryLogger {
allocatorExecutor.shutdown();
LOG.info("Stopped entry logger preallocator.");
}
+
+ /**
+ * get the preallocation for tests.
+ */
+ Future<BufferedLogChannel> getPreallocationFuture(){
+ return preallocation;
+ }
}
/**
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index e939123..db161f5 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -20,17 +20,20 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
-import java.io.IOException;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.DiskChecker;
@@ -42,10 +45,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
+/**
+ * Tests for EntryLog.
+ */
public class EntryLogTest {
- private final static Logger LOG =
LoggerFactory.getLogger(EntryLogTest.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(EntryLogTest.class);
final List<File> tempDirs = new ArrayList<File>();
@@ -75,7 +80,7 @@ public class EntryLogTest {
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
Bookie bookie = new Bookie(conf);
// create some entries
- EntryLogger logger =
((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
+ EntryLogger logger = ((InterleavedLedgerStorage)
bookie.ledgerStorage).entryLogger;
logger.addEntry(1, generateEntry(1, 1).nioBuffer());
logger.addEntry(3, generateEntry(3, 1).nioBuffer());
logger.addEntry(2, generateEntry(2, 1).nioBuffer());
@@ -83,7 +88,7 @@ public class EntryLogTest {
// now lets truncate the file to corrupt the last entry, which
simulates a partial write
File f = new File(curDir, "0.log");
RandomAccessFile raf = new RandomAccessFile(f, "rw");
- raf.setLength(raf.length()-10);
+ raf.setLength(raf.length() - 10);
raf.close();
// now see which ledgers are in the log
logger = new EntryLogger(conf, bookie.getLedgerDirsManager());
@@ -116,13 +121,13 @@ public class EntryLogTest {
// create some entries
int numLogs = 3;
int numEntries = 10;
- long[][] positions = new long[2*numLogs][];
- for (int i=0; i<numLogs; i++) {
+ long[][] positions = new long[2 * numLogs][];
+ for (int i = 0; i < numLogs; i++) {
positions[i] = new long[numEntries];
EntryLogger logger = new EntryLogger(conf,
bookie.getLedgerDirsManager());
- for (int j=0; j<numEntries; j++) {
+ for (int j = 0; j < numEntries; j++) {
positions[i][j] = logger.addEntry(i, generateEntry(i,
j).nioBuffer());
}
logger.flush();
@@ -132,12 +137,12 @@ public class EntryLogTest {
lastLogId.delete();
// write another entries
- for (int i=numLogs; i<2*numLogs; i++) {
+ for (int i = numLogs; i < 2 * numLogs; i++) {
positions[i] = new long[numEntries];
EntryLogger logger = new EntryLogger(conf,
bookie.getLedgerDirsManager());
- for (int j=0; j<numEntries; j++) {
+ for (int j = 0; j < numEntries; j++) {
positions[i][j] = logger.addEntry(i, generateEntry(i,
j).nioBuffer());
}
logger.flush();
@@ -145,12 +150,12 @@ public class EntryLogTest {
EntryLogger newLogger = new EntryLogger(conf,
bookie.getLedgerDirsManager());
- for (int i=0; i<(2*numLogs+1); i++) {
+ for (int i = 0; i < (2 * numLogs + 1); i++) {
File logFile = new File(curDir, Long.toHexString(i) + ".log");
assertTrue(logFile.exists());
}
- for (int i=0; i<2*numLogs; i++) {
- for (int j=0; j<numEntries; j++) {
+ for (int i = 0; i < 2 * numLogs; i++) {
+ for (int j = 0; j < numEntries; j++) {
String expectedValue = "ledger-" + i + "-" + j;
ByteBuf value = newLogger.readEntry(i, j, positions[i][j]);
long ledgerId = value.readLong();
@@ -166,7 +171,9 @@ public class EntryLogTest {
}
@Test
- /** Test that EntryLogger Should fail with FNFE, if entry logger
directories does not exist*/
+ /**
+ * Test that EntryLogger Should fail with FNFE, if entry logger
directories does not exist.
+ */
public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist()
throws Exception {
File tmpDir = createTempDir("bkTest", ".dir");
@@ -188,7 +195,7 @@ public class EntryLogTest {
}
/**
- * Test to verify the DiskFull during addEntry
+ * Test to verify the DiskFull during addEntry.
*/
@Test
public void testAddEntryFailureOnDiskFull() throws Exception {
@@ -219,7 +226,7 @@ public class EntryLogTest {
}
/**
- * Explicitely try to recover using the ledgers map index at the end of
the entry log
+ * Explicitely try to recover using the ledgers map index at the end of
the entry log.
*/
@Test
public void testRecoverFromLedgersMap() throws Exception {
@@ -234,7 +241,7 @@ public class EntryLogTest {
Bookie bookie = new Bookie(conf);
// create some entries
- EntryLogger logger =
((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
+ EntryLogger logger = ((InterleavedLedgerStorage)
bookie.ledgerStorage).entryLogger;
logger.addEntry(1, generateEntry(1, 1).nioBuffer());
logger.addEntry(3, generateEntry(3, 1).nioBuffer());
logger.addEntry(2, generateEntry(2, 1).nioBuffer());
@@ -253,7 +260,7 @@ public class EntryLogTest {
}
/**
- * Explicitely try to recover using the ledgers map index at the end of
the entry log
+ * Explicitely try to recover using the ledgers map index at the end of
the entry log.
*/
@Test
public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
@@ -304,4 +311,43 @@ public class EntryLogTest {
assertEquals(120, meta.getRemainingSize());
}
+ /**
+ * Test pre-allocate for entry log in EntryLoggerAllocator.
+ * @throws Exception
+ */
+ @Test
+ public void testPreAllocateLog() throws Exception {
+ File tmpDir = createTempDir("bkTest", ".dir");
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ // enable pre-allocation case
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+ conf.setEntryLogFilePreAllocationEnabled(true);
+ Bookie bookie = new Bookie(conf);
+ // create a logger whose initialization phase allocating a new entry
log
+ EntryLogger logger = ((InterleavedLedgerStorage)
bookie.ledgerStorage).entryLogger;
+
assertNotNull(logger.getEntryLoggerAllocator().getPreallocationFuture());
+
+ logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+ // the Future<BufferedLogChannel> is not null all the time
+
assertNotNull(logger.getEntryLoggerAllocator().getPreallocationFuture());
+
+ // disable pre-allocation case
+ ServerConfiguration conf2 =
TestBKConfiguration.newServerConfiguration();
+ conf2.setLedgerDirNames(new String[] {tmpDir.toString()});
+ conf2.setEntryLogFilePreAllocationEnabled(false);
+ Bookie bookie2 = new Bookie(conf2);
+ // create a logger
+ EntryLogger logger2 = ((InterleavedLedgerStorage)
bookie2.ledgerStorage).entryLogger;
+ assertNull(logger2.getEntryLoggerAllocator().getPreallocationFuture());
+
+ logger2.addEntry(2, generateEntry(1, 1).nioBuffer());
+
+ // the Future<BufferedLogChannel> is null all the time
+ assertNull(logger2.getEntryLoggerAllocator().getPreallocationFuture());
+
+ }
+
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].