This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 21f125d  ISSUE #1527: Make ExplicitLAC persistent
21f125d is described below

commit 21f125d76870d7c244b87ad32a2f6f52421778eb
Author: cguttapalem <[email protected]>
AuthorDate: Mon Jul 23 14:21:24 2018 -0700

    ISSUE #1527: Make ExplicitLAC persistent
    
    Descriptions of the changes in this PR:
    
    For persisting explicitLAC, we can follow the
    same approach as followed in persisting
    fencing information / stateBits (d69986c)
    in FileInfo file and special marker entry
    in Journal.
    
    ### Motivation
    
    ExplicitLAC is kept in the memory and it can be lost in bookie reboot.
    Though it is an extreme corner case scenario, it can break one of the BK 
guarantees.
    " If you read once you can always read it".
    If all the bookies of the Write stripe were rebooted, it can loose its 
explicitLAC value and the client
    which was able to read the entry before the reboot, can't read it anymore.
    
    Master Issue: #1527
    
    Author: cguttapalem <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1532 from reddycharan/storeexplicitlac, closes #1527
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  47 ++++-
 .../org/apache/bookkeeper/bookie/FileInfo.java     |  51 ++++-
 .../bookkeeper/bookie/FileInfoBackingCache.java    |   6 +-
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |   3 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  19 +-
 .../apache/bookkeeper/bookie/JournalChannel.java   |   4 +-
 .../apache/bookkeeper/bookie/ReadOnlyFileInfo.java |   7 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  26 +++
 .../bookkeeper/proto/WriteLacProcessorV3.java      |  46 ++++-
 .../bookkeeper/bookie/BookieJournalTest.java       |   4 +-
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java | 123 +++++++++++-
 .../bookkeeper/bookie/LedgerStorageTest.java       | 219 +++++++++++++++++++++
 .../bookie/TestFileInfoBackingCache.java           |  10 +-
 .../org/apache/bookkeeper/bookie/UpgradeTest.java  |   2 +-
 .../apache/bookkeeper/client/ExplicitLacTest.java  |  69 +++++++
 .../bookkeeper/conf/TestServerConfiguration.java   |  28 +++
 conf/bk_server.conf                                |  13 ++
 site/_data/config/bk_server.yaml                   |  11 ++
 18 files changed, 656 insertions(+), 32 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 79153dc..d59a7e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -32,11 +32,13 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
+import static org.apache.bookkeeper.bookie.Bookie.METAENTRY_ID_FENCE_KEY;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -121,6 +123,7 @@ public class Bookie extends BookieCriticalThread {
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
     public static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
+    static final long METAENTRY_ID_LEDGER_EXPLICITLAC  = -0x8000;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
@@ -788,6 +791,35 @@ public class Bookie extends BookieCriticalThread {
                                     + " but layout version (" + journalVersion
                                     + ") is too old to hold this");
                         }
