sijie closed pull request #742: Issue-744 BP-18 introduce write flags
URL: https://github.com/apache/bookkeeper/pull/742
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index 38ed3c5d6..ffd0f422c 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -112,6 +112,7 @@ message AddRequest {
required int64 entryId = 2;
required bytes masterKey = 3;
required bytes body = 4;
+ optional int32 writeFlags = 5;
}
message StartTLSRequest {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 69e113993..a69c17011 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -32,6 +32,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,7 @@
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
@@ -760,7 +762,8 @@ public void asyncCreateLedger(final int ensSize, final int
writeQuorumSize, fina
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata)
+ ackQuorumSize, digestType, passwd, cb, ctx,
+ customMetadata, EnumSet.noneOf(WriteFlag.class))
.initiate();
} finally {
closeLock.readLock().unlock();
@@ -963,7 +966,9 @@ public void asyncCreateLedgerAdv(final int ensSize, final
int writeQuorumSize, f
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata).initiateAdv((long) (-1));
+ ackQuorumSize, digestType, passwd, cb, ctx,
+ customMetadata, EnumSet.noneOf(WriteFlag.class))
+ .initiateAdv(-1L);
} finally {
closeLock.readLock().unlock();
}
@@ -1072,7 +1077,9 @@ public void asyncCreateLedgerAdv(final long ledgerId,
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata).initiateAdv(ledgerId);
+ ackQuorumSize, digestType, passwd, cb, ctx,
+ customMetadata, EnumSet.noneOf(WriteFlag.class))
+ .initiateAdv(ledgerId);
} finally {
closeLock.readLock().unlock();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 77d2ab2cf..a3d523901 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -24,6 +24,7 @@
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@
import org.apache.bookkeeper.client.api.CreateAdvBuilder;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -62,6 +64,7 @@
final byte[] passwd;
final BookKeeper bk;
final DigestType digestType;
+ final EnumSet<WriteFlag> writeFlags;
final long startTime;
final OpStatsLogger createOpLogger;
boolean adv = false;
@@ -91,7 +94,8 @@
* preserve the order(e.g. sortedMap) upon later retireval.
*/
LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int
ackQuorumSize, DigestType digestType,
- byte[] passwd, CreateCallback cb, Object ctx, final Map<String,
byte[]> customMetadata) {
+ byte[] passwd, CreateCallback cb, Object ctx, final Map<String,
byte[]> customMetadata,
+ EnumSet<WriteFlag> writeFlags) {
this.bk = bk;
this.metadata = new LedgerMetadata(
ensembleSize,
@@ -102,6 +106,7 @@
customMetadata,
bk.getConf().getStoreSystemtimeAsLedgerCreationTime());
this.digestType = digestType;
+ this.writeFlags = writeFlags;
this.passwd = passwd;
this.cb = cb;
this.ctx = ctx;
@@ -189,9 +194,11 @@ public void operationComplete(int rc, Void result) {
try {
if (adv) {
- lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType,
passwd);
+ lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType,
+ passwd, writeFlags);
} else {
- lh = new LedgerHandle(bk, ledgerId, metadata, digestType,
passwd);
+ lh = new LedgerHandle(bk, ledgerId, metadata, digestType,
+ passwd, writeFlags);
}
} catch (GeneralSecurityException e) {
LOG.error("Security exception while creating ledger: " + ledgerId,
e);
@@ -223,6 +230,7 @@ private void createComplete(int rc, LedgerHandle lh) {
private int builderAckQuorumSize = 2;
private int builderWriteQuorumSize = 2;
private byte[] builderPassword;
+ private EnumSet<WriteFlag> builderWriteFlags =
EnumSet.noneOf(WriteFlag.class);
private org.apache.bookkeeper.client.api.DigestType builderDigestType =
org.apache.bookkeeper.client.api.DigestType.CRC32;
private Map<String, byte[]> builderCustomMetadata =
Collections.emptyMap();
@@ -237,6 +245,12 @@ public CreateBuilder withEnsembleSize(int ensembleSize) {
return this;
}
+ @Override
+ public CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags) {
+ this.builderWriteFlags = writeFlags;
+ return this;
+ }
+
@Override
public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
this.builderWriteQuorumSize = writeQuorumSize;
@@ -273,6 +287,11 @@ public CreateAdvBuilder makeAdv() {
}
private boolean validate() {
+ if (builderWriteFlags == null) {
+ LOG.error("invalid null writeFlags");
+ return false;
+ }
+
if (builderWriteQuorumSize > builderEnsembleSize) {
LOG.error("invalid writeQuorumSize {} > ensembleSize {}",
builderWriteQuorumSize, builderEnsembleSize);
return false;
@@ -322,7 +341,7 @@ private void create(CreateCallback cb) {
}
LedgerCreateOp op = new LedgerCreateOp(bk, builderEnsembleSize,
builderWriteQuorumSize, builderAckQuorumSize,
DigestType.fromApiDigestType(builderDigestType),
- builderPassword, cb, null, builderCustomMetadata);
+ builderPassword, cb, null, builderCustomMetadata,
builderWriteFlags);
ReentrantReadWriteLock closeLock = bk.getCloseLock();
closeLock.readLock().lock();
try {
@@ -380,7 +399,8 @@ private void create(CreateCallback cb) {
LedgerCreateOp op = new LedgerCreateOp(parent.bk,
parent.builderEnsembleSize,
parent.builderWriteQuorumSize, parent.builderAckQuorumSize,
DigestType.fromApiDigestType(parent.builderDigestType),
- parent.builderPassword, cb, null,
parent.builderCustomMetadata);
+ parent.builderPassword, cb, null,
parent.builderCustomMetadata,
+ parent.builderWriteFlags);
ReentrantReadWriteLock closeLock = parent.bk.getCloseLock();
closeLock.readLock().lock();
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 5c15376f8..936f181d1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -36,6 +36,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
@@ -65,6 +66,7 @@
import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
@@ -104,6 +106,7 @@
final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
final boolean enableParallelRecoveryRead;
final int recoveryReadBatchSize;
+ final EnumSet<WriteFlag> writeFlags;
ScheduledFuture<?> timeoutFuture = null;
/**
@@ -132,13 +135,14 @@
}
LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
- DigestType digestType, byte[] password)
+ DigestType digestType, byte[] password, EnumSet<WriteFlag>
writeFlags)
throws GeneralSecurityException, NumberFormatException {
this.bk = bk;
this.metadata = metadata;
this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
this.enableParallelRecoveryRead =
bk.getConf().getEnableParallelRecoveryRead();
this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
+ this.writeFlags = writeFlags;
if (metadata.isClosed()) {
lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -216,6 +220,11 @@ public long getId() {
return ledgerId;
}
+ @VisibleForTesting
+ public EnumSet<WriteFlag> getWriteFlags() {
+ return writeFlags;
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index d0ca9da1b..435c4535f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -23,23 +23,24 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.io.Serializable;
import java.security.GeneralSecurityException;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Ledger Advanced handle extends {@link LedgerHandle} to provide API to add
entries with
* user supplied entryIds. Through this interface Ledger Length may not be
accurate while the
@@ -54,9 +55,10 @@ public int compare(PendingAddOp o1, PendingAddOp o2) {
}
}
- LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password)
+ LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+ DigestType digestType, byte[] password, EnumSet<WriteFlag>
writeFlags)
throws GeneralSecurityException, NumberFormatException {
- super(bk, ledgerId, metadata, digestType, password);
+ super(bk, ledgerId, metadata, digestType, password, writeFlags);
pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new
PendingOpsComparator());
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index b1d159679..d324572c9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -21,12 +21,14 @@
package org.apache.bookkeeper.client;
import java.security.GeneralSecurityException;
+import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.util.SafeRunnable;
@@ -73,7 +75,7 @@ public String toString() {
ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password, boolean watch)
throws GeneralSecurityException, NumberFormatException {
- super(bk, ledgerId, metadata, digestType, password);
+ super(bk, ledgerId, metadata, digestType, password,
EnumSet.noneOf(WriteFlag.class));
if (watch) {
bk.getLedgerManager().registerLedgerMetadataListener(ledgerId,
this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
index cd5fd3d7c..aa6ad6d4b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client.api;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Map;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
@@ -91,6 +93,26 @@
*/
CreateBuilder withDigestType(DigestType digestType);
+ /**
+ * Set write flags. Write wlags specify the behaviour of writes
+ *
+ * @param writeFlags the flags
+ *
+ * @return the builder itself
+ */
+ CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags);
+
+ /**
+ * Set write flags. Write wlags specify the behaviour of writes
+ *
+ * @param writeFlags the flags
+ *
+ * @return the builder itself
+ */
+ default CreateBuilder withWriteFlags(WriteFlag ... writeFlags) {
+ return withWriteFlags(EnumSet.copyOf(Arrays.asList(writeFlags)));
+ }
+
/**
* Switch the ledger into 'Advanced' mode. A ledger used in Advanced mode
will explicitly generate the sequence of
* entry identifiers. Advanced ledgers can be created with a client side
defined ledgerId
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
new file mode 100644
index 000000000..a680b8c62
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.EnumSet;
+import lombok.Getter;
+
+/**
+ * Flags to specify the behaviour of writes.
+ */
+@Getter
+public enum WriteFlag {
+
+ /**
+ * Writes will be acknowledged after writing to the filesystem
+ * but not yet been persisted to disks.
+ */
+ DEFERRED_SYNC(0x1 << 0);
+
+ private final int value;
+
+ WriteFlag(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Converts a set of flags from a binary representation.
+ *
+ * @param flagValue the binary value
+ * @return a set of flags
+ */
+ public static EnumSet<WriteFlag> getWriteFlags(int flagValue) {
+ if ((flagValue & DEFERRED_SYNC.value) == DEFERRED_SYNC.value) {
+ return EnumSet.of(DEFERRED_SYNC);
+ }
+ return EnumSet.noneOf(WriteFlag.class);
+ }
+
+ /**
+ * Converts a set of flags from a binary representation.
+ *
+ * @param flags the flags
+ * @return the binary representation
+ */
+ public static int getWriteFlagsValue(EnumSet<WriteFlag> flags) {
+ int result = 0;
+ for (WriteFlag flag : flags) {
+ result |= flag.value;
+ }
+ return result;
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 397f87245..a4d60aaa9 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -27,6 +27,7 @@
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Enumeration;
import java.util.Map.Entry;
import java.util.Set;
@@ -34,6 +35,7 @@
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -249,7 +251,7 @@ public boolean isClosed() {
}
};
LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
- TEST_PSSWD);
+ TEST_PSSWD, EnumSet.noneOf(WriteFlag.class));
testSplitIntoSubFragments(10, 21, -1, 1, lh);
testSplitIntoSubFragments(10, 21, 20, 1, lh);
testSplitIntoSubFragments(0, 0, 10, 1, lh);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index fcb35a178..3c2989512 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -20,11 +20,13 @@
*/
package org.apache.bookkeeper.client.api;
+import static org.apache.bookkeeper.client.api.WriteFlag.DEFERRED_SYNC;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -32,6 +34,7 @@
import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.MockBookKeeperTestCase;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -51,6 +54,7 @@
private static final Map<String, byte[]> customMetadata = new HashMap<>();
private static final byte[] password = new byte[3];
private static final byte[] entryData = new byte[32];
+ private static final EnumSet<WriteFlag> writeFlagsDeferredSync =
EnumSet.of(DEFERRED_SYNC);
@Test
public void testCreateLedger() throws Exception {
@@ -88,6 +92,14 @@ public void testFailWriteQuorumSize0() throws Exception {
.execute());
}
+ @Test(expected = BKIncorrectParameterException.class)
+ public void testFailNullWriteFlags() throws Exception {
+ result(newCreateLedgerOp()
+ .withWriteFlags((EnumSet<WriteFlag>) null)
+ .withPassword(password)
+ .execute());
+ }
+
@Test(expected = BKIncorrectParameterException.class)
public void testFailAckQuorumSize0() throws Exception {
result(newCreateLedgerOp()
@@ -191,6 +203,94 @@ public void testCreateAdvLedger() throws Exception {
assertArrayEquals(password, metadata.getPassword());
}
+ @Test
+ public void testDefaultWriteFlagsEmpty() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(EnumSet.noneOf(WriteFlag.class), lh.getWriteFlags());
+ }
+
+ @Test
+ public void testCreateAdvLedgerWriteFlags() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteAdvHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .withWriteFlags(writeFlagsDeferredSync)
+ .makeAdv()
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+ }
+
+ @Test
+ public void testCreateLedgerWriteFlags() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .withWriteFlags(writeFlagsDeferredSync)
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+ }
+
+ @Test
+ public void testCreateLedgerWriteFlagsVarargs() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .withWriteFlags(DEFERRED_SYNC)
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+ }
+
@Test(expected = BKIncorrectParameterException.class)
public void testFailCreateAdvLedgerBadFixedLedgerIdMinus1() throws
Exception {
result(newCreateLedgerOp()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
new file mode 100644
index 000000000..c51bdf5bc
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import static org.apache.bookkeeper.client.api.WriteFlag.DEFERRED_SYNC;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+import org.junit.Test;
+
+/**
+ * Unit tests for WriteFlag.
+ */
+public class WriteFlagTest {
+
+ private static final int NONE = 0;
+
+ @Test
+ public void testGetWriteFlagsDeferredSync() {
+ assertEquals(EnumSet.of(DEFERRED_SYNC),
+ WriteFlag.getWriteFlags(DEFERRED_SYNC.getValue()));
+ }
+
+ @Test
+ public void testGetWriteFlagsNone() {
+ assertEquals(EnumSet.noneOf(WriteFlag.class),
+ WriteFlag.getWriteFlags(NONE));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetWriteFlagsValueNull() {
+ WriteFlag.getWriteFlagsValue(null);
+ }
+
+ @Test
+ public void testGetWriteFlagsValueEmpty() {
+ assertEquals(0,
WriteFlag.getWriteFlagsValue(EnumSet.noneOf(WriteFlag.class)));
+ }
+
+ @Test
+ public void testGetWriteFlagsValueDeferredSync() {
+ assertEquals(1,
WriteFlag.getWriteFlagsValue(EnumSet.of(DEFERRED_SYNC)));
+ }
+}
diff --git a/site/docs/latest/api/ledger-api.md
b/site/docs/latest/api/ledger-api.md
index 255ef80be..2303aaab4 100644
--- a/site/docs/latest/api/ledger-api.md
+++ b/site/docs/latest/api/ledger-api.md
@@ -471,3 +471,307 @@ mvn exec:java -Dexec.mainClass=org.apache.bookkeeper.Dice
Value = 3, isLeader = true
Value = 1, isLeader = false
```
+
+## New API
+
+Since 4.6 BookKeeper provides a new client API which leverages Java8
[CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html)
facility.
+[WriteHandle](../javadoc/org/apache/bookkeeper/client/api/WriteHandle),
[WriteAdvHandle](../javadoc/org/apache/bookkeeper/client/api/WriteAdvHandle),
[ReadHandle](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) are
introduced for replacing the generic
[LedgerHandle](../javadoc/org/apache/bookkeeper/client/LedgerHandle).
+
+> All the new API now is available in `org.apache.bookkeeper.client.api`. You
should only use interfaces defined in this package.
+
+*Beware* that this API in 4.6 is still experimental API and can be subject to
changes in next minor releases.
+
+### Create a new client
+
+In order to create a new
[`BookKeeper`](../javadoc/org/apache/bookkeeper/client/api/BookKeeper) client
object, you need to construct a
[`ClientConfiguration`](../javadoc/org/apache/bookkeeper/conf/ClientConfiguration)
object and set a [connection string](#connection-string) first, and then use
[`BookKeeperBuilder`](../javadoc/org/apache/bookkeeper/client/api/BookKeeperBuilder)
to build the client.
+
+Here is an example building the bookkeeper client.
+
+```java
+// construct a client configuration instance
+ClientConfiguration conf = new ClientConfiguration();
+conf.setZkServers(zkConnectionString);
+conf.setZkLedgersRootPath("/path/to/ledgers/root");
+
+// build the bookkeeper client
+BookKeeper bk = BookKeeper.newBuilder(conf)
+ .statsLogger(...)
+ ...
+ .build();
+
+```
+
+### Create ledgers
+
+the easiest way to create a {% pop ledger %} using the java client is via the
[`createbuilder`](../javadoc/org/apache/bookkeeper/client/api/createbuilder).
you must specify at least
+a [`digesttype`](../javadoc/org/apache/bookkeeper/client/api/digesttype) and a
password.
+
+here's an example:
+
+```java
+BookKeeper bk = ...;
+
+byte[] password = "some-password".getBytes();
+
+WriteHandle wh = bk.newCreateLedgerOp()
+ .withDigestType(DigestType.CRC32)
+ .withPassword(password)
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .execute() // execute the creation op
+ .get(); // wait for the execution to complete
+```
+
+A [`WriteHandle`](../javadoc/org/apache/bookkeeper/client/api/WriteHandle) is
returned for applications to write and read entries to and from the ledger.
+
+### Write flags
+
+You can specify behaviour of the writer by setting
[`WriteFlags`](../javadoc/org/apache/bookkeeper/client/api/WriteFlag) at ledger
creation type.
+These flags are applied only during write operations and are not recorded on
metadata.
+
+
+Available write flags:
+
+| Flag | Explanation | Notes |
+:---------|:------------|:-------
+DEFERRED_SYNC | Writes are acknowledged early, without waiting for
+guarantees of durability | Data will be only written to the OS page cache,
without forcing an fsync.
+
+```java
+BookKeeper bk = ...;
+
+byte[] password = "some-password".getBytes();
+
+WriteHandle wh = bk.newCreateLedgerOp()
+ .withDigestType(DigestType.CRC32)
+ .withPassword(password)
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .withWriteFlags(DEFERRED_SYNC)
+ .execute() // execute the creation op
+ .get(); // wait for the execution to complete
+```
+
+
+### Append entries to ledgers
+
+The [`WriteHandle`](../javadoc/org/apache/bookkeeper/client/api/WriteHandle)
can be used for applications to append entries to the ledgers.
+
+```java
+WriteHandle wh = ...;
+
+CompletableFuture<Long> addFuture = wh.append("Some entry data".getBytes());
+
+// option 1: you can wait for add to complete synchronously
+try {
+ long entryId = FutureUtils.result(addFuture.get());
+} catch (BKException bke) {
+ // error handling
+}
+
+// option 2: you can process the result and exception asynchronously
+addFuture
+ .thenApply(entryId -> {
+ // process the result
+ })
+ .exceptionally(cause -> {
+ // handle the exception
+ })
+
+// option 3: bookkeeper provides a twitter-future-like event listener for
processing result and exception asynchronously
+addFuture.whenComplete(new FutureEventListener() {
+ @Override
+ public void onSuccess(long entryId) {
+ // process the result
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // handle the exception
+ }
+});
+```
+
+The append method supports three representations of a bytes array: the native
java `byte[]`, java nio `ByteBuffer` and netty `ByteBuf`.
+It is recommended to use `ByteBuf` as it is more gc friendly.
+
+### Open ledgers
+
+You can open ledgers to read entries. Opening ledgers is done by
[`openBuilder`](../javadoc/org/apache/bookkeeper/client/api/openBuilder). You
must specify the ledgerId and the password
+in order to open the ledgers.
+
+here's an example:
+
+```java
+BookKeeper bk = ...;
+
+long ledgerId = ...;
+byte[] password = "some-password".getBytes();
+
+ReadHandle rh = bk.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withPassword(password)
+ .execute() // execute the open op
+ .get(); // wait for the execution to complete
+```
+
+A [`ReadHandle`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) is
returned for applications to read entries to and from the ledger.
+
+#### Recovery vs NoRecovery
+
+By default, the
[`openBuilder`](../javadoc/org/apache/bookkeeper/client/api/openBuilder) opens
the ledger in a `NoRecovery` mode. You can open the ledger in `Recovery` mode
by specifying
+`withRecovery(true)` in the open builder.
+
+```java
+BookKeeper bk = ...;
+
+long ledgerId = ...;
+byte[] password = "some-password".getBytes();
+
+ReadHandle rh = bk.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withPassword(password)
+ .withRecovery(true)
+ .execute()
+ .get();
+
+```
+
+**What is the difference between "Recovery" and "NoRecovery"?**
+
+If you are opening a ledger in "Recovery" mode, it will basically fence and
seal the ledger -- no more entries are allowed
+to be appended to it. The writer which is currently appending entries to the
ledger will fail with
[`LedgerFencedException`](../javadoc/org/apache/bookkeeper/client/api/BKException.Code#LedgerFencedException).
+
+In constrat, opening a ledger in "NoRecovery" mode, it will not fence and seal
the ledger. "NoRecovery" mode is usually used by applications to tailing-read
from a ledger.
+
+### Read entries from ledgers
+
+The [`ReadHandle`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle)
returned from the open builder can be used for applications to read entries
from the ledgers.
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = ...;
+long endEntryId = ...;
+CompletableFuture<LedgerEntries> readFuture = rh.read(startEntryId,
endEntryId);
+
+// option 1: you can wait for read to complete synchronously
+try {
+ LedgerEntries entries = FutureUtils.result(readFuture.get());
+} catch (BKException bke) {
+ // error handling
+}
+
+// option 2: you can process the result and exception asynchronously
+readFuture
+ .thenApply(entries -> {
+ // process the result
+ })
+ .exceptionally(cause -> {
+ // handle the exception
+ })
+
+// option 3: bookkeeper provides a twitter-future-like event listener for
processing result and exception asynchronously
+readFuture.whenComplete(new FutureEventListener<>() {
+ @Override
+ public void onSuccess(LedgerEntries entries) {
+ // process the result
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // handle the exception
+ }
+});
+```
+
+Once you are done with processing the
[`LedgerEntries`](../javadoc/org/apache/bookkeeper/client/api/LedgerEntries),
you can call `#close()` on the `LedgerEntries` instance to
+release the buffers held by it.
+
+Applications are allowed to read any entries between `0` and
[`LastAddConfirmed`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle.html#getLastAddConfirmed).
If the applications
+attempts to read entries beyond `LastAddConfirmed`, they will receive
[`IncorrectParameterException`](../javadoc/org/apache/bookkeeper/client/api/BKException.Code#IncorrectParameterException).
+
+### Read unconfirmed entries from ledgers
+
+`readUnconfirmed` is provided the mechanism for applications to read entries
beyond `LastAddConfirmed`. Applications should be aware of `readUnconfirmed`
doesn't provide any
+repeatable read consistency.
+
+```java
+CompletableFuture<LedgerEntries> readFuture = rh.readUnconfirmed(startEntryId,
endEntryId);
+```
+
+### Tailing Reads
+
+There are two methods for applications to achieve tailing reads: `Polling` and
`Long-Polling`.
+
+#### Polling
+
+You can do this in synchronous way:
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = 0L;
+long nextEntryId = startEntryId;
+int numEntriesPerBatch = 4;
+while (!rh.isClosed() || nextEntryId <= rh.getLastAddConfirmed()) {
+ long lac = rh.getLastAddConfirmed();
+ if (nextEntryId > lac) {
+ // no more entries are added
+ Thread.sleep(1000);
+
+ lac = rh.readLastAddConfirmed().get();
+ continue;
+ }
+
+ long endEntryId = Math.min(lac, nextEntryId + numEntriesPerBatch - 1);
+ LedgerEntries entries = rh.read(nextEntryId, endEntryId).get();
+
+ // process the entries
+
+ nextEntryId = endEntryId + 1;
+}
+```
+
+#### Long Polling
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = 0L;
+long nextEntryId = startEntryId;
+int numEntriesPerBatch = 4;
+while (!rh.isClosed() || nextEntryId <= rh.getLastAddConfirmed()) {
+ long lac = rh.getLastAddConfirmed();
+ if (nextEntryId > lac) {
+ // no more entries are added
+ try (LastConfirmedAndEntry lacAndEntry =
rh.readLastAddConfirmedAndEntry(nextEntryId, 1000, false).get()) {
+ if (lacAndEntry.hasEntry()) {
+ // process the entry
+
+ ++nextEntryId;
+ }
+ }
+ } else {
+ long endEntryId = Math.min(lac, nextEntryId + numEntriesPerBatch - 1);
+ LedgerEntries entries = rh.read(nextEntryId, endEntryId).get();
+
+ // process the entries
+ nextEntryId = endEntryId + 1;
+ }
+}
+```
+
+### Delete ledgers
+
+{% pop Ledgers %} can be deleted by using
[`DeleteBuilder`](../javadoc/org/apache/bookkeeper/client/api/DeleteBuilder).
+
+```java
+BookKeeper bk = ...;
+long ledgerId = ...;
+
+bk.newDeleteLedgerOp()
+ .withLedgerId(ledgerId)
+ .execute()
+ .get();
+```
diff --git a/site/docs/latest/api/overview.md b/site/docs/latest/api/overview.md
index 3eb649273..3e0adcd61 100644
--- a/site/docs/latest/api/overview.md
+++ b/site/docs/latest/api/overview.md
@@ -5,7 +5,7 @@ title: BookKeeper API
BookKeeper offers a few APIs that applications can use to interact with it:
* The [ledger API](../ledger-api) is a lower-level API that enables you to
interact with {% pop ledgers %} directly
-* The [Ledger Advanced API)(../ledger-adv-api) is an advanced extension to
[Ledger API](../ledger-api) to provide more flexibilities to applications.
+* The [Ledger Advanced API](../ledger-adv-api) is an advanced extension to
[Ledger API](../ledger-api) to provide more flexibilities to applications.
* The [DistributedLog API](../distributedlog-api) is a higher-level API that
provides convenient abstractions.
## Trade-offs
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services