This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9655943 BP-14 WriteFlag DEFERRED_SYNC Client Side Implementation
9655943 is described below
commit 9655943e6be5ac86060134c16ec24d6f0c822f56
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue May 22 16:05:07 2018 +0200
BP-14 WriteFlag DEFERRED_SYNC Client Side Implementation
Introduce implementation of WriteFlag#DEFERRED_SYNC on LedgerHandle (purely
client-side).
Tests are only on the client-side (with Mockito); there is no
implementation on the bookie side. The most significant tests will come with the
force() API; at this point we can only test that we have not broken the LAC
advance mechanism.
Author: Enrico Olivelli <[email protected]>
Reviewers: Sijie Guo <[email protected]>, Venkateswararao Jujjuri (JV)
This closes #853 from eolivelli/bp14-deferred-force-write-client-side
---
.../apache/bookkeeper/benchmark/BenchBookie.java | 10 ++--
.../org/apache/bookkeeper/client/BookKeeper.java | 7 ++-
.../apache/bookkeeper/client/LedgerCreateOp.java | 8 +--
.../client/LedgerFragmentReplicator.java | 4 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 29 ++++++++---
.../apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
.../apache/bookkeeper/client/LedgerRecoveryOp.java | 1 +
.../org/apache/bookkeeper/client/PendingAddOp.java | 17 +++++--
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 3 +-
.../bookkeeper/client/api/CreateBuilder.java | 22 ++++++++
.../apache/bookkeeper/client/api/WriteFlag.java | 11 +++-
.../org/apache/bookkeeper/proto/BookieClient.java | 31 ++++++------
.../bookkeeper/proto/PerChannelBookieClient.java | 26 +++++++---
.../bookkeeper/proto/WriteEntryProcessorV3.java | 2 +-
.../apache/bookkeeper/client/BookKeeperTest.java | 22 ++++++++
.../apache/bookkeeper/client/DeferredSyncTest.java | 58 ++++++++++++++++++++++
.../bookkeeper/client/MockBookKeeperTestCase.java | 13 ++---
.../apache/bookkeeper/client/MockLedgerHandle.java | 3 +-
.../client/ParallelLedgerRecoveryTest.java | 3 +-
.../apache/bookkeeper/client/PendingAddOpTest.java | 3 +-
.../client/TestLedgerFragmentReplication.java | 3 +-
.../client/api/BookKeeperBuildersTest.java | 2 +-
.../bookkeeper/client/api/WriteFlagTest.java | 4 +-
.../apache/bookkeeper/test/BookieClientTest.java | 13 ++---
24 files changed, 217 insertions(+), 80 deletions(-)
diff --git
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 3efff7a..e1c5979 100644
---
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -187,7 +188,8 @@ public class BenchBookie {
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new
byte[20],
- entry, ByteBufList.get(toSend), tc, null,
BookieProtocol.FLAG_NONE);
+ entry, ByteBufList.get(toSend), tc, null,
BookieProtocol.FLAG_NONE,
+ false, WriteFlag.NONE);
}
LOG.info("Waiting for warmup");
tc.waitFor(warmUpCount);
@@ -204,7 +206,8 @@ public class BenchBookie {
toSend.writerIndex(toSend.capacity());
lc.resetComplete();
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new
byte[20],
- entry, ByteBufList.get(toSend), lc, null,
BookieProtocol.FLAG_NONE);
+ entry, ByteBufList.get(toSend), lc, null,
+ BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
lc.waitForComplete();
}
long endTime = System.nanoTime();
@@ -222,7 +225,8 @@ public class BenchBookie {
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new
byte[20],
- entry, ByteBufList.get(toSend), tc, null,
BookieProtocol.FLAG_NONE);
+ entry, ByteBufList.get(toSend), tc, null,
BookieProtocol.FLAG_NONE,
+ false, WriteFlag.NONE);
}
tc.waitFor(throughputCount);
endTime = System.currentTimeMillis();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index bd0a12d..98d3651 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -33,7 +33,6 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URI;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -871,7 +870,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
ackQuorumSize, digestType, passwd, cb, ctx,
- customMetadata,
EnumSet.noneOf(WriteFlag.class), getStatsLogger())
+ customMetadata, WriteFlag.NONE,
getStatsLogger())
.initiate();
} finally {
closeLock.readLock().unlock();
@@ -1075,7 +1074,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
ackQuorumSize, digestType, passwd, cb, ctx,
- customMetadata,
EnumSet.noneOf(WriteFlag.class), getStatsLogger())
+ customMetadata, WriteFlag.NONE,
getStatsLogger())
.initiateAdv(-1L);
} finally {
closeLock.readLock().unlock();
@@ -1186,7 +1185,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
ackQuorumSize, digestType, passwd, cb, ctx,
- customMetadata,
EnumSet.noneOf(WriteFlag.class), getStatsLogger())
+ customMetadata, WriteFlag.NONE,
getStatsLogger())
.initiateAdv(ledgerId);
} finally {
closeLock.readLock().unlock();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index d6c77de..9362b6a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -24,7 +24,6 @@ package org.apache.bookkeeper.client;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -247,7 +246,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
private int builderAckQuorumSize = 2;
private int builderWriteQuorumSize = 2;
private byte[] builderPassword;
- private EnumSet<WriteFlag> builderWriteFlags =
EnumSet.noneOf(WriteFlag.class);
+ private EnumSet<WriteFlag> builderWriteFlags = WriteFlag.NONE;
private org.apache.bookkeeper.client.api.DigestType builderDigestType =
org.apache.bookkeeper.client.api.DigestType.CRC32;
private Map<String, byte[]> builderCustomMetadata =
Collections.emptyMap();
@@ -262,15 +261,12 @@ class LedgerCreateOp implements GenericCallback<Void> {
return this;
}
+ @Override
public CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags) {
this.builderWriteFlags = writeFlags;
return this;
}
- public CreateBuilder withWriteFlags(WriteFlag... writeFlags) {
- return withWriteFlags(EnumSet.copyOf(Arrays.asList(writeFlags)));
- }
-
@Override
public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
this.builderWriteQuorumSize = writeQuorumSize;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 3f599e2..c051437 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -322,7 +323,8 @@ public class LedgerFragmentReplicator {
for (BookieSocketAddress newBookie : newBookies) {
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entryId,
ByteBufList.clone(toSend),
- multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD);
+ multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD,
+ false, WriteFlag.NONE);
}
toSend.release();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 271e6fc..b3fcbf7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -104,8 +104,19 @@ public class LedgerHandle implements WriteHandle {
final BookKeeper bk;
final long ledgerId;
long lastAddPushed;
+
+ /**
+ * Last entryId which has been confirmed to be written durably to the
bookies.
+ * This value is used by readers, as part of the LAC protocol
+ */
volatile long lastAddConfirmed;
+ /**
+ * Next entryId which is expected to move forward during {@link
#sendAddSuccessCallbacks() }. This is important
+ * in order to have an ordered sequence of addEntry acknowledged to the
writer
+ */
+ volatile long pendingAddsSequenceHead;
+
long length;
final DigestManager macManager;
final DistributionSchedule distributionSchedule;
@@ -177,6 +188,8 @@ public class LedgerHandle implements WriteHandle {
length = 0;
}
+ this.pendingAddsSequenceHead = lastAddConfirmed;
+
this.ledgerId = ledgerId;
if (bk.getConf().getThrottleValue() > 0) {
@@ -1030,7 +1043,7 @@ public class LedgerHandle implements WriteHandle {
public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object
ctx) {
data.retain();
- PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
+ PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
doAsyncAddEntry(op);
}
@@ -1097,7 +1110,8 @@ public class LedgerHandle implements WriteHandle {
*/
void asyncRecoveryAddEntry(final byte[] data, final int offset, final int
length,
final AddCallback cb, final Object ctx) {
- PendingAddOp op = PendingAddOp.create(this,
Unpooled.wrappedBuffer(data, offset, length), cb, ctx)
+ PendingAddOp op = PendingAddOp.create(this,
Unpooled.wrappedBuffer(data, offset, length),
+ writeFlags, cb, ctx)
.enableRecoveryAdd();
doAsyncAddEntry(op);
}
@@ -1685,17 +1699,20 @@ public class LedgerHandle implements WriteHandle {
return;
}
// Check if it is the next entry in the sequence.
- if (pendingAddOp.entryId != 0 && pendingAddOp.entryId !=
lastAddConfirmed + 1) {
+ if (pendingAddOp.entryId != 0 && pendingAddOp.entryId !=
pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Head of the queue entryId: {} is not lac: {} +
1", pendingAddOp.entryId,
- lastAddConfirmed);
+ LOG.debug("Head of the queue entryId: {} is not the
expected value: {}", pendingAddOp.entryId,
+ pendingAddsSequenceHead + 1);
}
return;
}
pendingAddOps.remove();
explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
- lastAddConfirmed = pendingAddOp.entryId;
+ pendingAddsSequenceHead = pendingAddOp.entryId;
+ if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
+ this.lastAddConfirmed = pendingAddsSequenceHead;
+ }
pendingAddOp.submitCallback(BKException.Code.OK);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index f67e1d6..70e9430 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -179,7 +179,7 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
private void asyncAddEntry(final long entryId, ByteBuf data,
final AddCallbackWithLatency cb, final Object ctx) {
- PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
+ PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
op.setEntryId(entryId);
if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 3956c68..d414b7b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -112,6 +112,7 @@ class LedgerRecoveryOp implements ReadEntryListener,
AddCallback {
synchronized (lh) {
lh.lastAddPushed = lh.lastAddConfirmed =
data.getLastAddConfirmed();
lh.length = data.getLength();
+ lh.pendingAddsSequenceHead =
lh.lastAddConfirmed;
startEntryToRead = endEntryToRead =
lh.lastAddConfirmed;
}
// keep a copy of ledger metadata before proceeding
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 0d00ead..0153d44 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -28,12 +28,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
+import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
@@ -79,10 +80,10 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
int pendingWriteRequests;
boolean callbackTriggered;
boolean hasRun;
-
+ EnumSet<WriteFlag> writeFlags;
boolean allowFailFast = false;
-
- static PendingAddOp create(LedgerHandle lh, ByteBuf payload,
AddCallbackWithLatency cb, Object ctx) {
+ static PendingAddOp create(LedgerHandle lh, ByteBuf payload,
EnumSet<WriteFlag> writeFlags,
+ AddCallbackWithLatency cb, Object ctx) {
PendingAddOp op = RECYCLER.get();
op.lh = lh;
op.isRecoveryAdd = false;
@@ -104,6 +105,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
op.requestTimeNanos = Long.MAX_VALUE;
op.allowFailFast = false;
op.qwcLatency = 0;
+ op.writeFlags = writeFlags;
return op;
}
@@ -137,7 +139,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY :
FLAG_NONE;
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey,
- entryId, toSend, this, bookieIndex, flags, allowFailFast);
+ entryId, toSend, this, bookieIndex, flags, allowFailFast,
lh.writeFlags);
++pendingWriteRequests;
}
@@ -316,6 +318,10 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
// bookie client is closed.
lh.errorOutPendingAdds(rc);
return;
+ case BKException.Code.IllegalOpException:
+ // illegal operation requested, like using unsupported feature in
v2 protocol
+ lh.handleUnrecoverableErrorDuringAdd(rc);
+ return;
case BKException.Code.LedgerFencedException:
LOG.warn("Fencing exception on write: L{} E{} on {}",
ledgerId, entryId, addr);
@@ -462,6 +468,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
callbackTriggered = false;
hasRun = false;
allowFailFast = false;
+ writeFlags = null;
recyclerHandle.recycle(this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index eef0e70..f2433ed 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.client;
import java.security.GeneralSecurityException;
-import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
@@ -77,7 +76,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements
LedgerMetadataListene
ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password, boolean watch)
throws GeneralSecurityException, NumberFormatException {
- super(bk, ledgerId, metadata, digestType, password,
EnumSet.noneOf(WriteFlag.class));
+ super(bk, ledgerId, metadata, digestType, password, WriteFlag.NONE);
if (watch) {
bk.getLedgerManager().registerLedgerMetadataListener(ledgerId,
this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
index cd5fd3d..a0b67e0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client.api;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Map;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
@@ -73,6 +75,26 @@ public interface CreateBuilder extends
OpBuilder<WriteHandle> {
CreateBuilder withPassword(byte[] password);
/**
+ * Set write flags. Write flags specify the behaviour of writes
+ *
+ * @param writeFlags the flags
+ *
+ * @return the builder itself
+ */
+ CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags);
+
+ /**
+ * Set write flags. Write flags specify the behaviour of writes
+ *
+ * @param writeFlags the flags
+ *
+ * @return the builder itself
+ */
+ default CreateBuilder withWriteFlags(WriteFlag ... writeFlags) {
+ return withWriteFlags(EnumSet.copyOf(Arrays.asList(writeFlags)));
+ }
+
+ /**
* Set a map a custom data to be attached to the ledger. The application
is responsible for the semantics of these
* data.
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
index a680b8c..30199c2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
@@ -35,6 +35,13 @@ public enum WriteFlag {
*/
DEFERRED_SYNC(0x1 << 0);
+ /**
+ * No flag is set, use default behaviour.
+ */
+ public static final EnumSet<WriteFlag> NONE =
EnumSet.noneOf(WriteFlag.class);
+
+ private static final EnumSet<WriteFlag> ONLY_DEFERRED_SYNC =
EnumSet.of(DEFERRED_SYNC);
+
private final int value;
WriteFlag(int value) {
@@ -49,9 +56,9 @@ public enum WriteFlag {
*/
public static EnumSet<WriteFlag> getWriteFlags(int flagValue) {
if ((flagValue & DEFERRED_SYNC.value) == DEFERRED_SYNC.value) {
- return EnumSet.of(DEFERRED_SYNC);
+ return ONLY_DEFERRED_SYNC;
}
- return EnumSet.noneOf(WriteFlag.class);
+ return WriteFlag.NONE;
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 31c12fc..3dd837c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -35,6 +35,7 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -247,19 +249,9 @@ public class BookieClient implements
PerChannelBookieClientFactory {
final ByteBufList toSend,
final WriteCallback cb,
final Object ctx,
- final int options) {
- addEntry(addr, ledgerId, masterKey, entryId, toSend, cb, ctx, options,
false);
- }
-
- public void addEntry(final BookieSocketAddress addr,
- final long ledgerId,
- final byte[] masterKey,
- final long entryId,
- final ByteBufList toSend,
- final WriteCallback cb,
- final Object ctx,
final int options,
- final boolean allowFastFail) {
+ final boolean allowFastFail,
+ final EnumSet<WriteFlag> writeFlags) {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -273,7 +265,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
client.obtain(ChannelReadyForAddEntryCallback.create(
this, toSend, ledgerId, entryId, addr,
- ctx, cb, options, masterKey, allowFastFail),
+ ctx, cb, options, masterKey, allowFastFail,
writeFlags),
ledgerId);
}
@@ -310,11 +302,13 @@ public class BookieClient implements
PerChannelBookieClientFactory {
private int options;
private byte[] masterKey;
private boolean allowFastFail;
+ private EnumSet<WriteFlag> writeFlags;
static ChannelReadyForAddEntryCallback create(
BookieClient bookieClient, ByteBufList toSend, long ledgerId,
long entryId, BookieSocketAddress addr, Object ctx,
- WriteCallback cb, int options, byte[] masterKey, boolean
allowFastFail) {
+ WriteCallback cb, int options, byte[] masterKey, boolean
allowFastFail,
+ EnumSet<WriteFlag> writeFlags) {
ChannelReadyForAddEntryCallback callback = RECYCLER.get();
callback.bookieClient = bookieClient;
callback.toSend = toSend;
@@ -326,6 +320,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
callback.options = options;
callback.masterKey = masterKey;
callback.allowFastFail = allowFastFail;
+ callback.writeFlags = writeFlags;
return callback;
}
@@ -336,7 +331,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
} else {
pcbc.addEntry(ledgerId, masterKey, entryId,
- toSend, cb, ctx, options, allowFastFail);
+ toSend, cb, ctx, options, allowFastFail,
writeFlags);
}
toSend.release();
@@ -367,7 +362,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
options = -1;
masterKey = null;
allowFastFail = false;
-
+ writeFlags = null;
recyclerHandle.recycle(this);
}
}
@@ -565,7 +560,9 @@ public class BookieClient implements
PerChannelBookieClientFactory {
for (int i = 0; i < 100000; i++) {
counter.inc();
- bc.addEntry(addr, ledger, new byte[0], i,
ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0);
+ bc.addEntry(addr, ledger, new byte[0], i,
+ ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb,
counter, 0, false,
+ WriteFlag.NONE);
}
counter.wait(0);
System.out.println("Total = " + counter.total());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f6ebbfa..4d69acf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -70,6 +70,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
@@ -87,6 +88,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -579,19 +581,23 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
* Write callback
* @param ctx
* Write callback context
- * @param options
- * Add options
+ * @param allowFastFail
+ * allowFastFail flag
+ * @param writeFlags
+ * WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ByteBufList toSend, WriteCallback cb,
- Object ctx, final int options) {
- addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options,
false);
- }
-
- void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ByteBufList toSend, WriteCallback cb,
- Object ctx, final int options, boolean allowFastFail) {
+ Object ctx, final int options, boolean allowFastFail, final
EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
+ if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
+ LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
+ executor.executeOrdered(ledgerId, () -> {
+ cb.writeComplete(BKException.Code.IllegalOpException,
ledgerId, entryId, addr, ctx);
+ });
+ return;
+ }
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
request = BookieProtocol.AddRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
@@ -627,6 +633,10 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
}
+ if (!writeFlags.isEmpty()) {
+ // add flags only if needed, in order to be able to talk with
old bookies
+
addBuilder.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
+ }
request = Request.newBuilder()
.setHeader(headerBuilder)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 578d666..fe68da4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -110,7 +110,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
if (addRequest.hasWriteFlags()) {
writeFlags = WriteFlag.getWriteFlags(addRequest.getWriteFlags());
} else {
- writeFlags = EnumSet.noneOf(WriteFlag.class);
+ writeFlags = WriteFlag.NONE;
}
final boolean ackBeforeSync =
writeFlags.contains(WriteFlag.DEFERRED_SYNC);
StatusCode status = null;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 1acc6fd..f1521f4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.client;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -39,7 +40,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import
org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
+import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -779,4 +783,22 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
latch.await();
bkc.close();
}
+
+ @Test(expected = BKIllegalOpException.class)
+ public void testCannotUseWriteFlagsOnV2Protocol() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+ conf.setUseV2WireProtocol(true);
+ try (BookKeeperTestClient bkc = new BookKeeperTestClient(conf);) {
+ try (WriteHandle wh = result(bkc.newCreateLedgerOp()
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .withPassword("".getBytes())
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .execute())) {
+ result(wh.appendAsync("test".getBytes()));
+ }
+ }
+ }
+
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
new file mode 100644
index 0000000..4bd5a8c
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.junit.Test;
+
+/**
+ * Client side tests on deferred sync write flag.
+ */
+public class DeferredSyncTest extends MockBookKeeperTestCase {
+
+ static final byte[] PASSWORD = "password".getBytes();
+ static final ByteBuf DATA = Unpooled.wrappedBuffer("foobar".getBytes());
+ static final int NUM_ENTRIES = 100;
+
+ @Test
+ public void testAddEntryLastAddConfirmedDoesNotAdvance() throws Exception {
+ try (WriteHandle wh = result(newCreateLedgerOp()
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .withPassword(PASSWORD)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .execute())) {
+ for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+ result(wh.appendAsync(DATA));
+ }
+ long lastEntryID = result(wh.appendAsync(DATA));
+ assertEquals(NUM_ENTRIES - 1, lastEntryID);
+ LedgerHandle lh = (LedgerHandle) wh;
+ assertEquals(NUM_ENTRIES - 1, lh.getLastAddPushed());
+ assertEquals(-1, lh.getLastAddConfirmed());
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 433591c..b853614 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -36,6 +36,7 @@ import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -49,7 +50,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.client.LedgerCreateOp.CreateBuilderImpl;
+import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -213,7 +214,7 @@ public abstract class MockBookKeeperTestCase {
when(bk.getConf()).thenReturn(config);
}
- protected CreateBuilderImpl newCreateLedgerOp() {
+ protected CreateBuilder newCreateLedgerOp() {
return new LedgerCreateOp.CreateBuilderImpl(bk);
}
@@ -522,13 +523,7 @@ public abstract class MockBookKeeperTestCase {
anyLong(), any(byte[].class),
anyLong(), any(ByteBufList.class),
any(BookkeeperInternalCallbacks.WriteCallback.class),
- any(), anyInt());
-
- stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
- anyLong(), any(byte[].class),
- anyLong(), any(ByteBufList.class),
- any(BookkeeperInternalCallbacks.WriteCallback.class),
- any(), anyInt(), anyBoolean());
+ any(), anyInt(), anyBoolean(), any(EnumSet.class));
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index 838ed5b..a0d339c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -27,7 +27,6 @@ import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.EnumSet;
import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -61,7 +60,7 @@ public class MockLedgerHandle extends LedgerHandle {
MockLedgerHandle(MockBookKeeper bk, long id, DigestType digest, byte[]
passwd) throws GeneralSecurityException {
super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC,
"".getBytes()), DigestType.MAC, "".getBytes(),
- EnumSet.noneOf(WriteFlag.class));
+ WriteFlag.NONE);
this.bk = bk;
this.id = id;
this.digest = digest;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index a6cd902..b21386b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
@@ -434,7 +435,7 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
addSuccess.set(BKException.Code.OK == rc);
addLatch.countDown();
}
- }, 0, BookieProtocol.FLAG_NONE);
+ }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
addLatch.await();
assertTrue("add entry 14 should succeed", addSuccess.get());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
index e9af76e..f428b1f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
@@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +62,7 @@ public class PendingAddOpTest {
public void testExecuteAfterCancelled() {
AtomicInteger rcHolder = new AtomicInteger(-0xdead);
PendingAddOp op = PendingAddOp.create(
- lh, payload, (rc, handle, entryId, qwcLatency, ctx) -> {
+ lh, payload, WriteFlag.NONE, (rc, handle, entryId, qwcLatency, ctx) -> {
rcHolder.set(rc);
}, null);
assertSame(lh, op.lh);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index c620cc1..883dc9c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.fail;
import com.google.common.collect.Sets;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.Enumeration;
import java.util.Map.Entry;
import java.util.Set;
@@ -248,7 +247,7 @@ public class TestLedgerFragmentReplication extends
BookKeeperClusterTestCase {
}
};
LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
- TEST_PSSWD, EnumSet.noneOf(WriteFlag.class));
+ TEST_PSSWD, WriteFlag.NONE);
testSplitIntoSubFragments(10, 21, -1, 1, lh);
testSplitIntoSubFragments(10, 21, 20, 1, lh);
testSplitIntoSubFragments(0, 0, 10, 1, lh);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index cac76d4..b0385b1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -218,7 +218,7 @@ public class BookKeeperBuildersTest extends
MockBookKeeperTestCase {
assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
assertArrayEquals(password, metadata.getPassword());
LedgerHandle lh = (LedgerHandle) writer;
- assertEquals(EnumSet.noneOf(WriteFlag.class), lh.getWriteFlags());
+ assertEquals(WriteFlag.NONE, lh.getWriteFlags());
}
@Test
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
index c51bdf5..746535a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
@@ -41,7 +41,7 @@ public class WriteFlagTest {
@Test
public void testGetWriteFlagsNone() {
- assertEquals(EnumSet.noneOf(WriteFlag.class),
+ assertEquals(WriteFlag.NONE,
WriteFlag.getWriteFlags(NONE));
}
@@ -52,7 +52,7 @@ public class WriteFlagTest {
@Test
public void testGetWriteFlagsValueEmpty() {
- assertEquals(0, WriteFlag.getWriteFlagsValue(EnumSet.noneOf(WriteFlag.class)));
+ assertEquals(0, WriteFlag.getWriteFlagsValue(WriteFlag.NONE));
}
@Test
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7acdb86..7ff6386 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -158,7 +159,7 @@ public class BookieClientTest {
BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
scheduler,
NullStatsLogger.INSTANCE);
ByteBufList bb = createByteBuffer(1, 1, 1);
- bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE);
+ bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
synchronized (arc) {
arc.wait(1000);
assertEquals(0, arc.rc);
@@ -168,16 +169,16 @@ public class BookieClientTest {
assertEquals(1, arc.entry.getInt());
}
bb = createByteBuffer(2, 1, 2);
- bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+ bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
 bb = createByteBuffer(3, 1, 3);
- bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+ bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
 bb = createByteBuffer(5, 1, 5);
- bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+ bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
 bb = createByteBuffer(7, 1, 7);
- bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+ bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
synchronized (notifyObject) {
bb = createByteBuffer(11, 1, 11);
- bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE);
+ bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
notifyObject.wait();
}
synchronized (arc) {
--
To stop receiving notification emails like this one, please contact
[email protected].