eolivelli closed pull request #531: Issue-530 BP-14 BP-14 Relax Durability - protocol changes preview URL: https://github.com/apache/bookkeeper/pull/531
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java index 9c52788eb..fb939da2a 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; @@ -54,7 +55,7 @@ static class LatencyCallback implements WriteCallback { boolean complete; @Override - public synchronized void writeComplete(int rc, long ledgerId, long entryId, + public synchronized void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { if (rc != 0) { LOG.error("Got error " + rc); @@ -75,7 +76,7 @@ public synchronized void waitForComplete() throws InterruptedException { static class ThroughputCallback implements WriteCallback { int count; int waitingCount = Integer.MAX_VALUE; - public synchronized void writeComplete(int rc, long ledgerId, long entryId, + public synchronized void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEnryId, BookieSocketAddress addr, Object ctx) { if (rc != 0) { LOG.error("Got error " + rc); @@ -175,7 +176,7 @@ public static void main(String[] args) toSend.writeLong(entry); toSend.writerIndex(toSend.capacity()); bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20], - entry, toSend, tc, null, BookieProtocol.FLAG_NONE); + entry, toSend, tc, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); } LOG.info("Waiting for warmup"); tc.waitFor(warmUpCount); @@ -193,7 +194,7 @@ public static void main(String[] args) toSend.writerIndex(toSend.capacity()); lc.resetComplete(); bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20], - entry, toSend, lc, null, BookieProtocol.FLAG_NONE); + entry, toSend, lc, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); lc.waitForComplete(); } long endTime = System.nanoTime(); @@ -213,7 +214,7 @@ public static void main(String[] args) toSend.writeLong(entry); toSend.writerIndex(toSend.capacity()); bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20], - entry, toSend, tc, null, BookieProtocol.FLAG_NONE); + entry, toSend, tc, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); } tc.waitFor(entryCount); endTime = System.currentTimeMillis(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 220aa4cb0..c15d1de8c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -77,6 +77,7 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; @@ -216,7 +217,7 @@ public long getEntry() { // Write Callback do nothing static class NopWriteCallback implements WriteCallback { @Override - public void writeComplete(int rc, long ledgerId, long entryId, + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { if (LOG.isDebugEnabled()) { LOG.debug("Finished writing entry {} @ ledger {} for {} : {}", @@ -1378,7 +1379,7 @@ private Journal getJournal(long ledgerId) { /** * Add an entry to a ledger as specified by handle. */ - private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallback cb, Object ctx) + private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx) throws IOException, BookieException { long ledgerId = handle.getLedgerId(); long entryId = handle.addEntry(entry); @@ -1388,7 +1389,7 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallb if (LOG.isTraceEnabled()) { LOG.trace("Adding {}@{}", entryId, ledgerId); } - getJournal(ledgerId).logAddEntry(entry, cb, ctx); + getJournal(ledgerId).logAddEntry(entry, ledgerType, cb, ctx); } /** @@ -1406,7 +1407,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] LedgerDescriptor handle = getLedgerForEntry(entry, masterKey); synchronized (handle) { entrySize = entry.readableBytes(); - addEntryInternal(handle, entry, cb, ctx); + addEntryInternal(handle, entry, BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, ctx); } success = true; } catch (NoWritableLedgerDirException e) { @@ -1453,7 +1454,7 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedger * Add entry to a ledger. * @throws BookieException.LedgerFencedException if the ledger is fenced */ - public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException.LedgerFencedException, BookieException { long requestNanos = MathUtils.nowInNano(); boolean success = false; @@ -1466,7 +1467,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterK .create(BookieException.Code.LedgerFencedException); } entrySize = entry.readableBytes(); - addEntryInternal(handle, entry, cb, ctx); + addEntryInternal(handle, entry, ledgerType, cb, ctx); } success = true; } catch (NoWritableLedgerDirException e) { @@ -1491,7 +1492,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterK SettableFuture<Boolean> result = SettableFuture.create(); @Override - public void writeComplete(int rc, long ledgerId, long entryId, + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { if (LOG.isDebugEnabled()) { LOG.debug("Finished writing entry {} @ ledger {} for {} : {}", @@ -1560,7 +1561,7 @@ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, int count; @Override - public synchronized void writeComplete(int rc, long l, long e, BookieSocketAddress addr, Object ctx) { + public synchronized void writeComplete(int rc, long l, long e, long las, BookieSocketAddress addr, Object ctx) { count--; if (count == 0) { notifyAll(); @@ -1680,7 +1681,7 @@ public static void main(String[] args) buff.writeLong(1); buff.writeLong(i); cb.incCount(); - b.addEntry(buff, cb, null, new byte[0]); + b.addEntry(buff, BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, null, new byte[0]); } cb.waitZero(); long end = MathUtils.now(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index e2b1e798d..dc15bae7b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -293,7 +294,9 @@ public void run() { LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId); } journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS); - cb.writeComplete(0, ledgerId, entryId, null, ctx); + final long dummyLastAddSyncedEnryId = -1; + // TODO: lastAddSynced will be implemented in next patches for BP-14 + cb.writeComplete(0, ledgerId, entryId, dummyLastAddSyncedEnryId, null, ctx); } } @@ -752,19 +755,21 @@ public boolean accept(long journalId) { } public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) { - logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx); + logAddEntry(Unpooled.wrappedBuffer(entry), BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, ctx); } /** * record an add entry operation in journal. */ - public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) { + public void logAddEntry(ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx) { long ledgerId = entry.getLong(entry.readerIndex() + 0); long entryId = entry.getLong(entry.readerIndex() + 8); journalQueueSize.inc(); //Retain entry until it gets written to journal entry.retain(); + + // TODO ledgerType will be taken into account in next patches for BP-14 queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano())); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index af55f6a73..138103eda 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -28,6 +28,7 @@ import java.util.Observable; import java.util.Observer; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.proto.BookieProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,7 +123,8 @@ ByteBuf getExplicitLac() { result = logFenceResult = SettableFuture.create(); } ByteBuf entry = createLedgerFenceEntry(ledgerId); - journal.logAddEntry(entry, (rc, ledgerId, entryId, addr, ctx) -> { + journal.logAddEntry(entry, BookieProtocol.LEDGERTYPE_PD_JOURNAL, + (rc, ledgerId, entryId, lastAddSyncedEntryId,addr, ctx) -> { LOG.debug("Record fenced state for ledger {} in journal with rc {}", ledgerId, rc); if (rc == 0) { fenceEntryPersisted.compareAndSet(false, true); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index 376d716bb..729fc3fd4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -29,6 +29,7 @@ import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -84,7 +85,8 @@ LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx, final Map<String, byte[]> customMetadata) { this.bk = bk; - this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd, customMetadata); + this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd, customMetadata, + LedgerType.PD_JOURNAL); this.digestType = digestType; this.passwd = passwd; this.cb = cb; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index fe1104a17..4de48b133 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -291,7 +292,7 @@ public void readComplete(int rc, LedgerHandle lh, new WriteCallback() { @Override public void writeComplete(int rc, long ledgerId, - long entryId, BookieSocketAddress addr, + long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { if (rc != BKException.Code.OK) { LOG.error( @@ -317,7 +318,9 @@ public void writeComplete(int rc, long ledgerId, ledgerFragmentEntryMcb.processResult(rc, null, null); } - }, null, BookieProtocol.FLAG_RECOVERY_ADD); + // TODO: using LedgerType.PD_JOURNAL as we want recovery to be guaranteed to succeeed + // could this be a problem for other LedgerTypes ? + }, null, BookieProtocol.FLAG_RECOVERY_ADD, LedgerType.PD_JOURNAL); } }, null); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 7c81d6d92..f273d4d96 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -51,11 +51,13 @@ import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; @@ -101,6 +103,9 @@ final Counter lacUpdateHitsCounter; final Counter lacUpdateMissesCounter; + // TODO: use BP-15 API to set ledgerType + final LedgerType ledgerType = LedgerType.PD_JOURNAL; + // This empty master key is used when an empty password is provided which is the hash of an empty string private final static byte[] emptyLedgerKey; static { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java index 55e3987b8..1142ff124 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java @@ -42,7 +42,10 @@ import static com.google.common.base.Charsets.UTF_8; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.util.Collections; +import org.apache.bookkeeper.client.api.LedgerType; /** * This class encapsulates all the ledger metadata that is persistently stored @@ -74,6 +77,7 @@ private long length; private long lastEntryId; private long ctime; + private LedgerType ledgerType; private LedgerMetadataFormat.State state; private SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = @@ -85,14 +89,17 @@ private LedgerMetadataFormat.DigestType digestType; private byte[] password; - private Map<String, byte[]> customMetadata = Maps.newHashMap(); + private Map<String, byte[]> customMetadata; public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata) { + BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata, + LedgerType ledgerType) { + Preconditions.checkNotNull(ledgerType, "ledgerType is required"); this.ensembleSize = ensembleSize; this.writeQuorumSize = writeQuorumSize; this.ackQuorumSize = ackQuorumSize; this.ctime = System.currentTimeMillis(); + this.ledgerType = ledgerType; /* * It is set in PendingReadOp.readEntryComplete, and @@ -109,14 +116,11 @@ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize, this.hasPassword = true; if (customMetadata != null) { this.customMetadata = customMetadata; + } else { + this.customMetadata = Collections.emptyMap(); } } - public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - BookKeeper.DigestType digestType, byte[] password) { - this(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, null); - } - /** * Copy Constructor. */ @@ -132,6 +136,7 @@ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize, this.version = other.version; this.hasPassword = other.hasPassword; this.digestType = other.digestType; + this.ledgerType = other.ledgerType; this.password = new byte[other.password.length]; System.arraycopy(other.password, 0, this.password, 0, other.password.length); // copy the ensembles @@ -142,9 +147,9 @@ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize, } this.customMetadata = other.customMetadata; } - + private final static byte[] EMPTY_PASSWORD = new byte[0]; private LedgerMetadata() { - this(0, 0, 0, BookKeeper.DigestType.MAC, new byte[] {}); + this(0, 0, 0, BookKeeper.DigestType.MAC, EMPTY_PASSWORD, null, LedgerType.PD_JOURNAL); this.hasPassword = false; } @@ -422,6 +427,18 @@ public static LedgerMetadata parseConfig(byte[] bytes, Version version, Optional } else { lc.ackQuorumSize = lc.writeQuorumSize; } + if (data.hasLedgerType()) { + switch (data.getLedgerType()) { + case VD_JOURNAL: + lc.ledgerType = LedgerType.VD_JOURNAL; + break; + case PD_JOURNAL: + lc.ledgerType = LedgerType.PD_JOURNAL; + break; + default: + throw new IllegalStateException("illegal ledgerType "+data.getLedgerType()+" is metadata"); + } + } lc.ensembleSize = data.getEnsembleSize(); lc.length = data.getLength(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 5ab243536..24795969c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -103,7 +103,7 @@ void sendWriteRequest(int bookieIndex) { int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE; lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend, - this, bookieIndex, flags); + this, bookieIndex, flags, lh.ledgerType); } @Override @@ -198,7 +198,11 @@ void initiate(ByteBuf toSend, int entryLength) { } @Override - public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, + BookieSocketAddress addr, Object ctx) { + + // TODO: lastAddSyncedEntryId and lederType will be handled in next patches for BP-14 + int bookieIndex = (Integer) ctx; if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java new file mode 100644 index 000000000..5fa758266 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java @@ -0,0 +1,40 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.client.api; + +/** + * Describes the type of ledger. + * LedgerTypes describes the behaviour of the ledger in respect to durability and + * provides hints to the storage of data on Bookies + * + * @since 4.6 + */ +public enum LedgerType { + /** + * Persistent Durability, using Journal.<br> + * Each entry is persisted to the journal and every writes receives and acknowledgement only with the guarantee that + * it has been persisted durabily to it (data is fsync'd to the disk) + */ + PD_JOURNAL, + /** + * Volatile Durability, using Journal.<br> + * Each entry is persisted to the journal and writes receive acknowledgement without guarantees of persistence + * (data is eventually fsync'd to disk).<br> + * For this kind of ledgers the client MUST explicitly call {@link LedgerHandle#asyncSync(long, org.apache.bookkeeper.client.AsyncCallback.SyncCallback, java.lang.Object) } + * in order to have guarantees of the durability of writes and in order to advance the LastAddConfirmed entry id + */ + VD_JOURNAL +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index 409fe4b4a..50352d4c6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -105,7 +105,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.channel().writeAndFlush( new BookieProtocol.AddResponse( req.getProtocolVersion(), BookieProtocol.EUA, - req.getLedgerId(), req.getEntryId())); + req.getLedgerId(), req.getEntryId(), BookieProtocol.INVALID_ENTRY_ID)); } else if (req.getOpCode() == BookieProtocol.READENTRY) { ctx.channel().writeAndFlush( new BookieProtocol.ReadResponse( diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index d763f5768..d072631e5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -62,6 +62,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import org.apache.bookkeeper.client.api.LedgerType; /** * Implements the client-side part of the BookKeeper protocol. @@ -217,6 +218,7 @@ public void safeRun() { private void completeAdd(final int rc, final long ledgerId, final long entryId, + final long lastAddSyncedEntry, final BookieSocketAddress addr, final WriteCallback cb, final Object ctx) { @@ -224,7 +226,7 @@ private void completeAdd(final int rc, executor.submitOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { - cb.writeComplete(rc, ledgerId, entryId, addr, ctx); + cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx); } @Override public String toString() { @@ -232,7 +234,7 @@ public String toString() { } }); } catch (RejectedExecutionException ree) { - cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx); + cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId,lastAddSyncedEntry, addr, ctx); } } @@ -243,13 +245,14 @@ public void addEntry(final BookieSocketAddress addr, final ByteBuf toSend, final WriteCallback cb, final Object ctx, - final int options) { + final int options, + final LedgerType ledgerType) { closeLock.readLock().lock(); try { final PerChannelBookieClientPool client = lookupClient(addr, entryId); if (client == null) { completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, entryId, addr, cb, ctx); + ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, addr, cb, ctx); return; } @@ -261,9 +264,9 @@ public void addEntry(final BookieSocketAddress addr, @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { if (rc != BKException.Code.OK) { - completeAdd(rc, ledgerId, entryId, addr, cb, ctx); + completeAdd(rc, ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, addr, cb, ctx); } else { - pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options); + pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, ledgerType); } toSend.release(); } @@ -510,7 +513,7 @@ public static void main(String[] args) throws NumberFormatException, IOException } WriteCallback cb = new WriteCallback() { - public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledger, long entry, long lastSyncedEntryId, BookieSocketAddress addr, Object ctx) { Counter counter = (Counter) ctx; counter.dec(); if (rc != 0) { @@ -531,7 +534,8 @@ public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress a for (int i = 0; i < 100000; i++) { counter.inc(); - bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0); + bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0, + LedgerType.PD_JOURNAL); } counter.wait(0); System.out.println("Total = " + counter.total()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 0fece29c5..fa420bff6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -155,7 +155,8 @@ public Object decode(ByteBuf packet) // Read ledger and entry id without advancing the reader index ledgerId = packet.getLong(packet.readerIndex()); entryId = packet.getLong(packet.readerIndex() + 8); - return new BookieProtocol.AddRequest(version, ledgerId, entryId, flags, masterKey, packet.retain()); + return new BookieProtocol.AddRequest(version, ledgerId, entryId, flags, masterKey, packet.retain(), + BookieProtocol.LEDGERTYPE_PD_JOURNAL); } case BookieProtocol.READENTRY: @@ -263,7 +264,8 @@ public Object decode(ByteBuf buffer) rc = buffer.readInt(); ledgerId = buffer.readLong(); entryId = buffer.readLong(); - return new BookieProtocol.AddResponse(version, rc, ledgerId, entryId); + return new BookieProtocol.AddResponse(version, rc, ledgerId, entryId, + BookieProtocol.LEDGERTYPE_PD_JOURNAL); case BookieProtocol.READENTRY: rc = buffer.readInt(); ledgerId = buffer.readLong(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 99c33fbeb..f8991da6e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -62,6 +62,16 @@ */ public static final int MASTER_KEY_LENGTH = 20; + /** + * The request is about a ledger with a PD_JOURNAL LedgerType + */ + public static final short LEDGERTYPE_PD_JOURNAL = 0; + + /** + * The request is about a ledger with a VD_JOURNAL LedgerType + */ + public static final short LEDGERTYPE_VD_JOURNAL = 1; + /** * The first int of a packet is the header. * It contains the version, opCode and flags. @@ -237,17 +247,23 @@ public String toString() { static class AddRequest extends Request { final ByteBuf data; + final short ledgerType; public AddRequest(byte protocolVersion, long ledgerId, long entryId, - short flags, byte[] masterKey, ByteBuf data) { + short flags, byte[] masterKey, ByteBuf data, short ledgerType) { super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey); this.data = data.retain(); + this.ledgerType = ledgerType; } ByteBuf getData() { return data; } + short getLedgerType() { + return ledgerType; + } + boolean isRecoveryAdd() { return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; } @@ -291,14 +307,16 @@ AuthMessage getAuthMessage() { final int errorCode; final long ledgerId; final long entryId; + final long lastAddSyncedEntry; protected Response(byte protocolVersion, byte opCode, - int errorCode, long ledgerId, long entryId) { + int errorCode, long ledgerId, long entryId, long lastAddSyncedEntry) { this.protocolVersion = protocolVersion; this.opCode = opCode; this.errorCode = errorCode; this.ledgerId = ledgerId; this.entryId = entryId; + this.lastAddSyncedEntry = lastAddSyncedEntry; } byte getProtocolVersion() { @@ -321,6 +339,10 @@ int getErrorCode() { return errorCode; } + long getLastAddSyncedEntry() { + return lastAddSyncedEntry; + } + @Override public String toString() { return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]", @@ -332,12 +354,12 @@ public String toString() { final ByteBuf data; ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { - super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); + super(protocolVersion, READENTRY, errorCode, ledgerId, entryId, INVALID_ENTRY_ID); this.data = Unpooled.EMPTY_BUFFER; } ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) { - super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); + super(protocolVersion, READENTRY, errorCode, ledgerId, entryId, INVALID_ENTRY_ID); this.data = data; } @@ -351,15 +373,15 @@ ByteBuf getData() { } static class AddResponse extends Response { - AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { - super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId); + AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, long lastAddSyncedEntry) { + super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId, lastAddSyncedEntry); } } static class ErrorResponse extends Response { ErrorResponse(byte protocolVersion, byte opCode, int errorCode, long ledgerId, long entryId) { - super(protocolVersion, opCode, errorCode, ledgerId, entryId); + super(protocolVersion, opCode, errorCode, ledgerId, entryId, INVALID_ENTRY_ID); } } @@ -367,7 +389,7 @@ ByteBuf getData() { final AuthMessage authMessage; AuthResponse(byte protocolVersion, AuthMessage authMessage) { - super(protocolVersion, AUTH, EOK, -1, -1); + super(protocolVersion, AUTH, EOK, -1, -1, INVALID_ENTRY_ID); this.authMessage = authMessage; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java index ab160ce81..896eeb311 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java @@ -68,7 +68,8 @@ } public interface WriteCallback { - void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx); + void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry, + BookieSocketAddress addr, Object ctx); } public interface ReadLacCallback { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 20f601ce9..c58a36aec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -19,15 +19,11 @@ import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID; -import com.google.common.collect.Sets; import com.google.protobuf.ByteString; -import com.google.protobuf.ExtensionRegistry; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -56,7 +52,6 @@ import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; -import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Collections; @@ -68,7 +63,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.bookkeeper.auth.BookKeeperPrincipal; import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; @@ -115,7 +109,6 @@ import com.google.protobuf.ExtensionRegistry; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; -import java.net.SocketAddress; import java.net.SocketAddress; import java.security.cert.Certificate; @@ -124,6 +117,9 @@ import java.util.List; import javax.net.ssl.SSLPeerUnverifiedException; import org.apache.bookkeeper.auth.BookKeeperPrincipal; +import org.apache.bookkeeper.client.api.LedgerType; +import static org.apache.bookkeeper.client.api.LedgerType.PD_JOURNAL; +import static org.apache.bookkeeper.client.api.LedgerType.VD_JOURNAL; /** * This class manages all details of connection to a particular bookie. It also @@ -548,13 +544,13 @@ public void operationComplete(ChannelFuture future) throws Exception { * Add options */ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf toSend, WriteCallback cb, - Object ctx, final int options) { + Object ctx, final int options, final LedgerType ledgerType) { Object request = null; CompletionKey completion = null; if (useV2WireProtocol) { completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY); request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, - (short) options, masterKey, toSend); + (short) options, masterKey, toSend, BookieProtocol.LEDGERTYPE_PD_JOURNAL); } else { final long txnId = getTxnId(); completion = new V3CompletionKey(txnId, OperationType.ADD_ENTRY); @@ -576,6 +572,15 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); } + switch (ledgerType) { + case VD_JOURNAL: + addBuilder.setLedgerType(AddRequest.LedgerType.VD_JOURNAL); + break; + case PD_JOURNAL: + // nothing, this is the default + break; + } + request = Request.newBuilder() .setHeader(headerBuilder) .setAddRequest(addBuilder) @@ -1102,7 +1107,7 @@ public void safeRun() { new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc }); } - addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId, + addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId, addCompletion.lastAddSyncedEntry, addr, addCompletion.ctx); if (LOG.isDebugEnabled()) { LOG.debug("Invoked callback method: {}", addCompletion.entryId); @@ -1288,7 +1293,7 @@ private void readV2Response(final BookieProtocol.Response response) { public void safeRun() { switch (operationType) { case ADD_ENTRY: { - handleAddResponse(ledgerId, entryId, status, completionValue); + handleAddResponse(ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, status, completionValue); break; } case READ_ENTRY: { @@ -1375,7 +1380,8 @@ public void safeRun() { case ADD_ENTRY: { AddResponse addResponse = response.getAddResponse(); StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus(); - handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue); + long lastAddSynced = addResponse.hasLastAddSynced() ? addResponse.getLastAddSynced() : BookieProtocol.INVALID_ENTRY_ID; + handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), lastAddSynced, status, completionValue); break; } case READ_ENTRY: { @@ -1558,7 +1564,7 @@ void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue co plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx); } - void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) { + void handleAddResponse(long ledgerId, long entryId, long lastAddSynced, StatusCode status, CompletionValue completionValue) { // The completion value should always be an instance of an AddCompletion object when we reach here. AddCompletion ac = (AddCompletion)completionValue; @@ -1577,7 +1583,7 @@ void handleAddResponse(long ledgerId, long entryId, StatusCode status, Completio } rcToRet = BKException.Code.WriteException; } - ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); + ac.cb.writeComplete(rcToRet, ledgerId, entryId, lastAddSynced, addr, ac.ctx); } void handleReadLacResponse(long ledgerId, StatusCode status, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, CompletionValue completionValue) { @@ -1670,13 +1676,15 @@ void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity, St final Object ctx; protected final long ledgerId; protected final long entryId; + protected final long lastAddSyncedEntry; protected final Timeout timeout; - public CompletionValue(Object ctx, long ledgerId, long entryId, + public CompletionValue(Object ctx, long ledgerId, long entryId, long lastAddSyncedEntry, Timeout timeout) { this.ctx = ctx; this.ledgerId = ledgerId; this.entryId = entryId; + this.lastAddSyncedEntry = lastAddSyncedEntry; this.timeout = timeout; } @@ -1697,7 +1705,7 @@ public WriteLacCompletion(WriteLacCallback cb, Object ctx, long ledgerId) { public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final WriteLacCallback originalCallback, final Object originalCtx, final long ledgerId, final Timeout timeout) { - super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout); + super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, BookieProtocol.INVALID_ENTRY_ID, timeout); final long startTime = MathUtils.nowInNano(); this.cb = null == writeLacOpLogger ? originalCallback : new WriteLacCallback() { @Override @@ -1726,7 +1734,7 @@ public ReadLacCompletion(ReadLacCallback cb, Object ctx, long ledgerId) { public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final ReadLacCallback originalCallback, final Object ctx, final long ledgerId, final Timeout timeout) { - super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout); + super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, BookieProtocol.INVALID_ENTRY_ID, timeout); final long startTime = MathUtils.nowInNano(); this.cb = null == readLacOpLogger ? originalCallback : new ReadLacCallback() { @Override @@ -1758,7 +1766,7 @@ public ReadCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger rea final ReadEntryCallback originalCallback, final Object originalCtx, final long ledgerId, final long entryId, final Timeout timeout) { - super(originalCtx, ledgerId, entryId, timeout); + super(originalCtx, ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, timeout); final long startTime = MathUtils.nowInNano(); this.cb = new ReadEntryCallback() { @Override @@ -1792,7 +1800,7 @@ public StartTLSCompletion(final PerChannelBookieClient pcbc, StartTLSCallback cb public StartTLSCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger startTLSOpLogger, final StartTLSCallback originalCallback, final Object originalCtx, final Timeout timeout) { - super(originalCtx, -1, -1, timeout); + super(originalCtx, -1, -1, BookieProtocol.INVALID_ENTRY_ID, timeout); final long startTime = MathUtils.nowInNano(); this.cb = new StartTLSCallback() { @Override @@ -1830,7 +1838,7 @@ public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, GetBookieInfoC public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger getBookieInfoOpLogger, final GetBookieInfoCallback originalCallback, final Object originalCtx, final Timeout timeout) { - super(originalCtx, 0L, 0L, timeout); + super(originalCtx, 0L, 0L, BookieProtocol.INVALID_ENTRY_ID, timeout); final long startTime = MathUtils.nowInNano(); this.cb = (null == getBookieInfoOpLogger) ? originalCallback : new GetBookieInfoCallback() { @Override @@ -1868,11 +1876,11 @@ public AddCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger addE final WriteCallback originalCallback, final Object originalCtx, final long ledgerId, final long entryId, final Timeout timeout) { - super(originalCtx, ledgerId, entryId, timeout); + super(originalCtx, ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, timeout); final long startTime = MathUtils.nowInNano(); this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() { @Override - public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSynced, BookieSocketAddress addr, Object ctx) { cancelTimeout(); if (pcbc.addEntryOpLogger != null) { long latency = MathUtils.elapsedNanos(startTime); @@ -1887,7 +1895,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre pcbc.recordError(); } - originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx); + originalCallback.writeComplete(rc, ledgerId, entryId, lastAddSynced, addr, originalCtx); } }; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java index c0be16247..36bd418ec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java @@ -26,7 +26,7 @@ static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) { if (r.getOpCode() == BookieProtocol.ADDENTRY) { return new BookieProtocol.AddResponse(r.getProtocolVersion(), errorCode, - r.getLedgerId(), r.getEntryId()); + r.getLedgerId(), r.getEntryId(), BookieProtocol.INVALID_ENTRY_ID); } else { assert(r.getOpCode() == BookieProtocol.READENTRY); return new BookieProtocol.ReadResponse(r.getProtocolVersion(), errorCode, @@ -34,9 +34,9 @@ } } - static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) { + static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r, long lastAddSyncedEntry) { return new BookieProtocol.AddResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(), - r.getEntryId()); + r.getEntryId(), lastAddSyncedEntry); } static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Request r) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 827aed986..b2d47ea02 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -64,7 +64,8 @@ protected void processPacket() { if (add.isRecoveryAdd()) { requestProcessor.bookie.recoveryAddEntry(add.getData(), this, channel, add.getMasterKey()); } else { - requestProcessor.bookie.addEntry(add.getData(), this, channel, add.getMasterKey()); + requestProcessor.bookie.addEntry(add.getData(), BookieProtocol.LEDGERTYPE_PD_JOURNAL, + this, channel, add.getMasterKey()); } } catch (IOException e) { LOG.error("Error writing " + add, e); @@ -89,7 +90,7 @@ protected void processPacket() { } @Override - public void writeComplete(int rc, long ledgerId, long entryId, + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry, BookieSocketAddress addr, Object ctx) { if (BookieProtocol.EOK == rc) { requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), @@ -99,7 +100,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, TimeUnit.NANOSECONDS); } sendResponse(rc, - ResponseBuilder.buildAddResponse(request), + ResponseBuilder.buildAddResponse(request, lastAddSyncedEntry), requestProcessor.addRequestStats); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index b4e89f8c5..a1f03c66f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -70,7 +70,7 @@ private AddResponse getAddResponse() { BookkeeperInternalCallbacks.WriteCallback wcb = new BookkeeperInternalCallbacks.WriteCallback() { @Override - public void writeComplete(int rc, long ledgerId, long entryId, + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry, BookieSocketAddress addr, Object ctx) { if (BookieProtocol.EOK == rc) { requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), @@ -108,7 +108,10 @@ public void writeComplete(int rc, long ledgerId, long entryId, if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) { requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey); } else { - requestProcessor.bookie.addEntry(entryToAdd, wcb, channel, masterKey); + short ledgerType = addRequest.hasLedgerType() + && addRequest.getLedgerType().equals(AddRequest.LedgerType.VD_JOURNAL) + ? BookieProtocol.LEDGERTYPE_VD_JOURNAL : BookieProtocol.LEDGERTYPE_PD_JOURNAL; + requestProcessor.bookie.addEntry(entryToAdd, ledgerType, wcb, channel, masterKey); } status = StatusCode.EOK; } catch (IOException e) { diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto index b43e69148..898822d7f 100644 --- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto +++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto @@ -62,6 +62,7 @@ enum OperationType { READ_LAC = 7; GET_BOOKIE_INFO = 8; START_TLS = 9; + SYNC = 10; } /** @@ -111,6 +112,12 @@ message AddRequest { required int64 entryId = 2; required bytes masterKey = 3; required bytes body = 4; + + enum LedgerType { + PD_JOURNAL = 0; + VD_JOURNAL = 1; + } + optional LedgerType ledgerType = 5; } message StartTLSRequest { @@ -136,6 +143,13 @@ message GetBookieInfoRequest { optional int64 requested = 1; } +message SyncRequest { + required int64 ledgerId = 1; + required bytes masterKey = 2; + required int64 firstEntryId = 3; + required int64 lastEntryId = 4; +} + message Response { required BKPacketHeader header = 1; @@ -150,6 +164,7 @@ message Response { optional ReadLacResponse readLacResponse = 104; optional GetBookieInfoResponse getBookieInfoResponse = 105; optional StartTLSResponse startTLSResponse = 106; + optional SyncResponse syncResponse = 107; } message ReadResponse { @@ -166,6 +181,8 @@ message AddResponse { required StatusCode status = 1; required int64 ledgerId = 2; required int64 entryId = 3; + // Piggyback LAS + optional int64 lastAddSynced = 4; } message AuthMessage { @@ -193,3 +210,9 @@ message GetBookieInfoResponse { message StartTLSResponse { } + +message SyncResponse { + required StatusCode status = 1; + required int64 ledgerId = 2; + required int64 lastPersistedEntryId = 3; +} diff --git a/bookkeeper-server/src/main/proto/DataFormats.proto b/bookkeeper-server/src/main/proto/DataFormats.proto index cdade9563..7ef03ef8d 100644 --- a/bookkeeper-server/src/main/proto/DataFormats.proto +++ b/bookkeeper-server/src/main/proto/DataFormats.proto @@ -58,6 +58,12 @@ message LedgerMetadataFormat { optional bytes value = 2; } repeated cMetadataMapEntry customMetadata = 11; + + enum LedgerType { + PD_JOURNAL = 0; + VD_JOURNAL = 1; + } + optional LedgerType ledgerType = 12 [default = PD_JOURNAL]; } message LedgerRereplicationLayoutFormat { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index 2a7e5c3f2..f33566f6f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.ArrayList; import java.util.List; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.commons.io.FileUtils; import org.junit.After; @@ -334,7 +335,7 @@ public void testIndexPageEvictionWriteOrder() throws Exception { b.start(); for (int i = 1; i <= numLedgers; i++) { ByteBuf packet = generateEntry(i, 1); - b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes()); + b.addEntry(packet, BookieProtocol.LEDGERTYPE_PD_JOURNAL, new Bookie.NopWriteCallback(), null, "passwd".getBytes()); } conf = TestBKConfiguration.newServerConfiguration() @@ -513,7 +514,7 @@ public void testEntryMemTableFlushFailure() throws Exception { // this bookie.addEntry call is required. FileInfo for Ledger 1 would be created with this call. // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls will fail because of BOOKKEEPER-965 change. - bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes()); + bookie.addEntry(generateEntry(1, 1), BookieProtocol.LEDGERTYPE_PD_JOURNAL, new Bookie.NopWriteCallback(), null, "passwd".getBytes()); flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2)); assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java index 183af5bb0..c1429bb9a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java @@ -87,7 +87,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, } @Override - public void addEntry(ByteBuf entry, WriteCallback cb, + public void addEntry(ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { @@ -97,7 +97,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, // and an exception would spam the logs Thread.currentThread().interrupt(); } - super.addEntry(entry, cb, ctx, masterKey); + super.addEntry(entry, ledgerType, cb, ctx, masterKey); } @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java index e61100bbd..c50c37273 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java @@ -195,7 +195,7 @@ private void startUnauthorizedBookie(ServerConfiguration conf, final CountDownLa throws Exception { Bookie sBookie = new Bookie(conf) { @Override - public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { latch.await(); @@ -218,8 +218,9 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] // so no ensemble change when recovering ledger on this bookie. private void startDeadBookie(ServerConfiguration conf, final CountDownLatch latch) throws Exception { Bookie dBookie = new Bookie(conf) { + @Override - public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { latch.await(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java index 73c38b9f0..1f5c3a1d0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java @@ -188,7 +188,7 @@ private void ledgerRecoveryWithSlowBookie(int ensembleSize, int writeQuorumSize, Bookie fakeBookie = new Bookie(conf) { @Override - public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, short ledgerType, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { // drop request to simulate a slow and failed bookie } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java index 1b6f2e7dc..ab6598c27 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.api.LedgerType; public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase { @@ -361,14 +362,15 @@ public void testRecoveryOnEntryGap() throws Exception { final CountDownLatch addLatch = new CountDownLatch(1); final AtomicBoolean addSuccess = new AtomicBoolean(false); LOG.info("Add entry {} with lac = {}", entryId, lac); - lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(0), lh.getId(), lh.ledgerKey, entryId, toSend, + lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(0), + lh.getId(), lh.ledgerKey, entryId, toSend, new WriteCallback() { @Override - public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { addSuccess.set(BKException.Code.OK == rc); addLatch.countDown(); } - }, 0, BookieProtocol.FLAG_NONE); + }, 0, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); addLatch.await(); assertTrue("add entry 14 should succeed", addSuccess.get()); @@ -418,22 +420,24 @@ public void operationComplete(int rc, Void result) { private final int rc; private final long ledgerId; private final long entryId; + private final long lastAddSyncedEntryId; private final BookieSocketAddress addr; private final Object ctx; WriteCallbackEntry(WriteCallback cb, - int rc, long ledgerId, long entryId, + int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { this.cb = cb; this.rc = rc; this.ledgerId = ledgerId; this.entryId = entryId; + this.lastAddSyncedEntryId = lastAddSyncedEntryId; this.addr = addr; this.ctx = ctx; } public void callback() { - cb.writeComplete(rc, ledgerId, entryId, addr, ctx); + cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntryId, addr, ctx); } } @@ -450,16 +454,16 @@ public DelayResponseBookie(ServerConfiguration conf) } @Override - public void addEntry(ByteBuf entry, final WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, short ledgerType, final WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { - super.addEntry(entry, new WriteCallback() { + super.addEntry(entry, ledgerType, new WriteCallback() { @Override - public void writeComplete(int rc, long ledgerId, long entryId, + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { if (delayAddResponse.get()) { - delayQueue.add(new WriteCallbackEntry(cb, rc, ledgerId, entryId, addr, ctx)); + delayQueue.add(new WriteCallbackEntry(cb, rc, ledgerId, entryId, lastAddSyncedEntryId, addr, ctx)); } else { - cb.writeComplete(rc, ledgerId, entryId, addr, ctx); + cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntryId, addr, ctx); } } }, ctx, masterKey); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java index c0ff8d774..bc4cfc881 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.Enumeration; import java.util.Map.Entry; import java.util.Set; @@ -28,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; @@ -236,7 +238,8 @@ public void testReplicateLFShouldReturnFalseIfTheReplicationFails() public void testSplitIntoSubFragmentsWithDifferentFragmentBoundaries() throws Exception { LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE, - TEST_PSSWD) { + TEST_PSSWD, + null, LedgerType.PD_JOURNAL) { @Override ArrayList<BookieSocketAddress> getEnsemble(long entryId) { return null; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java index a0801dbb2..bee2d408f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java @@ -49,6 +49,7 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerType; import static org.junit.Assert.*; @@ -120,7 +121,8 @@ public void testWatchMetadataRemoval() throws Exception { idGenerator.generateLedgerId(new GenericCallback<Long>() { @Override public void operationComplete(int rc, final Long lid) { - manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()), + manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes(), + null, LedgerType.PD_JOURNAL), new BookkeeperInternalCallbacks.GenericCallback<Void>(){ @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index ecdfa7718..918b08341 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -61,6 +61,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; @@ -101,7 +102,8 @@ public void operationComplete(int rc, final Long ledgerId) { } getLedgerManager().createLedgerMetadata(ledgerId, - new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), new GenericCallback<Void>() { + new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes(), + null, LedgerType.PD_JOURNAL), new GenericCallback<Void>() { @Override public void operationComplete(int rc, Void result) { if (rc == BKException.Code.OK) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java index 0692d33ab..e84f6f80e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java @@ -46,6 +46,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; @@ -518,7 +519,8 @@ public void testTriggerAuditorWithNoPendingAuditTask() throws Exception { int numofledgers = 5; Random rand = new Random(); for (int i = 0; i < numofledgers; i++) { - LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null); + LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(), + null, LedgerType.PD_JOURNAL); ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>(); ensemble.add(new BookieSocketAddress("99.99.99.99:9999")); ensemble.add(new BookieSocketAddress("11.11.11.11:1111")); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 5bc34d61d..ae7ad846e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -122,7 +123,8 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf bb, O }; WriteCallback wrcb = new WriteCallback() { - public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, + BookieSocketAddress addr, Object ctx) { if (ctx != null) { synchronized (ctx) { if (ctx instanceof ResultStruct) { @@ -145,7 +147,7 @@ public void testWriteGaps() throws Exception { BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); ByteBuf bb = createByteBuffer(1, 1, 1); - bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE); + bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); synchronized (arc) { arc.wait(1000); assertEquals(0, arc.rc); @@ -155,16 +157,16 @@ public void testWriteGaps() throws Exception { assertEquals(1, arc.entry.getInt()); } bb = createByteBuffer(2, 1, 2); - bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE); + bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); bb = createByteBuffer(3, 1, 3); - bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE); + bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); bb = createByteBuffer(5, 1, 5); - bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE); + bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); bb = createByteBuffer(7, 1, 7); - bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE); + bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); synchronized (notifyObject) { bb = createByteBuffer(11, 1, 11); - bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE); + bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL); notifyObject.wait(); } synchronized (arc) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java index eb2ff0ee9..a7b78acc1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java @@ -37,6 +37,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.junit.After; import org.junit.Before; @@ -167,7 +168,7 @@ private long doWrites(int ledgers, int size, int totalwrites) throttle = new Semaphore(10000); WriteCallback cb = new WriteCallback() { @Override - public void writeComplete(int rc, long ledgerId, long entryId, + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { AtomicInteger counter = (AtomicInteger)ctx; counter.getAndIncrement(); @@ -187,7 +188,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, bytes.position(0); bytes.limit(bytes.capacity()); throttle.acquire(); - bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, zeros); + bookie.addEntry(Unpooled.wrappedBuffer(bytes), BookieProtocol.LEDGERTYPE_PD_JOURNAL, cb, counter, zeros); } } long finish = System.currentTimeMillis(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java index 4aca80838..035484981 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Arrays; +import org.apache.bookkeeper.client.api.LedgerType; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -76,11 +77,12 @@ void write(long ledgerId, long entry, byte[] data, BookieSocketAddress addr, Wri byte[] passwd = new byte[20]; Arrays.fill(passwd, (byte) 'a'); - client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE); + client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE, + LedgerType.PD_JOURNAL); } @Override - public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntryId, BookieSocketAddress addr, Object ctx) { Counter counter = (Counter) ctx; counter.increment(); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services