+                    } else if (entryId == METAENTRY_ID_LEDGER_EXPLICITLAC) {
+                        if (journalVersion >= JournalChannel.V6) {
+                            int explicitLacBufLength = recBuff.getInt();
+                            ByteBuf explicitLacBuf = 
Unpooled.buffer(explicitLacBufLength);
+                            byte[] explicitLacBufArray = new 
byte[explicitLacBufLength];
+                            recBuff.get(explicitLacBufArray);
+                            explicitLacBuf.writeBytes(explicitLacBufArray);
+                            byte[] key = masterKeyCache.get(ledgerId);
+                            if (key == null) {
+                                key = ledgerStorage.readMasterKey(ledgerId);
+                            }
+                            LedgerDescriptor handle = 
handles.getHandle(ledgerId, key);
+                            handle.setExplicitLac(explicitLacBuf);
+                        } else {
+                            throw new IOException("Invalid journal. Contains 
explicitLAC " + " but layout version ("
+                                    + journalVersion + ") is too old to hold 
this");
+                        }
+                    } else if (entryId < 0) {
+                        /*
+                         * this is possible if bookie code binary is rolledback
+                         * to older version but when it is trying to read
+                         * Journal which was created previously using newer
+                         * code/journalversion, which introduced new special
+                         * entry. So in anycase, if we see unrecognizable
+                         * special entry while replaying journal we should skip
+                         * (ignore) it.
+                         */
+                        LOG.warn("Read unrecognizable entryId: {} for ledger: 
{} while replaying Journal. Skipping it",
+                                entryId, ledgerId);
                     } else {
                         byte[] key = masterKeyCache.get(ledgerId);
                         if (key == null) {
@@ -1186,13 +1218,26 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    public void setExplicitLac(ByteBuf entry, Object ctx, byte[] masterKey)
+    static ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
+        ByteBuf bb = PooledByteBufAllocator.DEFAULT.directBuffer(8 + 8 + 4 + 
explicitLac.capacity());
+        bb.writeLong(ledgerId);
+        bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
+        bb.writeInt(explicitLac.capacity());
+        bb.writeBytes(explicitLac);
+        return bb;
+    }
+
+    public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, 
Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         try {
             long ledgerId = entry.getLong(entry.readerIndex());
             LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
             synchronized (handle) {
+                entry.markReaderIndex();
                 handle.setExplicitLac(entry);
+                entry.resetReaderIndex();
+                ByteBuf explicitLACEntry = createExplicitLACEntry(ledgerId, 
entry);
+                getJournal(ledgerId).logAddEntry(explicitLACEntry, false /* 
ackBeforeSync */, writeCallback, ctx);
             }
         } catch (NoWritableLedgerDirException e) {
             stateManager.transitionToReadOnlyMode();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index eb6d9e1..a5ddacf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import org.apache.bookkeeper.common.util.Watchable;
 import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +75,13 @@ class FileInfo extends 
Watchable<LastAddConfirmedUpdateNotification> {
      * The fingerprint of a ledger index file.
      */
     public static final int SIGNATURE = 
ByteBuffer.wrap("BKLE".getBytes(UTF_8)).getInt();
-    public static final int HEADER_VERSION = 0;
+
+    // No explicitLac
+    static final int V0 = 0;
+    // Adding explicitLac
+    static final int V1 = 1;
+    // current version of FileInfo header is V1
+    public static final int CURRENT_HEADER_VERSION = V1;
 
     static final long START_OF_DATA = 1024;
     private long size;
@@ -91,12 +98,16 @@ class FileInfo extends 
Watchable<LastAddConfirmedUpdateNotification> {
     // file access mode
     protected String mode;
 
-    public FileInfo(File lf, byte[] masterKey) throws IOException {
+    // this FileInfo Header Version
+    int headerVersion;
+
+    public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) 
throws IOException {
         super(WATCHER_RECYCLER);
 
         this.lf = lf;
         this.masterKey = masterKey;
         mode = "rw";
+        this.headerVersion = fileInfoVersionToWrite;
     }
 
     synchronized Long getLastAddConfirmed() {
@@ -182,6 +193,7 @@ class FileInfo extends 
Watchable<LastAddConfirmedUpdateNotification> {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("fileInfo:SetLac: {}", explicitLac);
             }
+            needFlushHeader = true;
         }
         setLastAddConfirmed(explicitLacValue);
     }
@@ -206,9 +218,11 @@ class FileInfo extends 
Watchable<LastAddConfirmedUpdateNotification> {
                 throw new IOException("Missing ledger signature while reading 
header for " + lf);
             }
             int version = bb.getInt();
-            if (version != HEADER_VERSION) {
+            if (version > CURRENT_HEADER_VERSION) {
                 throw new IOException("Incompatible ledger version " + version 
+ " while reading header for " + lf);
             }
+            this.headerVersion = version;
+
             int length = bb.getInt();
             if (length < 0) {
                 throw new IOException("Length " + length + " is invalid while 
reading header for " + lf);
@@ -218,6 +232,25 @@ class FileInfo extends 
Watchable<LastAddConfirmedUpdateNotification> {
             masterKey = new byte[length];
             bb.get(masterKey);
             stateBits = bb.getInt();
+
+            if (this.headerVersion >= V1) {
+                int explicitLacBufLength = bb.getInt();
+                if (explicitLacBufLength == 0) {
+                    explicitLac = null;
+                } else if (explicitLacBufLength >= 
DigestManager.LAC_METADATA_LENGTH) {
+                    if (explicitLac == null) {
+                        explicitLac = 
ByteBuffer.allocate(explicitLacBufLength);
+                    }
+                    byte[] explicitLacBufArray = new 
byte[explicitLacBufLength];
+                    bb.get(explicitLacBufArray);
+                    explicitLac.put(explicitLacBufArray);
+                    explicitLac.rewind();
+                } else {
+                    throw new IOException("ExplicitLacBufLength " + 
explicitLacBufLength
+                            + " is invalid while reading header for " + lf);
+                }
+            }
+
             needFlushHeader = false;
         } else {
             throw new IOException("Ledger index file " + lf + " does not 
exist");
@@ -271,10 +304,20 @@ class FileInfo extends 
Watchable<LastAddConfirmedUpdateNotification> {
     private void writeHeader() throws IOException {
         ByteBuffer bb = ByteBuffer.allocate((int) START_OF_DATA);
         bb.putInt(SIGNATURE);
-        bb.putInt(HEADER_VERSION);
+        bb.putInt(this.headerVersion);
         bb.putInt(masterKey.length);
         bb.put(masterKey);
         bb.putInt(stateBits);
+        if (this.headerVersion >= V1) {
+            if (explicitLac != null) {
+                explicitLac.rewind();
+                bb.putInt(explicitLac.capacity());
+                bb.put(explicitLac);
+                explicitLac.rewind();
+            } else {
+                bb.putInt(0);
+            }
+        }
         bb.rewind();
         fc.position(0);
         fc.write(bb);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 31fd077..6beba6a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -35,9 +35,11 @@ class FileInfoBackingCache {
     final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     final ConcurrentLongHashMap<CachedFileInfo> fileInfos = new 
ConcurrentLongHashMap<>();
     final FileLoader fileLoader;
+    final int fileInfoVersionToWrite;
 
-    FileInfoBackingCache(FileLoader fileLoader) {
+    FileInfoBackingCache(FileLoader fileLoader, int fileInfoVersionToWrite) {
         this.fileLoader = fileLoader;
+        this.fileInfoVersionToWrite = fileInfoVersionToWrite;
     }
 
     /**
@@ -125,7 +127,7 @@ class FileInfoBackingCache {
         final AtomicInteger refCount;
 
         CachedFileInfo(long ledgerId, File lf, byte[] masterKey) throws 
IOException {
-            super(lf, masterKey);
+            super(lf, masterKey, fileInfoVersionToWrite);
             this.ledgerId = ledgerId;
             this.refCount = new AtomicInteger(0);
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index eb3b935..5375002 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -110,7 +110,8 @@ public class IndexPersistenceMgr {
 
         // build the file info cache
         int concurrencyLevel = Math.max(1, 
Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
-        fileInfoBackingCache = new 
FileInfoBackingCache(this::createFileInfoBackingFile);
+        fileInfoBackingCache = new 
FileInfoBackingCache(this::createFileInfoBackingFile,
+                conf.getFileInfoFormatVersionToWrite());
         RemovalListener<Long, CachedFileInfo> fileInfoEvictionListener = 
this::handleLedgerEviction;
         writeFileInfoCache = buildCache(
             concurrencyLevel,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1da0435..0715db4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -585,6 +585,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     private final boolean flushWhenQueueEmpty;
     // should we hint the filesystem to remove pages from cache after force 
write
     private final boolean removePagesFromCache;
+    private final int journalFormatVersionToWrite;
+    private final int journalAlignmentSize;
 
     // Should data be fsynced on disk before triggering the callback
     private final boolean syncData;
@@ -646,6 +648,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         this.maxGroupWaitInNanos = 
TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
         this.bufferedWritesThreshold = 
conf.getJournalBufferedWritesThreshold();
         this.bufferedEntriesThreshold = 
conf.getJournalBufferedEntriesThreshold();
+        this.journalFormatVersionToWrite = 
conf.getJournalFormatVersionToWrite();
+        this.journalAlignmentSize = conf.getJournalAlignmentSize();
         if (conf.getNumJournalCallbackThreads() > 0) {
             this.cbThreadPool = 
Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
                                                          new 
DefaultThreadFactory("bookie-journal-callback"));
@@ -926,8 +930,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         ByteBuf lenBuff = Unpooled.buffer(4);
         ByteBuf paddingBuff = Unpooled.buffer(2 * 
conf.getJournalAlignmentSize());
         paddingBuff.writeZero(paddingBuff.capacity());
-        final int journalFormatVersionToWrite = 
conf.getJournalFormatVersionToWrite();
-        final int journalAlignmentSize = conf.getJournalAlignmentSize();
+
         BufferedChannel bc = null;
         JournalChannel logFile = null;
         forceWriteThread.start();
@@ -1099,7 +1102,17 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 if (qe == null) { // no more queue entry
                     continue;
                 }
-                if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
+                if ((qe.entryId == Bookie.METAENTRY_ID_LEDGER_EXPLICITLAC)
+                        && (journalFormatVersionToWrite < JournalChannel.V6)) {
+                    /*
+                     * this means we are using new code which supports
+                     * persisting explicitLac, but 
"journalFormatVersionToWrite"
+                     * is set to some older value (< V6). In this case we
+                     * shouldn't write this special entry
+                     * (METAENTRY_ID_LEDGER_EXPLICITLAC) to Journal.
+                     */
+                    qe.entry.release();
+                } else if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
                     int entrySize = qe.entry.readableBytes();
                     journalWriteBytes.add(entrySize);
                     journalQueueSize.dec();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 507c933..420bd07 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -66,11 +66,13 @@ class JournalChannel implements Closeable {
     // 1) expanding header to 512
     // 2) Padding writes to align sector size
     static final int V5 = 5;
+    // Adding explicitlac entry
+    public static final int V6 = 6;
 
     static final int HEADER_SIZE = SECTOR_SIZE; // align header to sector size
     static final int VERSION_HEADER_SIZE = 8; // 4byte magic word, 4 byte 
version
     static final int MIN_COMPAT_JOURNAL_FORMAT_VERSION = V1;
-    static final int CURRENT_JOURNAL_FORMAT_VERSION = V5;
+    static final int CURRENT_JOURNAL_FORMAT_VERSION = V6;
 
     private final long preAllocSize;
     private final int journalAlignSize;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
index 7e9885c..9e41bad 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
@@ -30,7 +30,12 @@ import java.io.IOException;
 class ReadOnlyFileInfo extends FileInfo {
 
     public ReadOnlyFileInfo(File lf, byte[] masterKey) throws IOException {
-        super(lf, masterKey);
+        /*
+         * For ReadOnlyFile it is okay to initialize FileInfo with
+         * CURRENT_HEADER_VERSION, when fileinfo.readHeader is called it would
+         * read actual header version.
+         */
+        super(lf, masterKey, FileInfo.CURRENT_HEADER_VERSION);
         mode = "r";
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d913dbb..7e14f2e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -66,6 +66,7 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String PAGE_SIZE = "pageSize";
     protected static final String FILEINFO_CACHE_INITIAL_CAPACITY = 
"fileInfoCacheInitialCapacity";
     protected static final String FILEINFO_MAX_IDLE_TIME = 
"fileInfoMaxIdleTime";
+    protected static final String FILEINFO_FORMAT_VERSION_TO_WRITE = 
"fileInfoFormatVersionToWrite";
     // Journal Parameters
     protected static final String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
     protected static final String MAX_BACKUP_JOURNALS = "journalMaxBackups";
@@ -547,6 +548,27 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Get fileinfo format version to write.
+     *
+     * @return fileinfo format version to write.
+     */
+    public int getFileInfoFormatVersionToWrite() {
+        return this.getInt(FILEINFO_FORMAT_VERSION_TO_WRITE, 0);
+    }
+
+    /**
+     * Set fileinfo format version to write.
+     *
+     * @param version
+     *            fileinfo format version to write.
+     * @return server configuration.
+     */
+    public ServerConfiguration setFileInfoFormatVersionToWrite(int version) {
+        this.setProperty(FILEINFO_FORMAT_VERSION_TO_WRITE, version);
+        return this;
+    }
+
+    /**
      * Max journal file size.
      *
      * @return max journal file size
@@ -2432,6 +2454,10 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
             throw new ConfigurationException(
                     "When entryLogPerLedger is enabled , it is unnecessary to 
use transactional compaction");
         }
+        if ((getJournalFormatVersionToWrite() >= 6) ^ 
(getFileInfoFormatVersionToWrite() >= 1)) {
+            throw new ConfigurationException("For persisiting explicitLac, 
journalFormatVersionToWrite should be >= 6"
+                    + "and FileInfoFormatVersionToWrite should be >= 1");
+        }
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index c0866f3..2f018ff 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
@@ -66,12 +67,45 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 
implements Runnable {
             return writeLacResponse.build();
         }
 
+        BookkeeperInternalCallbacks.WriteCallback writeCallback = new 
BookkeeperInternalCallbacks.WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                if (BookieProtocol.EOK == rc) {
+                    
requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                            TimeUnit.NANOSECONDS);
+                } else {
+                    
requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                            TimeUnit.NANOSECONDS);
+                }
+
+                StatusCode status;
+                switch (rc) {
+                case BookieProtocol.EOK:
+                    status = StatusCode.EOK;
+                    break;
+                case BookieProtocol.EIO:
+                    status = StatusCode.EIO;
+                    break;
+                default:
+                    status = StatusCode.EUA;
+                    break;
+                }
+                writeLacResponse.setStatus(status);
+                Response.Builder response = Response.newBuilder()
+                        .setHeader(getHeader())
+                        .setStatus(writeLacResponse.getStatus())
+                        .setWriteLacResponse(writeLacResponse);
+                Response resp = response.build();
+                sendResponse(status, resp, 
requestProcessor.writeLacRequestStats);
+            }
+        };
+
         StatusCode status = null;
         ByteBuffer lacToAdd = writeLacRequest.getBody().asReadOnlyByteBuffer();
         byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
 
         try {
-            
requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), 
channel, masterKey);
+            
requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), 
writeCallback, channel, masterKey);
             status = StatusCode.EOK;
         } catch (IOException e) {
             logger.error("Error saving lac {} for ledger:{}",
@@ -90,15 +124,13 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 
implements Runnable {
 
         // If everything is okay, we return null so that the calling function
         // dosn't return a response back to the caller.
-        if (status.equals(StatusCode.EOK)) {
-            
requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
-        } else {
+        if (!status.equals(StatusCode.EOK)) {
             
requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
+            writeLacResponse.setStatus(status);
+            return writeLacResponse.build();
         }
-        writeLacResponse.setStatus(status);
-        return writeLacResponse.build();
+        return null;
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 36940ba..1a0342b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -78,7 +78,7 @@ public class BookieJournalTest {
             throws Exception {
         File fn = new File(indexDir, 
IndexPersistenceMgr.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
-        FileInfo fi = new FileInfo(fn, masterKey);
+        FileInfo fi = new FileInfo(fn, masterKey, 
FileInfo.CURRENT_HEADER_VERSION);
         // force creation of index file
         fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
         fi.close(true);
@@ -89,7 +89,7 @@ public class BookieJournalTest {
             throws Exception {
         File fn = new File(indexDir, 
IndexPersistenceMgr.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
-        FileInfo fi = new FileInfo(fn, masterKey);
+        FileInfo fi = new FileInfo(fn, masterKey, 
FileInfo.CURRENT_HEADER_VERSION);
         // force creation of index file
         fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
         fi.close(true);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 1260b11..70f2a0e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -27,10 +27,22 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+
 import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.SnapshotMap;
@@ -38,16 +50,12 @@ import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test cases for IndexPersistenceMgr.
  */
 public class IndexPersistenceMgrTest {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(IndexPersistenceMgr.class);
-
     ServerConfiguration conf;
     File journalDir, ledgerDir;
     LedgerDirsManager ledgerDirsManager;
@@ -301,4 +309,111 @@ public class IndexPersistenceMgrTest {
             }
         }
     }
+
+    /*
+     * In this testcase index files (FileInfos) are precreated with different
+     * FileInfo header versions (FileInfo.V0 and FileInfo.V1) and it is
+     * validated that the current implementation of IndexPersistenceMgr (and
+     * corresponding FileInfo) is able to function as per the specifications of
+     * FileInfo header version. If it is FileInfo.V0 then explicitLac is not
+     * persisted and if it is FileInfo.V1 then explicitLac is persisted.
+     */
+    @Test
+    public void testFileInfosOfVariousHeaderVersions() throws Exception {
+        IndexPersistenceMgr indexPersistenceMgr = null;
+        try {
+            indexPersistenceMgr = createIndexPersistenceManager(1);
+            long ledgerIdWithVersionZero = 25L;
+            validateFileInfo(indexPersistenceMgr, ledgerIdWithVersionZero, 
FileInfo.V0);
+
+            long ledgerIdWithVersionOne = 135L;
+            validateFileInfo(indexPersistenceMgr, ledgerIdWithVersionOne, 
FileInfo.V1);
+        } finally {
+            if (null != indexPersistenceMgr) {
+                indexPersistenceMgr.close();
+            }
+        }
+    }
+
+    void validateFileInfo(IndexPersistenceMgr indexPersistenceMgr, long 
ledgerId, int headerVersion)
+            throws IOException, GeneralSecurityException {
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        boolean getUseV2WireProtocol = true;
+
+        preCreateFileInfoForLedger(ledgerId, headerVersion);
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, 
masterKey,
+                BookKeeper.DigestType.toProtoDigestType(digestType), 
getUseV2WireProtocol);
+
+        CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, 
masterKey);
+        fileInfo.readHeader();
+        assertEquals("ExplicitLac should be null", null, 
fileInfo.getExplicitLac());
+        assertEquals("Header Version should match with precreated fileinfos 
headerversion", headerVersion,
+                fileInfo.headerVersion);
+        assertTrue("Masterkey should match with precreated fileinfos 
masterkey",
+                Arrays.equals(masterKey, fileInfo.masterKey));
+        long explicitLac = 22;
+        ByteBuf explicitLacByteBuf = 
digestManager.computeDigestAndPackageForSendingLac(explicitLac).getBuffer(0);
+        explicitLacByteBuf.markReaderIndex();
+        indexPersistenceMgr.setExplicitLac(ledgerId, explicitLacByteBuf);
+        explicitLacByteBuf.resetReaderIndex();
+        assertEquals("explicitLac ByteBuf contents should match", 0,
+                ByteBufUtil.compare(explicitLacByteBuf, 
indexPersistenceMgr.getExplicitLac(ledgerId)));
+        /*
+         * release fileInfo untill it is marked dead and closed, so that
+         * contents of it are persisted.
+         */
+        while (fileInfo.refCount.get() != FileInfoBackingCache.DEAD_REF) {
+            fileInfo.release();
+        }
+        /*
+         * reopen the fileinfo and readHeader, so that whatever was persisted
+         * would be read.
+         */
+        fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, masterKey);
+        fileInfo.readHeader();
+        assertEquals("Header Version should match with precreated fileinfos 
headerversion even after reopening",
+                headerVersion, fileInfo.headerVersion);
+        assertTrue("Masterkey should match with precreated fileinfos 
masterkey",
+                Arrays.equals(masterKey, fileInfo.masterKey));
+        if (headerVersion == FileInfo.V0) {
+            assertEquals("Since it is V0 Header, explicitLac will not be 
persisted and should be null after reopening",
+                    null, indexPersistenceMgr.getExplicitLac(ledgerId));
+        } else {
+            explicitLacByteBuf.resetReaderIndex();
+            assertEquals("Since it is V1 Header, explicitLac will be persisted 
and should not be null after reopening",
+                    0, ByteBufUtil.compare(explicitLacByteBuf, 
indexPersistenceMgr.getExplicitLac(ledgerId)));
+        }
+    }
+
+    void preCreateFileInfoForLedger(long ledgerId, int headerVersion) throws 
IOException {
+        File ledgerCurDir = Bookie.getCurrentDirectory(ledgerDir);
+        String ledgerName = IndexPersistenceMgr.getLedgerName(ledgerId);
+        File indexFile = new File(ledgerCurDir, ledgerName);
+        indexFile.getParentFile().mkdirs();
+        indexFile.createNewFile();
+        /*
+         * precreate index file (FileInfo) for the ledger with specified
+         * headerversion. Even in FileInfo.V1 case, it is valid for
+         * explicitLacBufLength to be 0. If it is 0, then explicitLac is
+         * considered null (not set).
+         */
+        try (RandomAccessFile raf = new RandomAccessFile(indexFile, "rw")) {
+            FileChannel fcForIndexFile = raf.getChannel();
+            ByteBuffer bb = ByteBuffer.allocate((int) FileInfo.START_OF_DATA);
+            bb.putInt(FileInfo.SIGNATURE);
+            bb.putInt(headerVersion);
+            bb.putInt(masterKey.length);
+            bb.put(masterKey);
+            // statebits
+            bb.putInt(0);
+            if (headerVersion == FileInfo.V1) {
+                // explicitLacBufLength
+                bb.putInt(0);
+            }
+            bb.rewind();
+            fcForIndexFile.position(0);
+            fcForIndexFile.write(bb);
+            fcForIndexFile.close();
+        }
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index 67ec985..697d7c0 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -22,9 +22,19 @@ package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertEquals;
 
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.TestUtils;
 import org.junit.Test;
 
 /**
@@ -54,4 +64,213 @@ public class LedgerStorageTest extends 
BookKeeperClusterTestCase {
 
         counter.await();
     }
+
+    @Test
+    public void testExplicitLacWriteToJournalWithValidVersions() throws 
Exception {
+        /*
+         * to persist explicitLac, journalFormatVersionToWrite should be 
atleast
+         * V6 and fileInfoFormatVersionToWrite should be atleast V1
+         */
+        testExplicitLacWriteToJournal(6, 1);
+    }
+
+    @Test
+    public void testExplicitLacWriteToJournalWithOlderVersions() throws 
Exception {
+        /*
+         * to persist explicitLac, journalFormatVersionToWrite should be 
atleast
+         * V6 and fileInfoFormatVersionToWrite should be atleast V1
+         */
+        testExplicitLacWriteToJournal(5, 0);
+    }
+
+    public void testExplicitLacWriteToJournal(int journalFormatVersionToWrite, 
int fileInfoFormatVersionToWrite)
+            throws Exception {
+        ServerConfiguration bookieServerConfig = bsConfs.get(0);
+        
bookieServerConfig.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
+        
bookieServerConfig.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);
+
+        restartBookies(bookieServerConfig);
+
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
+        
confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        /*
+         * enable explicitLacFlush by setting non-zero value for
+         * explictLacInterval
+         */
+        int explictLacInterval = 100;
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        byte[] passwdBytes = "testPasswd".getBytes();
+        confWithExplicitLAC.setExplictLacInterval(explictLacInterval);
+
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(1, 1, 1, 
digestType, passwdBytes);
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, 
digestType, passwdBytes);
+
+        assertEquals("LAC of rlh", (long) numOfEntries - 2, 
rlh.getLastAddConfirmed());
+        assertEquals("Read explicit LAC of rlh", (long) numOfEntries - 2, 
rlh.readExplicitLastConfirmed());
+
+        /*
+         * we need to wait for atleast 2 explicitlacintervals, since in
+         * writehandle for the first call lh.getExplicitLastAddConfirmed() will
+         * be < lh.getPiggyBackedLastAddConfirmed(), so it wont make explicit
+         * writelac in the first run
+         */
+        long readExplicitLastConfirmed = 
TestUtils.waitUntilExplicitLacUpdated(rlh, numOfEntries - 1);
+        assertEquals("Read explicit LAC of rlh after wait for 
explicitlacflush", (numOfEntries - 1),
+                readExplicitLastConfirmed);
+
+        ServerConfiguration newBookieConf = new 
ServerConfiguration(bsConfs.get(0));
+        /*
+         * by reusing bookieServerConfig and setting metadataServiceUri to null
+         * we can create/start new Bookie instance using the same data
+         * (journal/ledger/index) of the existing BookeieServer for our testing
+         * purpose.
+         */
+        newBookieConf.setMetadataServiceUri(null);
+        Bookie newbookie = new Bookie(newBookieConf);
+        /*
+         * since 'newbookie' uses the same data as original Bookie, it should 
be
+         * able to read journal of the original bookie and hence explicitLac 
buf
+         * entry written to Journal in the original bookie.
+         */
+        newbookie.readJournal();
+        ByteBuf explicitLacBuf = newbookie.getExplicitLac(ledgerId);
+
+        if ((journalFormatVersionToWrite >= 6) && 
(fileInfoFormatVersionToWrite >= 1)) {
+            DigestManager digestManager = DigestManager.instantiate(ledgerId, 
passwdBytes,
+                    BookKeeper.DigestType.toProtoDigestType(digestType), 
confWithExplicitLAC.getUseV2WireProtocol());
+            long explicitLacPersistedInJournal = 
digestManager.verifyDigestAndReturnLac(explicitLacBuf);
+            assertEquals("explicitLac persisted in journal", (numOfEntries - 
1), explicitLacPersistedInJournal);
+        } else {
+            assertEquals("explicitLac is not expected to be persisted, so it 
should be null", null, explicitLacBuf);
+        }
+        bkcWithExplicitLAC.close();
+    }
+
+    @Test
+    public void testExplicitLacWriteToFileInfoWithValidVersions() throws 
Exception {
+        /*
+         * to persist explicitLac, journalFormatVersionToWrite should be 
atleast
+         * V6 and fileInfoFormatVersionToWrite should be atleast V1
+         */
+        testExplicitLacWriteToFileInfo(6, 1);
+    }
+
+    @Test
+    public void testExplicitLacWriteToFileInfoWithOlderVersions() throws 
Exception {
+        /*
+         * to persist explicitLac, journalFormatVersionToWrite should be 
atleast
+         * V6 and fileInfoFormatVersionToWrite should be atleast V1
+         */
+        testExplicitLacWriteToFileInfo(5, 0);
+    }
+
+    public void testExplicitLacWriteToFileInfo(int 
journalFormatVersionToWrite, int fileInfoFormatVersionToWrite)
+            throws Exception {
+        ServerConfiguration bookieServerConfig = bsConfs.get(0);
+        
bookieServerConfig.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
+        
bookieServerConfig.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);
+
+        restartBookies(bookieServerConfig);
+
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
+        
confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        /*
+         * enable explicitLacFlush by setting non-zero value for
+         * explictLacInterval
+         */
+        int explictLacInterval = 100;
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        byte[] passwdBytes = "testPasswd".getBytes();
+        confWithExplicitLAC.setExplictLacInterval(explictLacInterval);
+
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(1, 1, 1, 
digestType, passwdBytes);
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, 
digestType, passwdBytes);
+
+        assertEquals("LAC of rlh", (long) numOfEntries - 2, 
rlh.getLastAddConfirmed());
+        assertEquals("Read explicit LAC of rlh", (long) numOfEntries - 2, 
rlh.readExplicitLastConfirmed());
+
+        /*
+         * we need to wait for atleast 2 explicitlacintervals, since in
+         * writehandle for the first call lh.getExplicitLastAddConfirmed() will
+         * be < lh.getPiggyBackedLastAddConfirmed(), so it wont make explicit
+         * writelac in the first run
+         */
+        long readExplicitLastConfirmed = 
TestUtils.waitUntilExplicitLacUpdated(rlh, numOfEntries - 1);
+        assertEquals("Read explicit LAC of rlh after wait for 
explicitlacflush", (numOfEntries - 1),
+                readExplicitLastConfirmed);
+
+        /*
+         * flush ledgerStorage so that header of fileinfo is flushed.
+         */
+        bs.get(0).getBookie().ledgerStorage.flush();
+
+        ReadOnlyFileInfo fileInfo = getFileInfo(ledgerId, 
Bookie.getCurrentDirectories(bsConfs.get(0).getLedgerDirs()));
+        fileInfo.readHeader();
+        ByteBuf explicitLacBufReadFromFileInfo = fileInfo.getExplicitLac();
+
+        if ((journalFormatVersionToWrite >= 6) && 
(fileInfoFormatVersionToWrite >= 1)) {
+            DigestManager digestManager = DigestManager.instantiate(ledgerId, 
passwdBytes,
+                    BookKeeper.DigestType.toProtoDigestType(digestType), 
confWithExplicitLAC.getUseV2WireProtocol());
+            long explicitLacReadFromFileInfo = 
digestManager.verifyDigestAndReturnLac(explicitLacBufReadFromFileInfo);
+            assertEquals("explicitLac persisted in FileInfo", (numOfEntries - 
1), explicitLacReadFromFileInfo);
+        } else {
+            assertEquals("explicitLac is not expected to be persisted, so it 
should be null", null,
+                    explicitLacBufReadFromFileInfo);
+        }
+
+        bkcWithExplicitLAC.close();
+    }
+
+    /**
+     * Get the ledger file of a specified ledger.
+     *
+     * @param ledgerId Ledger Id
+     *
+     * @return file object.
+     */
+    private File getLedgerFile(long ledgerId, File[] indexDirectories) {
+        String ledgerName = IndexPersistenceMgr.getLedgerName(ledgerId);
+        File lf = null;
+        for (File d : indexDirectories) {
+            lf = new File(d, ledgerName);
+            if (lf.exists()) {
+                break;
+            }
+            lf = null;
+        }
+        return lf;
+    }
+
+    /**
+     * Get FileInfo for a specified ledger.
+     *
+     * @param ledgerId Ledger Id
+     * @return read only file info instance
+     */
+    ReadOnlyFileInfo getFileInfo(long ledgerId, File[] indexDirectories) 
throws IOException {
+        File ledgerFile = getLedgerFile(ledgerId, indexDirectories);
+        if (null == ledgerFile) {
+            throw new FileNotFoundException("No index file found for ledger " 
+ ledgerId
+                    + ". It may be not flushed yet.");
+        }
+        ReadOnlyFileInfo fi = new ReadOnlyFileInfo(ledgerFile, null);
+        fi.readHeader();
+        return fi;
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java
index 489db74..77f5eba 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java
@@ -89,7 +89,7 @@ public class TestFileInfoBackingCache {
                     File f = new File(baseDir, String.valueOf(ledgerId));
                     f.deleteOnExit();
                     return f;
-                });
+                }, FileInfo.CURRENT_HEADER_VERSION);
         CachedFileInfo fi = cache.loadFileInfo(1, masterKey);
         Assert.assertEquals(fi.getRefCount(), 1);
         CachedFileInfo fi2 = cache.loadFileInfo(2, masterKey);
@@ -116,7 +116,7 @@ public class TestFileInfoBackingCache {
                 (ledgerId, createIfNotFound) -> {
                     Assert.assertFalse(createIfNotFound);
                     throw new Bookie.NoLedgerException(ledgerId);
-                });
+                }, FileInfo.CURRENT_HEADER_VERSION);
         cache.loadFileInfo(1, null);
     }
 
@@ -135,7 +135,7 @@ public class TestFileInfoBackingCache {
                     File f = new File(baseDir, String.valueOf(ledgerId));
                     f.deleteOnExit();
                     return f;
-                });
+                }, FileInfo.CURRENT_HEADER_VERSION);
         Iterable<Future<Set<CachedFileInfo>>> futures =
             IntStream.range(0, numRunners).mapToObj(
                     (i) -> {
@@ -194,7 +194,7 @@ public class TestFileInfoBackingCache {
                     File f = new File(baseDir, String.valueOf(ledgerId));
                     f.deleteOnExit();
                     return f;
-                });
+                }, FileInfo.CURRENT_HEADER_VERSION);
 
         Iterable<Future<Set<CachedFileInfo>>> futures =
             IntStream.range(0, 2).mapToObj(
@@ -239,7 +239,7 @@ public class TestFileInfoBackingCache {
                     File f = new File(baseDir, String.valueOf(ledgerId));
                     f.deleteOnExit();
                     return f;
-                });
+                }, FileInfo.CURRENT_HEADER_VERSION);
 
         Cache<Long, CachedFileInfo> guavaCache = CacheBuilder.newBuilder()
             .maximumSize(1)
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index e65d39e..0ddd9a6 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -67,7 +67,7 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
 
         File fn = new File(dir, IndexPersistenceMgr.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
-        FileInfo fi = new FileInfo(fn, masterKey);
+        FileInfo fi = new FileInfo(fn, masterKey, 
FileInfo.CURRENT_HEADER_VERSION);
         // force creation of index file
         fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
         fi.close(true);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
index 256f051..5c5c24b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.bookkeeper.client;
 
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,6 +35,7 @@ import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.TestUtils;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -50,6 +53,12 @@ public class ExplicitLacTest extends 
BookKeeperClusterTestCase {
         super(1);
         this.digestType = DigestType.CRC32;
         baseConf.setLedgerStorageClass(storageClass.getName());
+        /*
+         * to persist explicitLac, journalFormatVersionToWrite should be 
atleast
+         * V6 and fileInfoFormatVersionToWrite should be atleast V1
+         */
+        baseConf.setJournalFormatVersionToWrite(6);
+        baseConf.setFileInfoFormatVersionToWrite(1);
     }
 
     @Parameters
@@ -125,6 +134,66 @@ public class ExplicitLacTest extends 
BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testExplicitLACIsPersisted() throws Exception {
+        /*
+         * In DbLedgerStorage scenario, TransientLedgerInfo is not persisted -
+         * https://github.com/apache/bookkeeper/issues/1533.
+         *
+         * So for this testcase we are ignoring DbLedgerStorage. It can/should
+         * be enabled when Issue-1533 is fixed.
+         */
+        
Assume.assumeTrue(!baseConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName()));
+        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration();
+        
confWithNoExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        // enable explicitLacFlush by setting non-zero value for
+        // explictLacInterval
+        long explictLacInterval = 100;
+        confWithNoExplicitLAC.setExplictLacInterval(50);
+
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
+
+        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(1, 1, 1, 
digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, 
digestType, "testPasswd".getBytes());
+        assertEquals("LAC of rlh", (long) numOfEntries - 2, 
rlh.getLastAddConfirmed());
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        assertEquals("LAC of wlh", (2 * numOfEntries - 1), 
wlh.getLastAddConfirmed());
+        assertEquals("LAC of rlh", (long) numOfEntries - 2, 
rlh.getLastAddConfirmed());
+        assertEquals("Read LAC of rlh", (2 * numOfEntries - 2), 
rlh.readLastAddConfirmed());
+        assertEquals("Read explicit LAC of rlh", (2 * numOfEntries - 2), 
rlh.readExplicitLastConfirmed());
+
+        // we need to wait for atleast 2 explicitlacintervals,
+        // since in writehandle for the first call
+        // lh.getExplicitLastAddConfirmed() will be <
+        // lh.getPiggyBackedLastAddConfirmed(),
+        // so it wont make explicit writelac in the first run
+        long readExplicitLastConfirmed = 
TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 1);
+        assertEquals("Read explicit LAC of rlh after wait for 
explicitlacflush", (2 * numOfEntries - 1),
+                readExplicitLastConfirmed);
+
+        // bookies have to be restarted
+        restartBookies();
+
+        /*
+         * since explicitLac is persisted we should be able to read explicitLac
+         * from the bookies.
+         */
+        LedgerHandle rlh2 = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, 
digestType, "testPasswd".getBytes());
+        assertEquals("Read explicit LAC of rlh2 after bookies restart", (2 * 
numOfEntries - 1),
+                rlh2.readExplicitLastConfirmed());
+        bkcWithExplicitLAC.close();
+    }
+
+    @Test
     public void testReadHandleWithExplicitLAC() throws Exception {
         ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
         
confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
index ed29d71..424202d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
@@ -83,4 +83,32 @@ public class TestServerConfiguration {
         assertArrayEquals(components, serverConf.getExtraServerComponents());
     }
 
+    @Test(expected = ConfigurationException.class)
+    public void testMismatchofJournalAndFileInfoVersionsOlderJournalVersion() 
throws ConfigurationException {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setJournalFormatVersionToWrite(5);
+        conf.setFileInfoFormatVersionToWrite(1);
+        conf.validate();
+    }
+
+    @Test(expected = ConfigurationException.class)
+    public void testMismatchofJournalAndFileInfoVersionsOlderFileInfoVersion() 
throws ConfigurationException {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setJournalFormatVersionToWrite(6);
+        conf.setFileInfoFormatVersionToWrite(0);
+        conf.validate();
+    }
+
+    @Test
+    public void testValidityOfJournalAndFileInfoVersions() throws 
ConfigurationException {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setJournalFormatVersionToWrite(5);
+        conf.setFileInfoFormatVersionToWrite(0);
+        conf.validate();
+
+        conf = new ServerConfiguration();
+        conf.setJournalFormatVersionToWrite(6);
+        conf.setFileInfoFormatVersionToWrite(1);
+        conf.validate();
+    }
 }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 7a0f361..08d97a9 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -278,9 +278,13 @@ journalDirectories=/tmp/bk-txn
 # 3: ledger key was introduced
 # 4: fencing key was introduced
 # 5: expanding header to 512 and padding writes to align sector size 
