eolivelli closed pull request #531: Issue-530 BP-14 BP-14 Relax Durability - 
protocol changes preview
URL: https://github.com/apache/bookkeeper/pull/531
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 9c52788eb..fb939da2a 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -28,6 +28,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -54,7 +55,7 @@
     static class LatencyCallback implements WriteCallback {
         boolean complete;
         @Override
-        public synchronized void writeComplete(int rc, long ledgerId, long 
entryId,
+        public synchronized void writeComplete(int rc, long ledgerId, long 
entryId, long lastAddSyncedEntryId,
                 BookieSocketAddress addr, Object ctx) {
             if (rc != 0) {
                 LOG.error("Got error " + rc);
@@ -75,7 +76,7 @@ public synchronized void waitForComplete() throws 
InterruptedException {
     static class ThroughputCallback implements WriteCallback {
         int count;
         int waitingCount = Integer.MAX_VALUE;
-        public synchronized void writeComplete(int rc, long ledgerId, long 
entryId,
+        public synchronized void writeComplete(int rc, long ledgerId, long 
entryId, long lastAddSyncedEnryId,
                 BookieSocketAddress addr, Object ctx) {
             if (rc != 0) {
                 LOG.error("Got error " + rc);
@@ -175,7 +176,7 @@ public static void main(String[] args)
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new 
byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE, 
LedgerType.PD_JOURNAL);
         }
         LOG.info("Waiting for warmup");
         tc.waitFor(warmUpCount);
@@ -193,7 +194,7 @@ public static void main(String[] args)
             toSend.writerIndex(toSend.capacity());
             lc.resetComplete();
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new 
byte[20],
-                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE, 
LedgerType.PD_JOURNAL);
             lc.waitForComplete();
         }
         long endTime = System.nanoTime();
@@ -213,7 +214,7 @@ public static void main(String[] args)
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new 
byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE, 
LedgerType.PD_JOURNAL);
         }
         tc.waitFor(entryCount);
         endTime = System.currentTimeMillis();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 220aa4cb0..c15d1de8c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -77,6 +77,7 @@
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -216,7 +217,7 @@ public long getEntry() {
     // Write Callback do nothing
     static class NopWriteCallback implements WriteCallback {
         @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
+        public void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntryId,
                                   BookieSocketAddress addr, Object ctx) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
@@ -1378,7 +1379,7 @@ private Journal getJournal(long ledgerId) {
     /**
      * Add an entry to a ledger as specified by handle.
      */
-    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, 
WriteCallback cb, Object ctx)
+    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, 
short ledgerType, WriteCallback cb, Object ctx)
             throws IOException, BookieException {
         long ledgerId = handle.getLedgerId();
         long entryId = handle.addEntry(entry);
@@ -1388,7 +1389,7 @@ private void addEntryInternal(LedgerDescriptor handle, 
ByteBuf entry, WriteCallb
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding {}@{}", entryId, ledgerId);
         }
-        getJournal(ledgerId).logAddEntry(entry, cb, ctx);
+        getJournal(ledgerId).logAddEntry(entry, ledgerType, cb, ctx);
     }
 
     /**
@@ -1406,7 +1407,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback 
cb, Object ctx, byte[]
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
             synchronized (handle) {
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, 
BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, ctx);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1453,7 +1454,7 @@ public ByteBuf getExplicitLac(long ledgerId) throws 
IOException, Bookie.NoLedger
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
-    public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] 
masterKey)
+    public void addEntry(ByteBuf entry, short ledgerType, WriteCallback cb, 
Object ctx, byte[] masterKey)
             throws IOException, BookieException.LedgerFencedException, 
BookieException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
@@ -1466,7 +1467,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, 
Object ctx, byte[] masterK
                             
.create(BookieException.Code.LedgerFencedException);
                 }
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, ledgerType, cb, ctx);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1491,7 +1492,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, 
Object ctx, byte[] masterK
         SettableFuture<Boolean> result = SettableFuture.create();
 
         @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
+        public void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntryId,
                                   BookieSocketAddress addr, Object ctx) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
@@ -1560,7 +1561,7 @@ public Observable waitForLastAddConfirmedUpdate(long 
ledgerId, long previoisLAC,
         int count;
 
         @Override
-        public synchronized void writeComplete(int rc, long l, long e, 
BookieSocketAddress addr, Object ctx) {
+        public synchronized void writeComplete(int rc, long l, long e, long 
las, BookieSocketAddress addr, Object ctx) {
             count--;
             if (count == 0) {
                 notifyAll();
@@ -1680,7 +1681,7 @@ public static void main(String[] args)
             buff.writeLong(1);
             buff.writeLong(i);
             cb.incCount();
-            b.addEntry(buff, cb, null, new byte[0]);
+            b.addEntry(buff, BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, null, 
new byte[0]);
         }
         cb.waitZero();
         long end = MathUtils.now();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index e2b1e798d..dc15bae7b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -39,6 +39,7 @@
 import java.util.concurrent.TimeUnit;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -293,7 +294,9 @@ public void run() {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, 
entryId);
             }
             
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime),
 TimeUnit.NANOSECONDS);
-            cb.writeComplete(0, ledgerId, entryId, null, ctx);
+            final long dummyLastAddSyncedEnryId = -1;
+            // TODO: lastAddSynced will be implemented in next patches for 
BP-14
+            cb.writeComplete(0, ledgerId, entryId, dummyLastAddSyncedEnryId, 
null, ctx);
         }
     }
 
@@ -752,19 +755,21 @@ public boolean accept(long journalId) {
     }
 
     public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
-        logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx);
+        logAddEntry(Unpooled.wrappedBuffer(entry), 
BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, ctx);
     }
 
     /**
      * record an add entry operation in journal.
      */
-    public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
+    public void logAddEntry(ByteBuf entry, short ledgerType, WriteCallback cb, 
Object ctx) {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         journalQueueSize.inc();
 
         //Retain entry until it gets written to journal
         entry.retain();
+
+        // TODO ledgerType will be taken into account in next patches for BP-14
         queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, 
MathUtils.nowInNano()));
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index af55f6a73..138103eda 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -28,6 +28,7 @@
 import java.util.Observable;
 import java.util.Observer;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,7 +123,8 @@ ByteBuf getExplicitLac() {
             result = logFenceResult = SettableFuture.create();
         }
         ByteBuf entry = createLedgerFenceEntry(ledgerId);
-        journal.logAddEntry(entry, (rc, ledgerId, entryId, addr, ctx) -> {
+        journal.logAddEntry(entry, BookieProtocol.LEDGERTYPE_PD_JOURNAL,
+            (rc, ledgerId, entryId, lastAddSyncedEntryId,addr, ctx) -> {
             LOG.debug("Record fenced state for ledger {} in journal with rc 
{}", ledgerId, rc);
             if (rc == 0) {
                 fenceEntryPersisted.compareAndSet(false, true);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 376d716bb..729fc3fd4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -29,6 +29,7 @@
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -84,7 +85,8 @@
     LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int 
ackQuorumSize, DigestType digestType,
             byte[] passwd, CreateCallback cb, Object ctx, final Map<String, 
byte[]> customMetadata) {
         this.bk = bk;
-        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, 
ackQuorumSize, digestType, passwd, customMetadata);
+        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, 
ackQuorumSize, digestType, passwd, customMetadata,
+                LedgerType.PD_JOURNAL);
         this.digestType = digestType;
         this.passwd = passwd;
         this.cb = cb;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index fe1104a17..4de48b133 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -31,6 +31,7 @@
 import java.util.Set;
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -291,7 +292,7 @@ public void readComplete(int rc, LedgerHandle lh,
                         new WriteCallback() {
                             @Override
                             public void writeComplete(int rc, long ledgerId,
-                                    long entryId, BookieSocketAddress addr,
+                                    long entryId, long lastAddSyncedEntryId, 
BookieSocketAddress addr,
                                     Object ctx) {
                                 if (rc != BKException.Code.OK) {
                                     LOG.error(
@@ -317,7 +318,9 @@ public void writeComplete(int rc, long ledgerId,
                                 ledgerFragmentEntryMcb.processResult(rc, null,
                                         null);
                             }
-                        }, null, BookieProtocol.FLAG_RECOVERY_ADD);
+                            // TODO: using LedgerType.PD_JOURNAL as we want 
recovery to be guaranteed to succeeed
+                            //       could this be a problem for other 
LedgerTypes ?
+                        }, null, BookieProtocol.FLAG_RECOVERY_ADD, 
LedgerType.PD_JOURNAL);
             }
         }, null);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 7c81d6d92..f273d4d96 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -51,11 +51,13 @@
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -101,6 +103,9 @@
     final Counter lacUpdateHitsCounter;
     final Counter lacUpdateMissesCounter;
 
+    // TODO: use BP-15 API to set ledgerType
+    final LedgerType ledgerType = LedgerType.PD_JOURNAL;
+
     // This empty master key is used when an empty password is provided which 
is the hash of an empty string
     private final static byte[] emptyLedgerKey;
     static {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 55e3987b8..1142ff124 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -42,7 +42,10 @@
 import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import java.util.Collections;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 /**
  * This class encapsulates all the ledger metadata that is persistently stored
@@ -74,6 +77,7 @@
     private long length;
     private long lastEntryId;
     private long ctime;
+    private LedgerType ledgerType;
 
     private LedgerMetadataFormat.State state;
     private SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles =
@@ -85,14 +89,17 @@
     private LedgerMetadataFormat.DigestType digestType;
     private byte[] password;
 
-    private Map<String, byte[]> customMetadata = Maps.newHashMap();
+    private Map<String, byte[]> customMetadata;
 
     public LedgerMetadata(int ensembleSize, int writeQuorumSize, int 
ackQuorumSize,
-                          BookKeeper.DigestType digestType, byte[] password, 
Map<String, byte[]> customMetadata) {
+                          BookKeeper.DigestType digestType, byte[] password, 
Map<String, byte[]> customMetadata,
+                          LedgerType ledgerType) {
+        Preconditions.checkNotNull(ledgerType, "ledgerType is required");
         this.ensembleSize = ensembleSize;
         this.writeQuorumSize = writeQuorumSize;
         this.ackQuorumSize = ackQuorumSize;
         this.ctime = System.currentTimeMillis();
+        this.ledgerType = ledgerType;
 
         /*
          * It is set in PendingReadOp.readEntryComplete, and
@@ -109,14 +116,11 @@ public LedgerMetadata(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
         this.hasPassword = true;
         if (customMetadata != null) {
             this.customMetadata = customMetadata;
+        } else {
+            this.customMetadata = Collections.emptyMap();
         }
     }
 
-    public LedgerMetadata(int ensembleSize, int writeQuorumSize, int 
ackQuorumSize,
-            BookKeeper.DigestType digestType, byte[] password) {
-        this(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, 
password, null);
-    }
-
     /**
      * Copy Constructor.
      */
@@ -132,6 +136,7 @@ public LedgerMetadata(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
         this.version = other.version;
         this.hasPassword = other.hasPassword;
         this.digestType = other.digestType;
+        this.ledgerType = other.ledgerType;
         this.password = new byte[other.password.length];
         System.arraycopy(other.password, 0, this.password, 0, 
other.password.length);
         // copy the ensembles
@@ -142,9 +147,9 @@ public LedgerMetadata(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
         }
         this.customMetadata = other.customMetadata;
     }
-
+    private final static byte[] EMPTY_PASSWORD = new byte[0];
     private LedgerMetadata() {
-        this(0, 0, 0, BookKeeper.DigestType.MAC, new byte[] {});
+        this(0, 0, 0, BookKeeper.DigestType.MAC, EMPTY_PASSWORD, null, 
LedgerType.PD_JOURNAL);
         this.hasPassword = false;
     }
 
@@ -422,6 +427,18 @@ public static LedgerMetadata parseConfig(byte[] bytes, 
Version version, Optional
         } else {
             lc.ackQuorumSize = lc.writeQuorumSize;
         }
+        if (data.hasLedgerType()) {
+            switch (data.getLedgerType()) {
+                case VD_JOURNAL:
+                   lc.ledgerType = LedgerType.VD_JOURNAL;
+                   break;
+                case PD_JOURNAL:
+                   lc.ledgerType = LedgerType.PD_JOURNAL;
+                   break;
+                default:
+                    throw new IllegalStateException("illegal ledgerType 
"+data.getLedgerType()+" is metadata");
+            }
+        }
 
         lc.ensembleSize = data.getEnsembleSize();
         lc.length = data.getLength();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 5ab243536..24795969c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -103,7 +103,7 @@ void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : 
BookieProtocol.FLAG_NONE;
 
         
lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), 
lh.ledgerId, lh.ledgerKey, entryId, toSend,
-                this, bookieIndex, flags);
+                this, bookieIndex, flags, lh.ledgerType);
     }
 
     @Override
@@ -198,7 +198,11 @@ void initiate(ByteBuf toSend, int entryLength) {
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+    public void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntryId,
+        BookieSocketAddress addr, Object ctx) {
+
+        // TODO: lastAddSyncedEntryId and lederType will be handled in next 
patches for BP-14
+
         int bookieIndex = (Integer) ctx;
 
         if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java
new file mode 100644
index 000000000..5fa758266
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * Describes the type of ledger.
+ * LedgerTypes describes the behaviour of the ledger in respect to durability 
and
+ * provides hints to the storage of data on Bookies
+ *
+ * @since 4.6
+ */
+public enum LedgerType {
+    /**
+     * Persistent Durability, using Journal.<br>
+     * Each entry is persisted to the journal and every writes receives and 
acknowledgement only with the guarantee that
+     * it has been persisted durabily to it (data is fsync'd to the disk)
+     */
+    PD_JOURNAL,
+    /**
+     * Volatile Durability, using Journal.<br>
+     * Each entry is persisted to the journal and writes receive 
acknowledgement without guarantees of persistence
+     * (data is eventually fsync'd to disk).<br>
+     * For this kind of ledgers the client MUST explicitly call {@link 
LedgerHandle#asyncSync(long, 
org.apache.bookkeeper.client.AsyncCallback.SyncCallback, java.lang.Object) }
+     * in order to have guarantees of the durability of writes and in order to 
advance the LastAddConfirmed entry id
+     */
+    VD_JOURNAL
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 409fe4b4a..50352d4c6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -105,7 +105,7 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                     ctx.channel().writeAndFlush(
                             new BookieProtocol.AddResponse(
                                     req.getProtocolVersion(), 
BookieProtocol.EUA,
-                                    req.getLedgerId(), req.getEntryId()));
+                                    req.getLedgerId(), req.getEntryId(), 
BookieProtocol.INVALID_ENTRY_ID));
                 } else if (req.getOpCode() == BookieProtocol.READENTRY) {
                     ctx.channel().writeAndFlush(
                             new BookieProtocol.ReadResponse(
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index d763f5768..d072631e5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -62,6 +62,7 @@
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 /**
  * Implements the client-side part of the BookKeeper protocol.
@@ -217,6 +218,7 @@ public void safeRun() {
     private void completeAdd(final int rc,
                              final long ledgerId,
                              final long entryId,
+                             final long lastAddSyncedEntry,
                              final BookieSocketAddress addr,
                              final WriteCallback cb,
                              final Object ctx) {
@@ -224,7 +226,7 @@ private void completeAdd(final int rc,
             executor.submitOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                    cb.writeComplete(rc, ledgerId, entryId, 
lastAddSyncedEntry, addr, ctx);
                 }
                 @Override
                 public String toString() {
@@ -232,7 +234,7 @@ public String toString() {
                 }
             });
         } catch (RejectedExecutionException ree) {
-            cb.writeComplete(getRc(BKException.Code.InterruptedException), 
ledgerId, entryId, addr, ctx);
+            cb.writeComplete(getRc(BKException.Code.InterruptedException), 
ledgerId, entryId,lastAddSyncedEntry, addr, ctx);
         }
     }
 
@@ -243,13 +245,14 @@ public void addEntry(final BookieSocketAddress addr,
                          final ByteBuf toSend,
                          final WriteCallback cb,
                          final Object ctx,
-                         final int options) {
+                         final int options,
+                         final LedgerType ledgerType) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr, 
entryId);
             if (client == null) {
                 
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
-                            ledgerId, entryId, addr, cb, ctx);
+                            ledgerId, entryId, 
BookieProtocol.INVALID_ENTRY_ID, addr, cb, ctx);
                 return;
             }
 
@@ -261,9 +264,9 @@ public void addEntry(final BookieSocketAddress addr,
                 @Override
                 public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
                     if (rc != BKException.Code.OK) {
-                        completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
+                        completeAdd(rc, ledgerId, entryId, 
BookieProtocol.INVALID_ENTRY_ID, addr, cb, ctx);
                     } else {
-                        pcbc.addEntry(ledgerId, masterKey, entryId, toSend, 
cb, ctx, options);
+                        pcbc.addEntry(ledgerId, masterKey, entryId, toSend, 
cb, ctx, options, ledgerType);
                     }
                     toSend.release();
                 }
@@ -510,7 +513,7 @@ public static void main(String[] args) throws 
NumberFormatException, IOException
         }
         WriteCallback cb = new WriteCallback() {
 
-            public void writeComplete(int rc, long ledger, long entry, 
BookieSocketAddress addr, Object ctx) {
+            public void writeComplete(int rc, long ledger, long entry, long 
lastSyncedEntryId, BookieSocketAddress addr, Object ctx) {
                 Counter counter = (Counter) ctx;
                 counter.dec();
                 if (rc != 0) {
@@ -531,7 +534,8 @@ public void writeComplete(int rc, long ledger, long entry, 
BookieSocketAddress a
 
         for (int i = 0; i < 100000; i++) {
             counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i, 
Unpooled.wrappedBuffer(hello), cb, counter, 0);
+            bc.addEntry(addr, ledger, new byte[0], i, 
Unpooled.wrappedBuffer(hello), cb, counter, 0,
+                LedgerType.PD_JOURNAL);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 0fece29c5..fa420bff6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -155,7 +155,8 @@ public Object decode(ByteBuf packet)
                 // Read ledger and entry id without advancing the reader index
                 ledgerId = packet.getLong(packet.readerIndex());
                 entryId = packet.getLong(packet.readerIndex() + 8);
-                return new BookieProtocol.AddRequest(version, ledgerId, 
entryId, flags, masterKey, packet.retain());
+                return new BookieProtocol.AddRequest(version, ledgerId, 
entryId, flags, masterKey, packet.retain(),
+                                                     
BookieProtocol.LEDGERTYPE_PD_JOURNAL);
             }
 
             case BookieProtocol.READENTRY:
@@ -263,7 +264,8 @@ public Object decode(ByteBuf buffer)
                 rc = buffer.readInt();
                 ledgerId = buffer.readLong();
                 entryId = buffer.readLong();
-                return new BookieProtocol.AddResponse(version, rc, ledgerId, 
entryId);
+                return new BookieProtocol.AddResponse(version, rc, ledgerId, 
entryId,
+                                                     
BookieProtocol.LEDGERTYPE_PD_JOURNAL);
             case BookieProtocol.READENTRY:
                 rc = buffer.readInt();
                 ledgerId = buffer.readLong();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 99c33fbeb..f8991da6e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -62,6 +62,16 @@
      */
     public static final int MASTER_KEY_LENGTH = 20;
 
+    /**
+     * The request is about a ledger with a PD_JOURNAL LedgerType
+     */
+    public static final short LEDGERTYPE_PD_JOURNAL = 0;
+
+    /**
+     * The request is about a ledger with a VD_JOURNAL LedgerType
+     */
+    public static final short LEDGERTYPE_VD_JOURNAL = 1;
+
     /** 
      * The first int of a packet is the header.
      * It contains the version, opCode and flags.
@@ -237,17 +247,23 @@ public String toString() {
 
     static class AddRequest extends Request {
         final ByteBuf data;
+        final short ledgerType;
 
         public AddRequest(byte protocolVersion, long ledgerId, long entryId,
-                          short flags, byte[] masterKey, ByteBuf data) {
+                          short flags, byte[] masterKey, ByteBuf data, short 
ledgerType) {
             super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, 
masterKey);
             this.data = data.retain();
+            this.ledgerType = ledgerType;
         }
 
         ByteBuf getData() {
             return data;
         }
 
+        short getLedgerType() {
+            return ledgerType;
+        }
+
         boolean isRecoveryAdd() {
             return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
         }
@@ -291,14 +307,16 @@ AuthMessage getAuthMessage() {
         final int errorCode;
         final long ledgerId;
         final long entryId;
+        final long lastAddSyncedEntry;
 
         protected Response(byte protocolVersion, byte opCode,
-                           int errorCode, long ledgerId, long entryId) {
+                           int errorCode, long ledgerId, long entryId, long 
lastAddSyncedEntry) {
             this.protocolVersion = protocolVersion;
             this.opCode = opCode;
             this.errorCode = errorCode;
             this.ledgerId = ledgerId;
             this.entryId = entryId;
+            this.lastAddSyncedEntry = lastAddSyncedEntry;
         }
 
         byte getProtocolVersion() {
@@ -321,6 +339,10 @@ int getErrorCode() {
             return errorCode;
         }
 
+        long getLastAddSyncedEntry() {
+            return lastAddSyncedEntry;
+        }
+
         @Override
         public String toString() {
             return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]",
@@ -332,12 +354,12 @@ public String toString() {
         final ByteBuf data;
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long 
entryId) {
-            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId, 
INVALID_ENTRY_ID);
             this.data = Unpooled.EMPTY_BUFFER;
         }
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long 
entryId, ByteBuf data) {
-            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId, 
INVALID_ENTRY_ID);
             this.data = data;
         }
 
@@ -351,15 +373,15 @@ ByteBuf getData() {
     }
 
     static class AddResponse extends Response {
-        AddResponse(byte protocolVersion, int errorCode, long ledgerId, long 
entryId) {
-            super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
+        AddResponse(byte protocolVersion, int errorCode, long ledgerId, long 
entryId, long lastAddSyncedEntry) {
+            super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId, 
lastAddSyncedEntry);
         }
     }
     
     static class ErrorResponse extends Response {
         ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
                       long ledgerId, long entryId) {
-            super(protocolVersion, opCode, errorCode, ledgerId, entryId);
+            super(protocolVersion, opCode, errorCode, ledgerId, entryId, 
INVALID_ENTRY_ID);
         }
     }
 
@@ -367,7 +389,7 @@ ByteBuf getData() {
         final AuthMessage authMessage;
 
         AuthResponse(byte protocolVersion, AuthMessage authMessage) {
-            super(protocolVersion, AUTH, EOK, -1, -1);
+            super(protocolVersion, AUTH, EOK, -1, -1, INVALID_ENTRY_ID);
             this.authMessage = authMessage;
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index ab160ce81..896eeb311 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -68,7 +68,8 @@
     }
 
     public interface WriteCallback {
-        void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx);
+        void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntry,
+                           BookieSocketAddress addr, Object ctx);
     }
 
     public interface ReadLacCallback {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 20f601ce9..c58a36aec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -19,15 +19,11 @@
 
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
-import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -56,7 +52,6 @@
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Collections;
@@ -68,7 +63,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
@@ -115,7 +109,6 @@
 import com.google.protobuf.ExtensionRegistry;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
-import java.net.SocketAddress;
 
 import java.net.SocketAddress;
 import java.security.cert.Certificate;
@@ -124,6 +117,9 @@
 import java.util.List;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
+import org.apache.bookkeeper.client.api.LedgerType;
+import static org.apache.bookkeeper.client.api.LedgerType.PD_JOURNAL;
+import static org.apache.bookkeeper.client.api.LedgerType.VD_JOURNAL;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -548,13 +544,13 @@ public void operationComplete(ChannelFuture future) 
throws Exception {
      *          Add options
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ByteBuf toSend, WriteCallback cb,
-                  Object ctx, final int options) {
+                  Object ctx, final int options, final LedgerType ledgerType) {
         Object request = null;
         CompletionKey completion = null;
         if (useV2WireProtocol) {
             completion = new V2CompletionKey(ledgerId, entryId, 
OperationType.ADD_ENTRY);
             request = new 
BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 
entryId,
-                    (short) options, masterKey, toSend);
+                    (short) options, masterKey, toSend, 
BookieProtocol.LEDGERTYPE_PD_JOURNAL);
         } else {
             final long txnId = getTxnId();
             completion = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
@@ -576,6 +572,15 @@ void addEntry(final long ledgerId, byte[] masterKey, final 
long entryId, ByteBuf
                 addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
             }
 
+            switch (ledgerType) {
+                case VD_JOURNAL:
+                    addBuilder.setLedgerType(AddRequest.LedgerType.VD_JOURNAL);
+                    break;
+                case PD_JOURNAL:
+                    // nothing, this is the default
+                    break;
+            }
+
             request = Request.newBuilder()
                     .setHeader(headerBuilder)
                     .setAddRequest(addBuilder)
@@ -1102,7 +1107,7 @@ public void safeRun() {
                             new Object[] { addCompletion.entryId, 
addCompletion.ledgerId, bAddress, rc });
                 }
 
-                addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, 
addCompletion.entryId,
+                addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, 
addCompletion.entryId, addCompletion.lastAddSyncedEntry,
                                                addr, addCompletion.ctx);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Invoked callback method: {}", 
addCompletion.entryId);
@@ -1288,7 +1293,7 @@ private void readV2Response(final BookieProtocol.Response 
response) {
                 public void safeRun() {
                     switch (operationType) {
                         case ADD_ENTRY: {
-                            handleAddResponse(ledgerId, entryId, status, 
completionValue);
+                            handleAddResponse(ledgerId, entryId, 
BookieProtocol.INVALID_ENTRY_ID, status, completionValue);
                             break;
                         }
                         case READ_ENTRY: {
@@ -1375,7 +1380,8 @@ public void safeRun() {
                         case ADD_ENTRY: {
                             AddResponse addResponse = 
response.getAddResponse();
                             StatusCode status = response.getStatus() == 
StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
-                            handleAddResponse(addResponse.getLedgerId(), 
addResponse.getEntryId(), status, completionValue);
+                            long lastAddSynced = 
addResponse.hasLastAddSynced() ? addResponse.getLastAddSynced() : 
BookieProtocol.INVALID_ENTRY_ID;
+                            handleAddResponse(addResponse.getLedgerId(), 
addResponse.getEntryId(), lastAddSynced, status, completionValue);
                             break;
                         }
                         case READ_ENTRY: {
@@ -1558,7 +1564,7 @@ void handleWriteLacResponse(long ledgerId, StatusCode 
status, CompletionValue co
         plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
     }
 
- void handleAddResponse(long ledgerId, long entryId, StatusCode status, 
CompletionValue completionValue) {
+ void handleAddResponse(long ledgerId, long entryId, long lastAddSynced, 
StatusCode status, CompletionValue completionValue) {
         // The completion value should always be an instance of an 
AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
 
@@ -1577,7 +1583,7 @@ void handleAddResponse(long ledgerId, long entryId, 
StatusCode status, Completio
             }
             rcToRet = BKException.Code.WriteException;
         }
-        ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
+        ac.cb.writeComplete(rcToRet, ledgerId, entryId, lastAddSynced, addr, 
ac.ctx);
     }
 
     void handleReadLacResponse(long ledgerId, StatusCode status, ByteBuf 
lacBuffer, ByteBuf lastEntryBuffer, CompletionValue completionValue) {
@@ -1670,13 +1676,15 @@ void handleGetBookieInfoResponse(long freeDiskSpace, 
long totalDiskCapacity,  St
         final Object ctx;
         protected final long ledgerId;
         protected final long entryId;
+        protected final long lastAddSyncedEntry;
         protected final Timeout timeout;
 
-        public CompletionValue(Object ctx, long ledgerId, long entryId,
+        public CompletionValue(Object ctx, long ledgerId, long entryId, long 
lastAddSyncedEntry,
                                Timeout timeout) {
             this.ctx = ctx;
             this.ledgerId = ledgerId;
             this.entryId = entryId;
+            this.lastAddSyncedEntry = lastAddSyncedEntry;
             this.timeout = timeout;
         }
 
@@ -1697,7 +1705,7 @@ public WriteLacCompletion(WriteLacCallback cb, Object 
ctx, long ledgerId) {
 
         public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final 
WriteLacCallback originalCallback,
                 final Object originalCtx, final long ledgerId, final Timeout 
timeout) {
-            super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, 
timeout);
+            super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, 
BookieProtocol.INVALID_ENTRY_ID, timeout);
             final long startTime = MathUtils.nowInNano();
             this.cb = null == writeLacOpLogger ? originalCallback : new 
WriteLacCallback() {
                 @Override
@@ -1726,7 +1734,7 @@ public ReadLacCompletion(ReadLacCallback cb, Object ctx, 
long ledgerId) {
 
         public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final 
ReadLacCallback originalCallback,
                 final Object ctx, final long ledgerId, final Timeout timeout) {
-            super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
+            super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, 
BookieProtocol.INVALID_ENTRY_ID, timeout);
             final long startTime = MathUtils.nowInNano();
             this.cb = null == readLacOpLogger ? originalCallback : new 
ReadLacCallback() {
                 @Override
@@ -1758,7 +1766,7 @@ public ReadCompletion(final PerChannelBookieClient pcbc, 
final OpStatsLogger rea
                               final ReadEntryCallback originalCallback,
                               final Object originalCtx, final long ledgerId, 
final long entryId,
                               final Timeout timeout) {
-            super(originalCtx, ledgerId, entryId, timeout);
+            super(originalCtx, ledgerId, entryId, 
BookieProtocol.INVALID_ENTRY_ID, timeout);
             final long startTime = MathUtils.nowInNano();
             this.cb = new ReadEntryCallback() {
                 @Override
@@ -1792,7 +1800,7 @@ public StartTLSCompletion(final PerChannelBookieClient 
pcbc, StartTLSCallback cb
 
         public StartTLSCompletion(final PerChannelBookieClient pcbc, final 
OpStatsLogger startTLSOpLogger,
                                   final StartTLSCallback originalCallback, 
final Object originalCtx, final Timeout timeout) {
-            super(originalCtx, -1, -1, timeout);
+            super(originalCtx, -1, -1, BookieProtocol.INVALID_ENTRY_ID, 
timeout);
             final long startTime = MathUtils.nowInNano();
             this.cb = new StartTLSCallback() {
                 @Override
@@ -1830,7 +1838,7 @@ public GetBookieInfoCompletion(final 
PerChannelBookieClient pcbc, GetBookieInfoC
         public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, 
final OpStatsLogger getBookieInfoOpLogger,
                               final GetBookieInfoCallback originalCallback,
                               final Object originalCtx, final Timeout timeout) 
{
-            super(originalCtx, 0L, 0L, timeout);
+            super(originalCtx, 0L, 0L, BookieProtocol.INVALID_ENTRY_ID, 
timeout);
             final long startTime = MathUtils.nowInNano();
             this.cb = (null == getBookieInfoOpLogger) ? originalCallback : new 
GetBookieInfoCallback() {
                 @Override
@@ -1868,11 +1876,11 @@ public AddCompletion(final PerChannelBookieClient pcbc, 
final OpStatsLogger addE
                              final WriteCallback originalCallback,
                              final Object originalCtx, final long ledgerId, 
final long entryId,
                              final Timeout timeout) {
-            super(originalCtx, ledgerId, entryId, timeout);
+            super(originalCtx, ledgerId, entryId, 
BookieProtocol.INVALID_ENTRY_ID, timeout);
             final long startTime = MathUtils.nowInNano();
             this.cb = null == addEntryOpLogger ? originalCallback : new 
WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                public void writeComplete(int rc, long ledgerId, long entryId, 
long lastAddSynced, BookieSocketAddress addr, Object ctx) {
                     cancelTimeout();
                     if (pcbc.addEntryOpLogger != null) {
                         long latency = MathUtils.elapsedNanos(startTime);
@@ -1887,7 +1895,7 @@ public void writeComplete(int rc, long ledgerId, long 
entryId, BookieSocketAddre
                         pcbc.recordError();
                     }
 
-                    originalCallback.writeComplete(rc, ledgerId, entryId, 
addr, originalCtx);
+                    originalCallback.writeComplete(rc, ledgerId, entryId, 
lastAddSynced, addr, originalCtx);
                 }
             };
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index c0be16247..36bd418ec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -26,7 +26,7 @@
     static BookieProtocol.Response buildErrorResponse(int errorCode, 
BookieProtocol.Request r) {
         if (r.getOpCode() == BookieProtocol.ADDENTRY) {
             return new BookieProtocol.AddResponse(r.getProtocolVersion(), 
errorCode,
-                                                  r.getLedgerId(), 
r.getEntryId());
+                                                  r.getLedgerId(), 
r.getEntryId(), BookieProtocol.INVALID_ENTRY_ID);
         } else {
             assert(r.getOpCode() == BookieProtocol.READENTRY);
             return new BookieProtocol.ReadResponse(r.getProtocolVersion(), 
errorCode,
@@ -34,9 +34,9 @@
         }
     }
 
-    static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) {
+    static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r, 
long lastAddSyncedEntry) {
         return new BookieProtocol.AddResponse(r.getProtocolVersion(), 
BookieProtocol.EOK, r.getLedgerId(),
-                                              r.getEntryId());
+                                              r.getEntryId(), 
lastAddSyncedEntry);
     }
 
     static BookieProtocol.Response buildReadResponse(ByteBuf data, 
BookieProtocol.Request r) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 827aed986..b2d47ea02 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -64,7 +64,8 @@ protected void processPacket() {
             if (add.isRecoveryAdd()) {
                 requestProcessor.bookie.recoveryAddEntry(add.getData(), this, 
channel, add.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(add.getData(), this, channel, 
add.getMasterKey());
+                requestProcessor.bookie.addEntry(add.getData(), 
BookieProtocol.LEDGERTYPE_PD_JOURNAL,
+                    this, channel, add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);
@@ -89,7 +90,7 @@ protected void processPacket() {
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId,
+    public void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntry,
                               BookieSocketAddress addr, Object ctx) {
         if (BookieProtocol.EOK == rc) {
             
requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
@@ -99,7 +100,7 @@ public void writeComplete(int rc, long ledgerId, long 
entryId,
                     TimeUnit.NANOSECONDS);
         }
         sendResponse(rc,
-                     ResponseBuilder.buildAddResponse(request),
+                     ResponseBuilder.buildAddResponse(request, 
lastAddSyncedEntry),
                      requestProcessor.addRequestStats);
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index b4e89f8c5..a1f03c66f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -70,7 +70,7 @@ private AddResponse getAddResponse() {
 
         BookkeeperInternalCallbacks.WriteCallback wcb = new 
BookkeeperInternalCallbacks.WriteCallback() {
             @Override
-            public void writeComplete(int rc, long ledgerId, long entryId,
+            public void writeComplete(int rc, long ledgerId, long entryId, 
long lastAddSyncedEntry,
                                       BookieSocketAddress addr, Object ctx) {
                 if (BookieProtocol.EOK == rc) {
                     
requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
@@ -108,7 +108,10 @@ public void writeComplete(int rc, long ledgerId, long 
entryId,
             if (addRequest.hasFlag() && 
addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
                 requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, 
channel, masterKey);
             } else {
-                requestProcessor.bookie.addEntry(entryToAdd, wcb, channel, 
masterKey);
+                 short ledgerType = addRequest.hasLedgerType()
+                    && 
addRequest.getLedgerType().equals(AddRequest.LedgerType.VD_JOURNAL)
+                    ? BookieProtocol.LEDGERTYPE_VD_JOURNAL : 
BookieProtocol.LEDGERTYPE_PD_JOURNAL;
+                requestProcessor.bookie.addEntry(entryToAdd, ledgerType, wcb, 
channel, masterKey);
             }
             status = StatusCode.EOK;
         } catch (IOException e) {
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto 
b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index b43e69148..898822d7f 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -62,6 +62,7 @@ enum OperationType {
     READ_LAC = 7;
     GET_BOOKIE_INFO = 8;
     START_TLS = 9;
+    SYNC = 10;
 }
 
 /**
@@ -111,6 +112,12 @@ message AddRequest {
     required int64 entryId = 2;
     required bytes masterKey = 3;
     required bytes body = 4;
+
+    enum LedgerType {
+        PD_JOURNAL = 0;
+        VD_JOURNAL = 1;
+    }
+    optional LedgerType ledgerType = 5;
 }
 
 message StartTLSRequest {
@@ -136,6 +143,13 @@ message GetBookieInfoRequest {
     optional int64 requested = 1;
 }
 
+message SyncRequest {
+    required int64 ledgerId = 1;
+    required bytes masterKey = 2;
+    required int64 firstEntryId = 3;
+    required int64 lastEntryId = 4;
+}
+
 message Response {
 
     required BKPacketHeader header = 1;
@@ -150,6 +164,7 @@ message Response {
     optional ReadLacResponse readLacResponse = 104;
     optional GetBookieInfoResponse getBookieInfoResponse = 105;
     optional StartTLSResponse startTLSResponse = 106;
+    optional SyncResponse syncResponse = 107;
 }
 
 message ReadResponse {
@@ -166,6 +181,8 @@ message AddResponse {
     required StatusCode status = 1;
     required int64 ledgerId = 2;
     required int64 entryId = 3;
+    // Piggyback LAS
+    optional int64 lastAddSynced = 4;
 }
 
 message AuthMessage {
@@ -193,3 +210,9 @@ message GetBookieInfoResponse {
 
 message StartTLSResponse {
 }
+
+message SyncResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+    required int64 lastPersistedEntryId = 3;
+}
diff --git a/bookkeeper-server/src/main/proto/DataFormats.proto 
b/bookkeeper-server/src/main/proto/DataFormats.proto
index cdade9563..7ef03ef8d 100644
--- a/bookkeeper-server/src/main/proto/DataFormats.proto
+++ b/bookkeeper-server/src/main/proto/DataFormats.proto
@@ -58,6 +58,12 @@ message LedgerMetadataFormat {
         optional bytes value = 2;
     }
     repeated cMetadataMapEntry customMetadata = 11;
+
+    enum LedgerType {
+        PD_JOURNAL = 0;
+        VD_JOURNAL = 1;
+    }
+    optional LedgerType ledgerType = 12 [default = PD_JOURNAL];
 }
 
 message LedgerRereplicationLayoutFormat {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 2a7e5c3f2..f33566f6f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.bookkeeper.proto.BookieProtocol;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -334,7 +335,7 @@ public void testIndexPageEvictionWriteOrder() throws 
Exception {
         b.start();
         for (int i = 1; i <= numLedgers; i++) {
             ByteBuf packet = generateEntry(i, 1);
-            b.addEntry(packet, new Bookie.NopWriteCallback(), null, 
"passwd".getBytes());
+            b.addEntry(packet, BookieProtocol.LEDGERTYPE_PD_JOURNAL, new 
Bookie.NopWriteCallback(), null, "passwd".getBytes());
         }
 
         conf = TestBKConfiguration.newServerConfiguration()
@@ -513,7 +514,7 @@ public void testEntryMemTableFlushFailure() throws 
Exception {
 
         // this bookie.addEntry call is required. FileInfo for Ledger 1 would 
be created with this call.
         // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls 
will fail because of BOOKKEEPER-965 change.
-        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), 
null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(1, 1), 
BookieProtocol.LEDGERTYPE_PD_JOURNAL, new Bookie.NopWriteCallback(), null, 
"passwd".getBytes());
 
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", 
bookie.isReadOnly());
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 183af5bb0..c1429bb9a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -87,7 +87,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb,
                 }
 
                 @Override
-                public void addEntry(ByteBuf entry, WriteCallback cb,
+                public void addEntry(ByteBuf entry, short ledgerType, 
WriteCallback cb,
                                      Object ctx, byte[] masterKey)
                         throws IOException, BookieException {
                     try {
@@ -97,7 +97,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb,
                         // and an exception would spam the logs
                         Thread.currentThread().interrupt();
                     }
-                    super.addEntry(entry, cb, ctx, masterKey);
+                    super.addEntry(entry, ledgerType, cb, ctx, masterKey);
                 }
 
                 @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index e61100bbd..c50c37273 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -195,7 +195,7 @@ private void startUnauthorizedBookie(ServerConfiguration 
conf, final CountDownLa
             throws Exception {
         Bookie sBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
+            public void addEntry(ByteBuf entry, short ledgerType, 
WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
@@ -218,8 +218,9 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback 
cb, Object ctx, byte[]
     // so no ensemble change when recovering ledger on this bookie.
     private void startDeadBookie(ServerConfiguration conf, final 
CountDownLatch latch) throws Exception {
         Bookie dBookie = new Bookie(conf) {
+
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
+            public void addEntry(ByteBuf entry, short ledgerType, 
WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index 73c38b9f0..1f5c3a1d0 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -188,7 +188,7 @@ private void ledgerRecoveryWithSlowBookie(int ensembleSize, 
int writeQuorumSize,
 
         Bookie fakeBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
+            public void addEntry(ByteBuf entry, short ledgerType, 
WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a slow and failed bookie
             }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 1b6f2e7dc..ab6598c27 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -58,6 +58,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
 
@@ -361,14 +362,15 @@ public void testRecoveryOnEntryGap() throws Exception {
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
-        lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(0), 
lh.getId(), lh.ledgerKey, entryId, toSend,
+        lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(0),
+            lh.getId(), lh.ledgerKey, entryId, toSend,
             new WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                public void writeComplete(int rc, long ledgerId, long entryId, 
long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) {
                     addSuccess.set(BKException.Code.OK == rc);
                     addLatch.countDown();
                 }
-            }, 0, BookieProtocol.FLAG_NONE);
+            }, 0, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         addLatch.await();
         assertTrue("add entry 14 should succeed", addSuccess.get());
 
@@ -418,22 +420,24 @@ public void operationComplete(int rc, Void result) {
             private final int rc;
             private final long ledgerId;
             private final long entryId;
+            private final long lastAddSyncedEntryId;
             private final BookieSocketAddress addr;
             private final Object ctx;
 
             WriteCallbackEntry(WriteCallback cb,
-                               int rc, long ledgerId, long entryId,
+                               int rc, long ledgerId, long entryId, long 
lastAddSyncedEntryId,
                                BookieSocketAddress addr, Object ctx) {
                 this.cb = cb;
                 this.rc = rc;
                 this.ledgerId = ledgerId;
                 this.entryId = entryId;
+                this.lastAddSyncedEntryId = lastAddSyncedEntryId;
                 this.addr = addr;
                 this.ctx = ctx;
             }
 
             public void callback() {
-                cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntryId, 
addr, ctx);
             }
         }
 
@@ -450,16 +454,16 @@ public DelayResponseBookie(ServerConfiguration conf)
         }
 
         @Override
-        public void addEntry(ByteBuf entry, final WriteCallback cb, Object 
ctx, byte[] masterKey)
+        public void addEntry(ByteBuf entry, short ledgerType, final 
WriteCallback cb, Object ctx, byte[] masterKey)
                 throws IOException, BookieException {
-            super.addEntry(entry, new WriteCallback() {
+            super.addEntry(entry, ledgerType, new WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId,
+                public void writeComplete(int rc, long ledgerId, long entryId, 
long lastAddSyncedEntryId,
                                           BookieSocketAddress addr, Object 
ctx) {
                     if (delayAddResponse.get()) {
-                        delayQueue.add(new WriteCallbackEntry(cb, rc, 
ledgerId, entryId, addr, ctx));
+                        delayQueue.add(new WriteCallbackEntry(cb, rc, 
ledgerId, entryId, lastAddSyncedEntryId, addr, ctx));
                     } else {
-                        cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                        cb.writeComplete(rc, ledgerId, entryId, 
lastAddSyncedEntryId, addr, ctx);
                     }
                 }
             }, ctx, masterKey);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index c0ff8d774..bc4cfc881 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -21,6 +21,7 @@
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -28,6 +29,7 @@
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -236,7 +238,8 @@ public void 
testReplicateLFShouldReturnFalseIfTheReplicationFails()
     public void testSplitIntoSubFragmentsWithDifferentFragmentBoundaries()
             throws Exception {
         LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE,
-                TEST_PSSWD) {
+                TEST_PSSWD,
+                null, LedgerType.PD_JOURNAL) {
             @Override
             ArrayList<BookieSocketAddress> getEnsemble(long entryId) {
                 return null;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index a0801dbb2..bee2d408f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -49,6 +49,7 @@
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 import static org.junit.Assert.*;
 
@@ -120,7 +121,8 @@ public void testWatchMetadataRemoval() throws Exception {
         idGenerator.generateLedgerId(new GenericCallback<Long>() {
             @Override
             public void operationComplete(int rc, final Long lid) {
-                manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, 
digestType, "fpj was here".getBytes()),
+                manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, 
digestType, "fpj was here".getBytes(),
+                null, LedgerType.PD_JOURNAL),
                          new 
BookkeeperInternalCallbacks.GenericCallback<Void>(){
 
                     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index ecdfa7718..918b08341 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -61,6 +61,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -101,7 +102,8 @@ public void operationComplete(int rc, final Long ledgerId) {
                     }
 
                     getLedgerManager().createLedgerMetadata(ledgerId,
-                            new LedgerMetadata(1, 1, 1, DigestType.MAC, 
"".getBytes()), new GenericCallback<Void>() {
+                            new LedgerMetadata(1, 1, 1, DigestType.MAC, 
"".getBytes(),
+                                null, LedgerType.PD_JOURNAL), new 
GenericCallback<Void>() {
                                 @Override
                                 public void operationComplete(int rc, Void 
result) {
                                     if (rc == BKException.Code.OK) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index 0692d33ab..e84f6f80e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -46,6 +46,7 @@
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -518,7 +519,8 @@ public void testTriggerAuditorWithNoPendingAuditTask() 
throws Exception {
         int numofledgers = 5;
         Random rand = new Random();
         for (int i = 0; i < numofledgers; i++) {
-            LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, 
DigestType.CRC32, "passwd".getBytes(), null);
+            LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, 
DigestType.CRC32, "passwd".getBytes(),
+                    null, LedgerType.PD_JOURNAL);
             ArrayList<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
             ensemble.add(new BookieSocketAddress("99.99.99.99:9999"));
             ensemble.add(new BookieSocketAddress("11.11.11.11:1111"));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 5bc34d61d..ae7ad846e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -122,7 +123,8 @@ public void readEntryComplete(int rc, long ledgerId, long 
entryId, ByteBuf bb, O
     };
 
     WriteCallback wrcb = new WriteCallback() {
-        public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+        public void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntryId,
+            BookieSocketAddress addr, Object ctx) {
             if (ctx != null) {
                 synchronized (ctx) {
                     if (ctx instanceof ResultStruct) {
@@ -145,7 +147,7 @@ public void testWriteGaps() throws Exception {
 
         BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
         ByteBuf bb = createByteBuffer(1, 1, 1);
-        bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, 
BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, 
BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         synchronized (arc) {
             arc.wait(1000);
             assertEquals(0, arc.rc);
@@ -155,16 +157,16 @@ public void testWriteGaps() throws Exception {
             assertEquals(1, arc.entry.getInt());
         }
         bb = createByteBuffer(2, 1, 2);
-        bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, 
BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, 
BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         bb = createByteBuffer(3, 1, 3);
-        bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, 
BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, 
BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         bb = createByteBuffer(5, 1, 5);
-        bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, 
BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, 
BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         bb = createByteBuffer(7, 1, 7);
-        bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, 
BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, 
BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         synchronized (notifyObject) {
             bb = createByteBuffer(11, 1, 11);
-            bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, 
BookieProtocol.FLAG_NONE);
+            bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, 
BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
             notifyObject.wait();
         }
         synchronized (arc) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index eb2ff0ee9..a7b78acc1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -37,6 +37,7 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.junit.After;
 import org.junit.Before;
@@ -167,7 +168,7 @@ private long doWrites(int ledgers, int size, int 
totalwrites)
         throttle = new Semaphore(10000);
         WriteCallback cb = new WriteCallback() {
             @Override
-            public void writeComplete(int rc, long ledgerId, long entryId,
+            public void writeComplete(int rc, long ledgerId, long entryId, 
long lastAddSyncedEntryId,
                     BookieSocketAddress addr, Object ctx) {
                 AtomicInteger counter = (AtomicInteger)ctx;
                 counter.getAndIncrement();
@@ -187,7 +188,7 @@ public void writeComplete(int rc, long ledgerId, long 
entryId,
                 bytes.position(0);
                 bytes.limit(bytes.capacity());
                 throttle.acquire();
-                bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, 
zeros);
+                bookie.addEntry(Unpooled.wrappedBuffer(bytes), 
BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, counter, zeros);
             }
         }
         long finish = System.currentTimeMillis();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
index 4aca80838..035484981 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -76,11 +77,12 @@ void write(long ledgerId, long entry, byte[] data, 
BookieSocketAddress addr, Wri
         byte[] passwd = new byte[20];
         Arrays.fill(passwd, (byte) 'a');
 
-        client.addEntry(addr, ledgerId, passwd, entry, 
Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE);
+        client.addEntry(addr, ledgerId, passwd, entry, 
Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE,
+            LedgerType.PD_JOURNAL);
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+    public void writeComplete(int rc, long ledgerId, long entryId, long 
lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) {
         Counter counter = (Counter) ctx;
         counter.increment();
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to