This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b69f1e4  ISSUE #568: Refactor EntryLoggerAllocator to enable 
preallocation
b69f1e4 is described below

commit b69f1e4d63e3f7adf04a772bd5cef002c81bc311
Author: Arvin <[email protected]>
AuthorDate: Wed Dec 6 00:30:44 2017 -0800

    ISSUE #568: Refactor EntryLoggerAllocator to enable preallocation
    
    Descriptions of the changes in this PR:
    
    Enable 'entryLogFilePreallocationEnabled' which is  never be used.
    
    Author: Arvin <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #814 from ArvinDevel/issue568, closes #568
---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 52 +++++++++----
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 88 ++++++++++++++++------
 2 files changed, 103 insertions(+), 37 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 056f298..c520b3d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -128,6 +128,7 @@ public class EntryLogger {
     private List<BufferedLogChannel> logChannelsToFlush;
     private volatile BufferedLogChannel logChannel;
     private volatile BufferedLogChannel compactionLogChannel;
+
     private final EntryLoggerAllocator entryLoggerAllocator;
     private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new 
CopyOnWriteArrayList<EntryLogListener>();
@@ -494,6 +495,12 @@ public class EntryLogger {
     }
 
     /**
+     * get EntryLoggerAllocator, Just for tests.
+     */
+    EntryLoggerAllocator getEntryLoggerAllocator() {
+        return entryLoggerAllocator;
+    }
+    /**
      * Append the ledger map at the end of the entry log.
      * Updates the entry log file header with the offset and size of the map.
      */
@@ -561,28 +568,34 @@ public class EntryLogger {
         BufferedLogChannel createNewLog() throws IOException {
             synchronized (createEntryLogLock) {
                 BufferedLogChannel bc;
-                if (!entryLogPreAllocationEnabled || null == preallocation) {
-                    // initialization time to create a new log
+                if (!entryLogPreAllocationEnabled){
+                    // create a new log directly
                     bc = allocateNewLog();
                     return bc;
                 } else {
-                    // has a preallocated entry log
-                    try {
-                        bc = preallocation.get();
-                    } catch (ExecutionException ee) {
-                        if (ee.getCause() instanceof IOException) {
-                            throw (IOException) (ee.getCause());
-                        } else {
-                            throw new IOException("Error to execute entry log 
allocation.", ee);
+                    // allocate directly to response request
+                    if (null == preallocation){
+                        bc = allocateNewLog();
+                    } else {
+                        // has a preallocated entry log
+                        try {
+                            bc = preallocation.get();
+                        } catch (ExecutionException ee) {
+                            if (ee.getCause() instanceof IOException) {
+                                throw (IOException) (ee.getCause());
+                            } else {
+                                throw new IOException("Error to execute entry 
log allocation.", ee);
+                            }
+                        } catch (CancellationException ce) {
+                            throw new IOException("Task to allocate a new 
entry log is cancelled.", ce);
+                        } catch (InterruptedException ie) {
+                            throw new IOException("Intrrupted when waiting a 
new entry log to be allocated.", ie);
                         }
-                    } catch (CancellationException ce) {
-                        throw new IOException("Task to allocate a new entry 
log is cancelled.", ce);
-                    } catch (InterruptedException ie) {
-                        throw new IOException("Intrrupted when waiting a new 
entry log to be allocated.", ie);
                     }
+                    // preallocate a new log in background upon every call
+                    preallocation = allocatorExecutor.submit(() -> 
allocateNewLog());
+                    return bc;
                 }
-                preallocation = allocatorExecutor.submit(() -> 
allocateNewLog());
-                return bc;
             }
         }
 
@@ -642,6 +655,13 @@ public class EntryLogger {
             allocatorExecutor.shutdown();
             LOG.info("Stopped entry logger preallocator.");
         }
+
+        /**
+         * get the preallocation for tests.
+         */
+        Future<BufferedLogChannel> getPreallocationFuture(){
+            return preallocation;
+        }
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index e939123..db161f5 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -20,17 +20,20 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
-import java.io.IOException;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -42,10 +45,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
 
+/**
+ * Tests for EntryLog.
+ */
 public class EntryLogTest {
-    private final static Logger LOG = 
LoggerFactory.getLogger(EntryLogTest.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntryLogTest.class);
 
     final List<File> tempDirs = new ArrayList<File>();
 
@@ -75,7 +80,7 @@ public class EntryLogTest {
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
         Bookie bookie = new Bookie(conf);
         // create some entries
-        EntryLogger logger = 
((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
+        EntryLogger logger = ((InterleavedLedgerStorage) 
bookie.ledgerStorage).entryLogger;
         logger.addEntry(1, generateEntry(1, 1).nioBuffer());
         logger.addEntry(3, generateEntry(3, 1).nioBuffer());
         logger.addEntry(2, generateEntry(2, 1).nioBuffer());
@@ -83,7 +88,7 @@ public class EntryLogTest {
         // now lets truncate the file to corrupt the last entry, which 
simulates a partial write
         File f = new File(curDir, "0.log");
         RandomAccessFile raf = new RandomAccessFile(f, "rw");
-        raf.setLength(raf.length()-10);
+        raf.setLength(raf.length() - 10);
         raf.close();
         // now see which ledgers are in the log
         logger = new EntryLogger(conf, bookie.getLedgerDirsManager());
@@ -116,13 +121,13 @@ public class EntryLogTest {
         // create some entries
         int numLogs = 3;
         int numEntries = 10;
-        long[][] positions = new long[2*numLogs][];
-        for (int i=0; i<numLogs; i++) {
+        long[][] positions = new long[2 * numLogs][];
+        for (int i = 0; i < numLogs; i++) {
             positions[i] = new long[numEntries];
 
             EntryLogger logger = new EntryLogger(conf,
                     bookie.getLedgerDirsManager());
-            for (int j=0; j<numEntries; j++) {
+            for (int j = 0; j < numEntries; j++) {
                 positions[i][j] = logger.addEntry(i, generateEntry(i, 
j).nioBuffer());
             }
             logger.flush();
@@ -132,12 +137,12 @@ public class EntryLogTest {
         lastLogId.delete();
 
         // write another entries
-        for (int i=numLogs; i<2*numLogs; i++) {
+        for (int i = numLogs; i < 2 * numLogs; i++) {
             positions[i] = new long[numEntries];
 
             EntryLogger logger = new EntryLogger(conf,
                     bookie.getLedgerDirsManager());
-            for (int j=0; j<numEntries; j++) {
+            for (int j = 0; j < numEntries; j++) {
                 positions[i][j] = logger.addEntry(i, generateEntry(i, 
j).nioBuffer());
             }
             logger.flush();
@@ -145,12 +150,12 @@ public class EntryLogTest {
 
         EntryLogger newLogger = new EntryLogger(conf,
                 bookie.getLedgerDirsManager());
-        for (int i=0; i<(2*numLogs+1); i++) {
+        for (int i = 0; i < (2 * numLogs + 1); i++) {
             File logFile = new File(curDir, Long.toHexString(i) + ".log");
             assertTrue(logFile.exists());
         }
-        for (int i=0; i<2*numLogs; i++) {
-            for (int j=0; j<numEntries; j++) {
+        for (int i = 0; i < 2 * numLogs; i++) {
+            for (int j = 0; j < numEntries; j++) {
                 String expectedValue = "ledger-" + i + "-" + j;
                 ByteBuf value = newLogger.readEntry(i, j, positions[i][j]);
                 long ledgerId = value.readLong();
@@ -166,7 +171,9 @@ public class EntryLogTest {
     }
 
     @Test
-    /** Test that EntryLogger Should fail with FNFE, if entry logger 
directories does not exist*/
+    /**
+     * Test that EntryLogger Should fail with FNFE, if entry logger 
directories does not exist.
+     */
     public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist()
             throws Exception {
         File tmpDir = createTempDir("bkTest", ".dir");
@@ -188,7 +195,7 @@ public class EntryLogTest {
     }
 
     /**
-     * Test to verify the DiskFull during addEntry
+     * Test to verify the DiskFull during addEntry.
      */
     @Test
     public void testAddEntryFailureOnDiskFull() throws Exception {
@@ -219,7 +226,7 @@ public class EntryLogTest {
     }
 
     /**
-     * Explicitely try to recover using the ledgers map index at the end of 
the entry log
+     * Explicitely try to recover using the ledgers map index at the end of 
the entry log.
      */
     @Test
     public void testRecoverFromLedgersMap() throws Exception {
@@ -234,7 +241,7 @@ public class EntryLogTest {
         Bookie bookie = new Bookie(conf);
 
         // create some entries
-        EntryLogger logger = 
((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
+        EntryLogger logger = ((InterleavedLedgerStorage) 
bookie.ledgerStorage).entryLogger;
         logger.addEntry(1, generateEntry(1, 1).nioBuffer());
         logger.addEntry(3, generateEntry(3, 1).nioBuffer());
         logger.addEntry(2, generateEntry(2, 1).nioBuffer());
@@ -253,7 +260,7 @@ public class EntryLogTest {
     }
 
     /**
-     * Explicitely try to recover using the ledgers map index at the end of 
the entry log
+     * Explicitely try to recover using the ledgers map index at the end of 
the entry log.
      */
     @Test
     public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
@@ -304,4 +311,43 @@ public class EntryLogTest {
         assertEquals(120, meta.getRemainingSize());
     }
 
+    /**
+     * Test pre-allocate for entry log in EntryLoggerAllocator.
+     * @throws Exception
+     */
+    @Test
+    public void testPreAllocateLog() throws Exception {
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        // enable pre-allocation case
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        Bookie bookie = new Bookie(conf);
+        // create a logger whose initialization phase allocating a new entry 
log
+        EntryLogger logger = ((InterleavedLedgerStorage) 
bookie.ledgerStorage).entryLogger;
+        
assertNotNull(logger.getEntryLoggerAllocator().getPreallocationFuture());
+
+        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        // the Future<BufferedLogChannel> is not null all the time
+        
assertNotNull(logger.getEntryLoggerAllocator().getPreallocationFuture());
+
+        // disable pre-allocation case
+        ServerConfiguration conf2 = 
TestBKConfiguration.newServerConfiguration();
+        conf2.setLedgerDirNames(new String[] {tmpDir.toString()});
+        conf2.setEntryLogFilePreAllocationEnabled(false);
+        Bookie bookie2 = new Bookie(conf2);
+        // create a logger
+        EntryLogger logger2 = ((InterleavedLedgerStorage) 
bookie2.ledgerStorage).entryLogger;
+        assertNull(logger2.getEntryLoggerAllocator().getPreallocationFuture());
+
+        logger2.addEntry(2, generateEntry(1, 1).nioBuffer());
+
+        // the Future<BufferedLogChannel> is null all the time
+        assertNull(logger2.getEntryLoggerAllocator().getPreallocationFuture());
+
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to