configured by `journalAlignmentSize`
+# 6: persisting explicitLac is introduced
 # By default, it is `4`. If you'd like to enable `padding-writes` feature, you 
can set journal version to `5`.
 # You can disable `padding-writes` by setting journal version back to `4`. 
This feature is available in 4.5.0
 # and onward versions.
+# If you'd like to enable persisting ExplicitLac, you can set this config to 6 
and also 
+# fileInfoFormatVersionToWrite should be atleast 1. If there is mismatch then 
the serverconfig is considered 
+# invalid.
 # journalFormatVersionToWrite=4
 
 # Max file size of journal file, in mega bytes
@@ -592,6 +596,15 @@ ledgerDirectories=/tmp/bk-data
 # only when opened files reached openFileLimit. The default value is 0.
 # fileInfoMaxIdleTime=0
 
+# The fileinfo format version to write.
+#  Available formats are 0-1:
+#   0: Initial version
+#   1: persisting explicitLac is introduced
+#  By default, it is `0`. If you'd like to enable persisting ExplicitLac, you 
can set
+#  this config to 1 and also journalFormatVersionToWrite should be atleast 6. 
If 
+#  there is mismatch then the serverconfig is considered invalid.
+# fileInfoFormatVersionToWrite = 0
+
 # Size of a index page in ledger cache, in bytes
 # A larger index page can improve performance writing page to disk,
 # which is efficent when you have small number of ledgers and these
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index ac24fce..afd0792 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -198,9 +198,11 @@ groups:
        3: ledger key was introduced
        4: fencing key was introduced
        5: expanding header to 512 and padding writes to align sector size 
