This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3e4ff0d ISSUE #744: BP-18 introduce write flags
3e4ff0d is described below
commit 3e4ff0dca8557ff03dda2d292c3c6678df8f1a02
Author: eolivelli <[email protected]>
AuthorDate: Wed Dec 13 15:43:50 2017 -0800
ISSUE #744: BP-18 introduce write flags
Introduce writeflags on the API and on protobuf files.
Client and server do not use these flags yet.
Author: eolivelli <[email protected]>
Author: Enrico Olivelli <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #742 from eolivelli/bp14-writeflags, closes #744
---
.../src/main/proto/BookkeeperProtocol.proto | 1 +
.../org/apache/bookkeeper/client/BookKeeper.java | 13 +-
.../apache/bookkeeper/client/LedgerCreateOp.java | 30 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 11 +-
.../apache/bookkeeper/client/LedgerHandleAdv.java | 10 +-
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 4 +-
.../bookkeeper/client/api/CreateBuilder.java | 22 ++
.../apache/bookkeeper/client/api/WriteFlag.java | 70 +++++
.../client/TestLedgerFragmentReplication.java | 4 +-
.../client/api/BookKeeperBuildersTest.java | 100 +++++++
.../bookkeeper/client/api/WriteFlagTest.java | 62 +++++
site/docs/latest/api/ledger-api.md | 304 +++++++++++++++++++++
site/docs/latest/api/overview.md | 2 +-
13 files changed, 617 insertions(+), 16 deletions(-)
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index 38ed3c5..ffd0f42 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -112,6 +112,7 @@ message AddRequest {
required int64 entryId = 2;
required bytes masterKey = 3;
required bytes body = 4;
+ optional int32 writeFlags = 5;
}
message StartTLSRequest {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index d0f8a64..17df48c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -32,6 +32,7 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,7 @@ import org.apache.bookkeeper.client.api.BookKeeperBuilder;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
@@ -760,7 +762,8 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata)
+ ackQuorumSize, digestType, passwd, cb, ctx,
+ customMetadata, EnumSet.noneOf(WriteFlag.class))
.initiate();
} finally {
closeLock.readLock().unlock();
@@ -963,7 +966,9 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata).initiateAdv((long) (-1));
+ ackQuorumSize, digestType, passwd, cb, ctx,
+ customMetadata, EnumSet.noneOf(WriteFlag.class))
+ .initiateAdv(-1L);
} finally {
closeLock.readLock().unlock();
}
@@ -1072,7 +1077,9 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata).initiateAdv(ledgerId);
+ ackQuorumSize, digestType, passwd, cb, ctx,
+ customMetadata, EnumSet.noneOf(WriteFlag.class))
+ .initiateAdv(ledgerId);
} finally {
closeLock.readLock().unlock();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 77d2ab2..a3d5239 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.client;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import
org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
import org.apache.bookkeeper.client.api.CreateAdvBuilder;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -62,6 +64,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
final byte[] passwd;
final BookKeeper bk;
final DigestType digestType;
+ final EnumSet<WriteFlag> writeFlags;
final long startTime;
final OpStatsLogger createOpLogger;
boolean adv = false;
@@ -91,7 +94,8 @@ class LedgerCreateOp implements GenericCallback<Void> {
* preserve the order(e.g. sortedMap) upon later retireval.
*/
LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int
ackQuorumSize, DigestType digestType,
- byte[] passwd, CreateCallback cb, Object ctx, final Map<String,
byte[]> customMetadata) {
+ byte[] passwd, CreateCallback cb, Object ctx, final Map<String,
byte[]> customMetadata,
+ EnumSet<WriteFlag> writeFlags) {
this.bk = bk;
this.metadata = new LedgerMetadata(
ensembleSize,
@@ -102,6 +106,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
customMetadata,
bk.getConf().getStoreSystemtimeAsLedgerCreationTime());
this.digestType = digestType;
+ this.writeFlags = writeFlags;
this.passwd = passwd;
this.cb = cb;
this.ctx = ctx;
@@ -189,9 +194,11 @@ class LedgerCreateOp implements GenericCallback<Void> {
try {
if (adv) {
- lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType,
passwd);
+ lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType,
+ passwd, writeFlags);
} else {
- lh = new LedgerHandle(bk, ledgerId, metadata, digestType,
passwd);
+ lh = new LedgerHandle(bk, ledgerId, metadata, digestType,
+ passwd, writeFlags);
}
} catch (GeneralSecurityException e) {
LOG.error("Security exception while creating ledger: " + ledgerId,
e);
@@ -223,6 +230,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
private int builderAckQuorumSize = 2;
private int builderWriteQuorumSize = 2;
private byte[] builderPassword;
+ private EnumSet<WriteFlag> builderWriteFlags =
EnumSet.noneOf(WriteFlag.class);
private org.apache.bookkeeper.client.api.DigestType builderDigestType =
org.apache.bookkeeper.client.api.DigestType.CRC32;
private Map<String, byte[]> builderCustomMetadata =
Collections.emptyMap();
@@ -238,6 +246,12 @@ class LedgerCreateOp implements GenericCallback<Void> {
}
@Override
+ public CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags) {
+ this.builderWriteFlags = writeFlags;
+ return this;
+ }
+
+ @Override
public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
this.builderWriteQuorumSize = writeQuorumSize;
return this;
@@ -273,6 +287,11 @@ class LedgerCreateOp implements GenericCallback<Void> {
}
private boolean validate() {
+ if (builderWriteFlags == null) {
+ LOG.error("invalid null writeFlags");
+ return false;
+ }
+
if (builderWriteQuorumSize > builderEnsembleSize) {
LOG.error("invalid writeQuorumSize {} > ensembleSize {}",
builderWriteQuorumSize, builderEnsembleSize);
return false;
@@ -322,7 +341,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
}
LedgerCreateOp op = new LedgerCreateOp(bk, builderEnsembleSize,
builderWriteQuorumSize, builderAckQuorumSize,
DigestType.fromApiDigestType(builderDigestType),
- builderPassword, cb, null, builderCustomMetadata);
+ builderPassword, cb, null, builderCustomMetadata,
builderWriteFlags);
ReentrantReadWriteLock closeLock = bk.getCloseLock();
closeLock.readLock().lock();
try {
@@ -380,7 +399,8 @@ class LedgerCreateOp implements GenericCallback<Void> {
LedgerCreateOp op = new LedgerCreateOp(parent.bk,
parent.builderEnsembleSize,
parent.builderWriteQuorumSize, parent.builderAckQuorumSize,
DigestType.fromApiDigestType(parent.builderDigestType),
- parent.builderPassword, cb, null,
parent.builderCustomMetadata);
+ parent.builderPassword, cb, null,
parent.builderCustomMetadata,
+ parent.builderWriteFlags);
ReentrantReadWriteLock closeLock = parent.bk.getCloseLock();
closeLock.readLock().lock();
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 5c15376..936f181 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -36,6 +36,7 @@ import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
@@ -65,6 +66,7 @@ import
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallb
import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
@@ -104,6 +106,7 @@ public class LedgerHandle implements WriteHandle {
final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
final boolean enableParallelRecoveryRead;
final int recoveryReadBatchSize;
+ final EnumSet<WriteFlag> writeFlags;
ScheduledFuture<?> timeoutFuture = null;
/**
@@ -132,13 +135,14 @@ public class LedgerHandle implements WriteHandle {
}
LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
- DigestType digestType, byte[] password)
+ DigestType digestType, byte[] password, EnumSet<WriteFlag>
writeFlags)
throws GeneralSecurityException, NumberFormatException {
this.bk = bk;
this.metadata = metadata;
this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
this.enableParallelRecoveryRead =
bk.getConf().getEnableParallelRecoveryRead();
this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
+ this.writeFlags = writeFlags;
if (metadata.isClosed()) {
lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -216,6 +220,11 @@ public class LedgerHandle implements WriteHandle {
return ledgerId;
}
+ @VisibleForTesting
+ public EnumSet<WriteFlag> getWriteFlags() {
+ return writeFlags;
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index d0ca9da..435c453 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -23,23 +23,24 @@ package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.io.Serializable;
import java.security.GeneralSecurityException;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Ledger Advanced handle extends {@link LedgerHandle} to provide API to add
entries with
* user supplied entryIds. Through this interface Ledger Length may not be
accurate while the
@@ -54,9 +55,10 @@ public class LedgerHandleAdv extends LedgerHandle implements
WriteAdvHandle {
}
}
- LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password)
+ LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+ DigestType digestType, byte[] password, EnumSet<WriteFlag>
writeFlags)
throws GeneralSecurityException, NumberFormatException {
- super(bk, ledgerId, metadata, digestType, password);
+ super(bk, ledgerId, metadata, digestType, password, writeFlags);
pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new
PendingOpsComparator());
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index b1d1596..d324572 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -21,12 +21,14 @@
package org.apache.bookkeeper.client;
import java.security.GeneralSecurityException;
+import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.util.SafeRunnable;
@@ -73,7 +75,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements
LedgerMetadataListene
ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password, boolean watch)
throws GeneralSecurityException, NumberFormatException {
- super(bk, ledgerId, metadata, digestType, password);
+ super(bk, ledgerId, metadata, digestType, password,
EnumSet.noneOf(WriteFlag.class));
if (watch) {
bk.getLedgerManager().registerLedgerMetadataListener(ledgerId,
this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
index cd5fd3d..aa6ad6d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client.api;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Map;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
@@ -92,6 +94,26 @@ public interface CreateBuilder extends
OpBuilder<WriteHandle> {
CreateBuilder withDigestType(DigestType digestType);
/**
+ * Set write flags. Write flags specify the behaviour of writes.
+ *
+ * @param writeFlags the flags
+ *
+ * @return the builder itself
+ */
+ CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags);
+
+ /**
+ * Set write flags. Write flags specify the behaviour of writes.
+ *
+ * @param writeFlags the flags
+ *
+ * @return the builder itself
+ */
+ default CreateBuilder withWriteFlags(WriteFlag ... writeFlags) {
+ return withWriteFlags(EnumSet.copyOf(Arrays.asList(writeFlags)));
+ }
+
+ /**
* Switch the ledger into 'Advanced' mode. A ledger used in Advanced mode
will explicitly generate the sequence of
* entry identifiers. Advanced ledgers can be created with a client side
defined ledgerId
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
new file mode 100644
index 0000000..a680b8c
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.EnumSet;
+import lombok.Getter;
+
+/**
+ * Flags to specify the behaviour of writes.
+ */
+@Getter
+public enum WriteFlag {
+
+ /**
+ * Writes will be acknowledged after being written to the filesystem
+ * but before they have been persisted to disk.
+ */
+ DEFERRED_SYNC(0x1 << 0);
+
+ private final int value;
+
+ WriteFlag(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Converts a set of flags from a binary representation.
+ *
+ * @param flagValue the binary value
+ * @return a set of flags
+ */
+ public static EnumSet<WriteFlag> getWriteFlags(int flagValue) {
+ if ((flagValue & DEFERRED_SYNC.value) == DEFERRED_SYNC.value) {
+ return EnumSet.of(DEFERRED_SYNC);
+ }
+ return EnumSet.noneOf(WriteFlag.class);
+ }
+
+ /**
+ * Converts a set of flags to a binary representation.
+ *
+ * @param flags the flags
+ * @return the binary representation
+ */
+ public static int getWriteFlagsValue(EnumSet<WriteFlag> flags) {
+ int result = 0;
+ for (WriteFlag flag : flags) {
+ result |= flag.value;
+ }
+ return result;
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 397f872..a4d60aa 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Enumeration;
import java.util.Map.Entry;
import java.util.Set;
@@ -34,6 +35,7 @@ import java.util.SortedMap;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -249,7 +251,7 @@ public class TestLedgerFragmentReplication extends
BookKeeperClusterTestCase {
}
};
LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
- TEST_PSSWD);
+ TEST_PSSWD, EnumSet.noneOf(WriteFlag.class));
testSplitIntoSubFragments(10, 21, -1, 1, lh);
testSplitIntoSubFragments(10, 21, 20, 1, lh);
testSplitIntoSubFragments(0, 0, 10, 1, lh);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index fcb35a1..3c29895 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -20,11 +20,13 @@
*/
package org.apache.bookkeeper.client.api;
+import static org.apache.bookkeeper.client.api.WriteFlag.DEFERRED_SYNC;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -32,6 +34,7 @@ import
org.apache.bookkeeper.client.BKException.BKClientClosedException;
import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.MockBookKeeperTestCase;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -51,6 +54,7 @@ public class BookKeeperBuildersTest extends
MockBookKeeperTestCase {
private static final Map<String, byte[]> customMetadata = new HashMap<>();
private static final byte[] password = new byte[3];
private static final byte[] entryData = new byte[32];
+ private static final EnumSet<WriteFlag> writeFlagsDeferredSync =
EnumSet.of(DEFERRED_SYNC);
@Test
public void testCreateLedger() throws Exception {
@@ -89,6 +93,14 @@ public class BookKeeperBuildersTest extends
MockBookKeeperTestCase {
}
@Test(expected = BKIncorrectParameterException.class)
+ public void testFailNullWriteFlags() throws Exception {
+ result(newCreateLedgerOp()
+ .withWriteFlags((EnumSet<WriteFlag>) null)
+ .withPassword(password)
+ .execute());
+ }
+
+ @Test(expected = BKIncorrectParameterException.class)
public void testFailAckQuorumSize0() throws Exception {
result(newCreateLedgerOp()
.withEnsembleSize(2)
@@ -191,6 +203,94 @@ public class BookKeeperBuildersTest extends
MockBookKeeperTestCase {
assertArrayEquals(password, metadata.getPassword());
}
+ @Test
+ public void testDefaultWriteFlagsEmpty() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(EnumSet.noneOf(WriteFlag.class), lh.getWriteFlags());
+ }
+
+ @Test
+ public void testCreateAdvLedgerWriteFlags() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteAdvHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .withWriteFlags(writeFlagsDeferredSync)
+ .makeAdv()
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+ }
+
+ @Test
+ public void testCreateLedgerWriteFlags() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .withWriteFlags(writeFlagsDeferredSync)
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+ }
+
+ @Test
+ public void testCreateLedgerWriteFlagsVarargs() throws Exception {
+ setNewGeneratedLedgerId(ledgerId);
+ WriteHandle writer = newCreateLedgerOp()
+ .withAckQuorumSize(ackQuorumSize)
+ .withEnsembleSize(ensembleSize)
+ .withPassword(password)
+ .withWriteQuorumSize(writeQuorumSize)
+ .withCustomMetadata(customMetadata)
+ .withWriteFlags(DEFERRED_SYNC)
+ .execute()
+ .get();
+ assertEquals(ledgerId, writer.getId());
+ LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+ assertEquals(ensembleSize, metadata.getEnsembleSize());
+ assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+ assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+ assertArrayEquals(password, metadata.getPassword());
+ LedgerHandle lh = (LedgerHandle) writer;
+ assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+ }
+
@Test(expected = BKIncorrectParameterException.class)
public void testFailCreateAdvLedgerBadFixedLedgerIdMinus1() throws
Exception {
result(newCreateLedgerOp()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
new file mode 100644
index 0000000..c51bdf5
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import static org.apache.bookkeeper.client.api.WriteFlag.DEFERRED_SYNC;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+import org.junit.Test;
+
+/**
+ * Unit tests for WriteFlag.
+ */
+public class WriteFlagTest {
+
+ private static final int NONE = 0;
+
+ @Test
+ public void testGetWriteFlagsDeferredSync() {
+ assertEquals(EnumSet.of(DEFERRED_SYNC),
+ WriteFlag.getWriteFlags(DEFERRED_SYNC.getValue()));
+ }
+
+ @Test
+ public void testGetWriteFlagsNone() {
+ assertEquals(EnumSet.noneOf(WriteFlag.class),
+ WriteFlag.getWriteFlags(NONE));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetWriteFlagsValueNull() {
+ WriteFlag.getWriteFlagsValue(null);
+ }
+
+ @Test
+ public void testGetWriteFlagsValueEmpty() {
+ assertEquals(0,
WriteFlag.getWriteFlagsValue(EnumSet.noneOf(WriteFlag.class)));
+ }
+
+ @Test
+ public void testGetWriteFlagsValueDeferredSync() {
+ assertEquals(1,
WriteFlag.getWriteFlagsValue(EnumSet.of(DEFERRED_SYNC)));
+ }
+}
diff --git a/site/docs/latest/api/ledger-api.md
b/site/docs/latest/api/ledger-api.md
index 255ef80..2303aaa 100644
--- a/site/docs/latest/api/ledger-api.md
+++ b/site/docs/latest/api/ledger-api.md
@@ -471,3 +471,307 @@ mvn exec:java -Dexec.mainClass=org.apache.bookkeeper.Dice
Value = 3, isLeader = true
Value = 1, isLeader = false
```
+
+## New API
+
+Since 4.6 BookKeeper provides a new client API which leverages the Java 8
[CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html)
facility.
+[WriteHandle](../javadoc/org/apache/bookkeeper/client/api/WriteHandle),
[WriteAdvHandle](../javadoc/org/apache/bookkeeper/client/api/WriteAdvHandle),
[ReadHandle](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) are
introduced for replacing the generic
[LedgerHandle](../javadoc/org/apache/bookkeeper/client/LedgerHandle).
+
+> The entire new API is available in `org.apache.bookkeeper.client.api`. You
should only use interfaces defined in this package.
+
+*Beware* that this API is still experimental in 4.6 and may be subject to
changes in upcoming minor releases.
+
+### Create a new client
+
+In order to create a new
[`BookKeeper`](../javadoc/org/apache/bookkeeper/client/api/BookKeeper) client
object, you need to construct a
[`ClientConfiguration`](../javadoc/org/apache/bookkeeper/conf/ClientConfiguration)
object and set a [connection string](#connection-string) first, and then use
[`BookKeeperBuilder`](../javadoc/org/apache/bookkeeper/client/api/BookKeeperBuilder)
to build the client.
+
+Here is an example building the bookkeeper client.
+
+```java
+// construct a client configuration instance
+ClientConfiguration conf = new ClientConfiguration();
+conf.setZkServers(zkConnectionString);
+conf.setZkLedgersRootPath("/path/to/ledgers/root");
+
+// build the bookkeeper client
+BookKeeper bk = BookKeeper.newBuilder(conf)
+ .statsLogger(...)
+ ...
+ .build();
+
+```
+
+### Create ledgers
+
+The easiest way to create a {% pop ledger %} using the Java client is via the
[`CreateBuilder`](../javadoc/org/apache/bookkeeper/client/api/CreateBuilder).
You must specify at least
+a [`DigestType`](../javadoc/org/apache/bookkeeper/client/api/DigestType) and a
password.
+
+Here's an example:
+
+```java
+BookKeeper bk = ...;
+
+byte[] password = "some-password".getBytes();
+
+WriteHandle wh = bk.newCreateLedgerOp()
+ .withDigestType(DigestType.CRC32)
+ .withPassword(password)
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .execute() // execute the creation op
+ .get(); // wait for the execution to complete
+```
+
+A [`WriteHandle`](../javadoc/org/apache/bookkeeper/client/api/WriteHandle) is
returned for applications to write and read entries to and from the ledger.
+
+### Write flags
+
+You can specify the behaviour of the writer by setting
[`WriteFlags`](../javadoc/org/apache/bookkeeper/client/api/WriteFlag) at ledger
creation time.
+These flags are applied only during write operations and are not recorded in
the ledger metadata.
+
+
+Available write flags:
+
+| Flag | Explanation | Notes |
+|:---------|:------------|:-------|
+| DEFERRED_SYNC | Writes are acknowledged early, without waiting for
guarantees of durability | Data will only be written to the OS page cache,
without forcing an fsync. |
+
+```java
+BookKeeper bk = ...;
+
+byte[] password = "some-password".getBytes();
+
+WriteHandle wh = bk.newCreateLedgerOp()
+ .withDigestType(DigestType.CRC32)
+ .withPassword(password)
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .withWriteFlags(DEFERRED_SYNC)
+ .execute() // execute the creation op
+ .get(); // wait for the execution to complete
+```
+
+
+### Append entries to ledgers
+
+The [`WriteHandle`](../javadoc/org/apache/bookkeeper/client/api/WriteHandle)
can be used for applications to append entries to the ledgers.
+
+```java
+WriteHandle wh = ...;
+
+CompletableFuture<Long> addFuture = wh.append("Some entry data".getBytes());
+
+// option 1: you can wait for add to complete synchronously
+try {
+ long entryId = FutureUtils.result(addFuture);
+} catch (BKException bke) {
+ // error handling
+}
+
+// option 2: you can process the result and exception asynchronously
+addFuture
+ .thenApply(entryId -> {
+ // process the result
+ })
+ .exceptionally(cause -> {
+ // handle the exception
+ })
+
+// option 3: bookkeeper provides a twitter-future-like event listener for
processing result and exception asynchronously
+addFuture.whenComplete(new FutureEventListener<Long>() {
+ @Override
+ public void onSuccess(Long entryId) {
+ // process the result
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // handle the exception
+ }
+});
+```
+
+The append method supports three representations of a byte array: the native
Java `byte[]`, Java NIO `ByteBuffer` and Netty `ByteBuf`.
+It is recommended to use `ByteBuf` as it is more GC-friendly.
+
+### Open ledgers
+
+You can open ledgers to read entries. Opening ledgers is done by
[`OpenBuilder`](../javadoc/org/apache/bookkeeper/client/api/OpenBuilder). You
must specify the ledgerId and the password
+in order to open the ledgers.
+
+Here's an example:
+
+```java
+BookKeeper bk = ...;
+
+long ledgerId = ...;
+byte[] password = "some-password".getBytes();
+
+ReadHandle rh = bk.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withPassword(password)
+ .execute() // execute the open op
+ .get(); // wait for the execution to complete
+```
+
+A [`ReadHandle`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) is
returned for applications to read entries from the ledger.
+
+#### Recovery vs NoRecovery
+
+By default, the
[`OpenBuilder`](../javadoc/org/apache/bookkeeper/client/api/OpenBuilder) opens
the ledger in a `NoRecovery` mode. You can open the ledger in `Recovery` mode
by specifying
+`withRecovery(true)` in the open builder.
+
+```java
+BookKeeper bk = ...;
+
+long ledgerId = ...;
+byte[] password = "some-password".getBytes();
+
+ReadHandle rh = bk.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withPassword(password)
+ .withRecovery(true)
+ .execute()
+ .get();
+
+```
+
+**What is the difference between "Recovery" and "NoRecovery"?**
+
+If you are opening a ledger in "Recovery" mode, it will basically fence and
seal the ledger -- no more entries are allowed
+to be appended to it. The writer which is currently appending entries to the
ledger will fail with
[`LedgerFencedException`](../javadoc/org/apache/bookkeeper/client/api/BKException.Code#LedgerFencedException).
+
+In contrast, opening a ledger in "NoRecovery" mode will not fence and seal
the ledger. "NoRecovery" mode is usually used by applications to perform
tailing reads from a ledger.
+
+### Read entries from ledgers
+
+The [`ReadHandle`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle)
returned from the open builder can be used for applications to read entries
from the ledgers.
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = ...;
+long endEntryId = ...;
+CompletableFuture<LedgerEntries> readFuture = rh.read(startEntryId,
endEntryId);
+
+// option 1: you can wait for read to complete synchronously
+try {
+ LedgerEntries entries = FutureUtils.result(readFuture);
+} catch (BKException bke) {
+ // error handling
+}
+
+// option 2: you can process the result and exception asynchronously
+readFuture
+ .thenApply(entries -> {
+ // process the result
+ })
+ .exceptionally(cause -> {
+ // handle the exception
+ })
+
+// option 3: bookkeeper provides a twitter-future-like event listener for
processing result and exception asynchronously
+readFuture.whenComplete(new FutureEventListener<LedgerEntries>() {
+ @Override
+ public void onSuccess(LedgerEntries entries) {
+ // process the result
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // handle the exception
+ }
+});
+```
+
+Once you are done with processing the
[`LedgerEntries`](../javadoc/org/apache/bookkeeper/client/api/LedgerEntries),
you can call `#close()` on the `LedgerEntries` instance to
+release the buffers held by it.
+
+Applications are allowed to read any entries between `0` and
[`LastAddConfirmed`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle.html#getLastAddConfirmed).
If an application
+attempts to read entries beyond `LastAddConfirmed`, it will receive
[`IncorrectParameterException`](../javadoc/org/apache/bookkeeper/client/api/BKException.Code#IncorrectParameterException).
+
+### Read unconfirmed entries from ledgers
+
+`readUnconfirmed` provides a mechanism for applications to read entries
beyond `LastAddConfirmed`. Applications should be aware that `readUnconfirmed`
doesn't provide any
+repeatable read consistency.
+
+```java
+CompletableFuture<LedgerEntries> readFuture = rh.readUnconfirmed(startEntryId,
endEntryId);
+```
+
+### Tailing Reads
+
+There are two methods for applications to achieve tailing reads: `Polling` and
`Long-Polling`.
+
+#### Polling
+
+You can do this in a synchronous way:
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = 0L;
+long nextEntryId = startEntryId;
+int numEntriesPerBatch = 4;
+while (!rh.isClosed() || nextEntryId <= rh.getLastAddConfirmed()) {
+ long lac = rh.getLastAddConfirmed();
+ if (nextEntryId > lac) {
+ // no more entries are added
+ Thread.sleep(1000);
+
+ lac = rh.readLastAddConfirmed().get();
+ continue;
+ }
+
+ long endEntryId = Math.min(lac, nextEntryId + numEntriesPerBatch - 1);
+ LedgerEntries entries = rh.read(nextEntryId, endEntryId).get();
+
+ // process the entries
+
+ nextEntryId = endEntryId + 1;
+}
+```
+
+#### Long Polling
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = 0L;
+long nextEntryId = startEntryId;
+int numEntriesPerBatch = 4;
+while (!rh.isClosed() || nextEntryId <= rh.getLastAddConfirmed()) {
+ long lac = rh.getLastAddConfirmed();
+ if (nextEntryId > lac) {
+ // no more entries are added
+ try (LastConfirmedAndEntry lacAndEntry =
rh.readLastAddConfirmedAndEntry(nextEntryId, 1000, false).get()) {
+ if (lacAndEntry.hasEntry()) {
+ // process the entry
+
+ ++nextEntryId;
+ }
+ }
+ } else {
+ long endEntryId = Math.min(lac, nextEntryId + numEntriesPerBatch - 1);
+ LedgerEntries entries = rh.read(nextEntryId, endEntryId).get();
+
+ // process the entries
+ nextEntryId = endEntryId + 1;
+ }
+}
+```
+
+### Delete ledgers
+
+{% pop Ledgers %} can be deleted by using
[`DeleteBuilder`](../javadoc/org/apache/bookkeeper/client/api/DeleteBuilder).
+
+```java
+BookKeeper bk = ...;
+long ledgerId = ...;
+
+bk.newDeleteLedgerOp()
+ .withLedgerId(ledgerId)
+ .execute()
+ .get();
+```
diff --git a/site/docs/latest/api/overview.md b/site/docs/latest/api/overview.md
index 3eb6492..3e0adcd 100644
--- a/site/docs/latest/api/overview.md
+++ b/site/docs/latest/api/overview.md
@@ -5,7 +5,7 @@ title: BookKeeper API
BookKeeper offers a few APIs that applications can use to interact with it:
* The [ledger API](../ledger-api) is a lower-level API that enables you to
interact with {% pop ledgers %} directly
-* The [Ledger Advanced API)(../ledger-adv-api) is an advanced extension to
[Ledger API](../ledger-api) to provide more flexibilities to applications.
+* The [Ledger Advanced API](../ledger-adv-api) is an advanced extension to
[Ledger API](../ledger-api) to provide more flexibilities to applications.
* The [DistributedLog API](../distributedlog-api) is a higher-level API that
provides convenient abstractions.
## Trade-offs
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].