configured by `journalAlignmentSize`
+       6: persisting explicitLac is introduced
 
       By default, it is `4`. If you'd like to enable `padding-writes` feature, 
you can set journal version to `5`.
       You can disable `padding-writes` by setting journal version back to `4`. 
This feature is available in 4.5.0 and onward versions.
+      If you'd like to enable persisting ExplicitLac, you can set this config 
to 6 and also fileInfoFormatVersionToWrite should be atleast 1. If there is 
mismatch then the serverconfig is considered invalid.
     default: 4
   - param: journalMaxSizeMB
     description: Max file size of journal file, in mega bytes. A new journal 
file will be created when the old one reaches the file size limitation.
@@ -413,6 +415,15 @@ groups:
       The max idle time allowed for an open file info existed in the file info 
cache. If the file info is idle for a long time, exceed the given time period. 
The file info will be
       evicted and closed. If the value is zero or negative, the file info is 
evicted only when opened files reached `openFileLimit`.
     default: 0
+  - param: fileInfoFormatVersionToWrite
+    description: |
+      The fileinfo format version to write.
+      Available formats are 0-1:
+       0: Initial version
+       1: persisting explicitLac is introduced
+
+      By default, it is `0`. If you'd like to enable persisting ExplicitLac, 
you can set this config to 1 and also journalFormatVersionToWrite should be 
atleast 6. If there is mismatch then the serverconfig is considered invalid.
+    default: 0
   - param: pageSize
     description: |
       Size of a index page in ledger cache, in bytes. A larger index page can 
improve performance writing page to disk, which is efficent when you have small 
number of ledgers and these ledgers have similar number of entries. If you have 
large number of ledgers and each ledger has fewer entries, smaller index page 
would improve memory usage.

Reply via email to