This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4ff0d  ISSUE #744: BP-18 introduce write flags
3e4ff0d is described below

commit 3e4ff0dca8557ff03dda2d292c3c6678df8f1a02
Author: eolivelli <[email protected]>
AuthorDate: Wed Dec 13 15:43:50 2017 -0800

    ISSUE #744: BP-18 introduce write flags
    
    Introduce writeflags on the API and on protobuf files.
    Client and server do not use these flags yet.
    
    Author: eolivelli <[email protected]>
    Author: Enrico Olivelli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #742 from eolivelli/bp14-writeflags, closes #744
---
 .../src/main/proto/BookkeeperProtocol.proto        |   1 +
 .../org/apache/bookkeeper/client/BookKeeper.java   |  13 +-
 .../apache/bookkeeper/client/LedgerCreateOp.java   |  30 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java |  11 +-
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  10 +-
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |   4 +-
 .../bookkeeper/client/api/CreateBuilder.java       |  22 ++
 .../apache/bookkeeper/client/api/WriteFlag.java    |  70 +++++
 .../client/TestLedgerFragmentReplication.java      |   4 +-
 .../client/api/BookKeeperBuildersTest.java         | 100 +++++++
 .../bookkeeper/client/api/WriteFlagTest.java       |  62 +++++
 site/docs/latest/api/ledger-api.md                 | 304 +++++++++++++++++++++
 site/docs/latest/api/overview.md                   |   2 +-
 13 files changed, 617 insertions(+), 16 deletions(-)

diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto 
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index 38ed3c5..ffd0f42 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -112,6 +112,7 @@ message AddRequest {
     required int64 entryId = 2;
     required bytes masterKey = 3;
     required bytes body = 4;
+    optional int32 writeFlags = 5;
 }
 
 message StartTLSRequest {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index d0f8a64..17df48c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -32,6 +32,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,7 @@ import org.apache.bookkeeper.client.api.BookKeeperBuilder;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
@@ -760,7 +762,8 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, 
customMetadata)
+                               ackQuorumSize, digestType, passwd, cb, ctx,
+                               customMetadata, EnumSet.noneOf(WriteFlag.class))
                 .initiate();
         } finally {
             closeLock.readLock().unlock();
@@ -963,7 +966,9 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, 
customMetadata).initiateAdv((long) (-1));
+                               ackQuorumSize, digestType, passwd, cb, ctx,
+                               customMetadata, EnumSet.noneOf(WriteFlag.class))
+                                       .initiateAdv(-1L);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1072,7 +1077,9 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, 
customMetadata).initiateAdv(ledgerId);
+                               ackQuorumSize, digestType, passwd, cb, ctx,
+                               customMetadata, EnumSet.noneOf(WriteFlag.class))
+                    .initiateAdv(ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 77d2ab2..a3d5239 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.client;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import 
org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
 import org.apache.bookkeeper.client.api.CreateAdvBuilder;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -62,6 +64,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
     final byte[] passwd;
     final BookKeeper bk;
     final DigestType digestType;
+    final EnumSet<WriteFlag> writeFlags;
     final long startTime;
     final OpStatsLogger createOpLogger;
     boolean adv = false;
@@ -91,7 +94,8 @@ class LedgerCreateOp implements GenericCallback<Void> {
      *       preserve the order(e.g. sortedMap) upon later retireval.
      */
     LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int 
ackQuorumSize, DigestType digestType,
-            byte[] passwd, CreateCallback cb, Object ctx, final Map<String, 
byte[]> customMetadata) {
+            byte[] passwd, CreateCallback cb, Object ctx, final Map<String, 
byte[]> customMetadata,
+            EnumSet<WriteFlag> writeFlags) {
         this.bk = bk;
         this.metadata = new LedgerMetadata(
             ensembleSize,
@@ -102,6 +106,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
             customMetadata,
             bk.getConf().getStoreSystemtimeAsLedgerCreationTime());
         this.digestType = digestType;
+        this.writeFlags = writeFlags;
         this.passwd = passwd;
         this.cb = cb;
         this.ctx = ctx;
@@ -189,9 +194,11 @@ class LedgerCreateOp implements GenericCallback<Void> {
 
         try {
             if (adv) {
-                lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType, 
passwd);
+                lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType,
+                        passwd, writeFlags);
             } else {
-                lh = new LedgerHandle(bk, ledgerId, metadata, digestType, 
passwd);
+                lh = new LedgerHandle(bk, ledgerId, metadata, digestType,
+                        passwd, writeFlags);
             }
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while creating ledger: " + ledgerId, 
e);
@@ -223,6 +230,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
         private int builderAckQuorumSize = 2;
         private int builderWriteQuorumSize = 2;
         private byte[] builderPassword;
+        private EnumSet<WriteFlag> builderWriteFlags = 
EnumSet.noneOf(WriteFlag.class);
         private org.apache.bookkeeper.client.api.DigestType builderDigestType =
             org.apache.bookkeeper.client.api.DigestType.CRC32;
         private Map<String, byte[]> builderCustomMetadata = 
Collections.emptyMap();
@@ -238,6 +246,12 @@ class LedgerCreateOp implements GenericCallback<Void> {
         }
 
         @Override
+        public CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags) {
+            this.builderWriteFlags = writeFlags;
+            return this;
+        }
+
+        @Override
         public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
             this.builderWriteQuorumSize = writeQuorumSize;
             return this;
@@ -273,6 +287,11 @@ class LedgerCreateOp implements GenericCallback<Void> {
         }
 
         private boolean validate() {
+            if (builderWriteFlags == null) {
+                LOG.error("invalid null writeFlags");
+                return false;
+            }
+
             if (builderWriteQuorumSize > builderEnsembleSize) {
                 LOG.error("invalid writeQuorumSize {} > ensembleSize {}", 
builderWriteQuorumSize, builderEnsembleSize);
                 return false;
@@ -322,7 +341,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
             }
             LedgerCreateOp op = new LedgerCreateOp(bk, builderEnsembleSize,
                 builderWriteQuorumSize, builderAckQuorumSize, 
DigestType.fromApiDigestType(builderDigestType),
-                builderPassword, cb, null, builderCustomMetadata);
+                builderPassword, cb, null, builderCustomMetadata, 
builderWriteFlags);
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
             try {
@@ -380,7 +399,8 @@ class LedgerCreateOp implements GenericCallback<Void> {
             LedgerCreateOp op = new LedgerCreateOp(parent.bk, 
parent.builderEnsembleSize,
                     parent.builderWriteQuorumSize, parent.builderAckQuorumSize,
                     DigestType.fromApiDigestType(parent.builderDigestType),
-                    parent.builderPassword, cb, null, 
parent.builderCustomMetadata);
+                    parent.builderPassword, cb, null, 
parent.builderCustomMetadata,
+                    parent.builderWriteFlags);
             ReentrantReadWriteLock closeLock = parent.bk.getCloseLock();
             closeLock.readLock().lock();
             try {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 5c15376..936f181 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -36,6 +36,7 @@ import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.List;
@@ -65,6 +66,7 @@ import 
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallb
 import org.apache.bookkeeper.client.api.BKException.Code;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
@@ -104,6 +106,7 @@ public class LedgerHandle implements WriteHandle {
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    final EnumSet<WriteFlag> writeFlags;
     ScheduledFuture<?> timeoutFuture = null;
 
     /**
@@ -132,13 +135,14 @@ public class LedgerHandle implements WriteHandle {
     }
 
     LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-                 DigestType digestType, byte[] password)
+                 DigestType digestType, byte[] password, EnumSet<WriteFlag> 
writeFlags)
             throws GeneralSecurityException, NumberFormatException {
         this.bk = bk;
         this.metadata = metadata;
         this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
         this.enableParallelRecoveryRead = 
bk.getConf().getEnableParallelRecoveryRead();
         this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
+        this.writeFlags = writeFlags;
 
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -216,6 +220,11 @@ public class LedgerHandle implements WriteHandle {
         return ledgerId;
     }
 
+    @VisibleForTesting
+    public EnumSet<WriteFlag> getWriteFlags() {
+        return writeFlags;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index d0ca9da..435c453 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -23,23 +23,24 @@ package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.io.Serializable;
 import java.security.GeneralSecurityException;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add 
entries with
  * user supplied entryIds. Through this interface Ledger Length may not be 
accurate while the
@@ -54,9 +55,10 @@ public class LedgerHandleAdv extends LedgerHandle implements 
WriteAdvHandle {
         }
     }
 
-    LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata, 
DigestType digestType, byte[] password)
+    LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+            DigestType digestType, byte[] password, EnumSet<WriteFlag> 
writeFlags)
             throws GeneralSecurityException, NumberFormatException {
-        super(bk, ledgerId, metadata, digestType, password);
+        super(bk, ledgerId, metadata, digestType, password, writeFlags);
         pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new 
PendingOpsComparator());
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index b1d1596..d324572 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -21,12 +21,14 @@
 package org.apache.bookkeeper.client;
 
 import java.security.GeneralSecurityException;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -73,7 +75,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListene
     ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
                          DigestType digestType, byte[] password, boolean watch)
             throws GeneralSecurityException, NumberFormatException {
-        super(bk, ledgerId, metadata, digestType, password);
+        super(bk, ledgerId, metadata, digestType, password, 
EnumSet.noneOf(WriteFlag.class));
         if (watch) {
             bk.getLedgerManager().registerLedgerMetadataListener(ledgerId, 
this);
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
index cd5fd3d..aa6ad6d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client.api;
 
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Map;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
@@ -92,6 +94,26 @@ public interface CreateBuilder extends 
OpBuilder<WriteHandle> {
     CreateBuilder withDigestType(DigestType digestType);
 
     /**
+     * Set write flags. Write wlags specify the behaviour of writes
+     *
+     * @param writeFlags the flags
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags);
+
+    /**
+     * Set write flags. Write wlags specify the behaviour of writes
+     *
+     * @param writeFlags the flags
+     *
+     * @return the builder itself
+     */
+    default CreateBuilder withWriteFlags(WriteFlag ... writeFlags) {
+        return withWriteFlags(EnumSet.copyOf(Arrays.asList(writeFlags)));
+    }
+
+    /**
      * Switch the ledger into 'Advanced' mode. A ledger used in Advanced mode 
will explicitly generate the sequence of
      * entry identifiers. Advanced ledgers can be created with a client side 
defined ledgerId
      *
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
new file mode 100644
index 0000000..a680b8c
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.EnumSet;
+import lombok.Getter;
+
+/**
+ * Flags to specify the behaviour of writes.
+ */
+@Getter
+public enum WriteFlag {
+
+    /**
+     * Writes will be acknowledged after writing to the filesystem
+     * but not yet been persisted to disks.
+     */
+    DEFERRED_SYNC(0x1 << 0);
+
+    private final int value;
+
+    WriteFlag(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Converts a set of flags from a binary representation.
+     *
+     * @param flagValue the binary value
+     * @return a set of flags
+     */
+    public static EnumSet<WriteFlag> getWriteFlags(int flagValue) {
+        if ((flagValue & DEFERRED_SYNC.value) == DEFERRED_SYNC.value) {
+            return EnumSet.of(DEFERRED_SYNC);
+        }
+        return EnumSet.noneOf(WriteFlag.class);
+    }
+
+    /**
+     * Converts a set of flags from a binary representation.
+     *
+     * @param flags the flags
+     * @return the binary representation
+     */
+    public static int getWriteFlagsValue(EnumSet<WriteFlag> flags) {
+        int result = 0;
+        for (WriteFlag flag : flags) {
+            result |= flag.value;
+        }
+        return result;
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 397f872..a4d60aa 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Enumeration;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -34,6 +35,7 @@ import java.util.SortedMap;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -249,7 +251,7 @@ public class TestLedgerFragmentReplication extends 
BookKeeperClusterTestCase {
             }
         };
         LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
-                TEST_PSSWD);
+                TEST_PSSWD, EnumSet.noneOf(WriteFlag.class));
         testSplitIntoSubFragments(10, 21, -1, 1, lh);
         testSplitIntoSubFragments(10, 21, 20, 1, lh);
         testSplitIntoSubFragments(0, 0, 10, 1, lh);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index fcb35a1..3c29895 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -20,11 +20,13 @@
  */
 package org.apache.bookkeeper.client.api;
 
+import static org.apache.bookkeeper.client.api.WriteFlag.DEFERRED_SYNC;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,6 +34,7 @@ import 
org.apache.bookkeeper.client.BKException.BKClientClosedException;
 import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.MockBookKeeperTestCase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -51,6 +54,7 @@ public class BookKeeperBuildersTest extends 
MockBookKeeperTestCase {
     private static final Map<String, byte[]> customMetadata = new HashMap<>();
     private static final byte[] password = new byte[3];
     private static final byte[] entryData = new byte[32];
+    private static final EnumSet<WriteFlag> writeFlagsDeferredSync = 
EnumSet.of(DEFERRED_SYNC);
 
     @Test
     public void testCreateLedger() throws Exception {
@@ -89,6 +93,14 @@ public class BookKeeperBuildersTest extends 
MockBookKeeperTestCase {
     }
 
     @Test(expected = BKIncorrectParameterException.class)
+    public void testFailNullWriteFlags() throws Exception {
+        result(newCreateLedgerOp()
+            .withWriteFlags((EnumSet<WriteFlag>) null)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
     public void testFailAckQuorumSize0() throws Exception {
         result(newCreateLedgerOp()
             .withEnsembleSize(2)
@@ -191,6 +203,94 @@ public class BookKeeperBuildersTest extends 
MockBookKeeperTestCase {
         assertArrayEquals(password, metadata.getPassword());
     }
 
+    @Test
+    public void testDefaultWriteFlagsEmpty() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withPassword(password)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+        LedgerHandle lh = (LedgerHandle) writer;
+        assertEquals(EnumSet.noneOf(WriteFlag.class), lh.getWriteFlags());
+    }
+
+    @Test
+    public void testCreateAdvLedgerWriteFlags() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteAdvHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withPassword(password)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withWriteFlags(writeFlagsDeferredSync)
+            .makeAdv()
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+        LedgerHandle lh = (LedgerHandle) writer;
+        assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+    }
+
+    @Test
+    public void testCreateLedgerWriteFlags() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withPassword(password)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withWriteFlags(writeFlagsDeferredSync)
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+        LedgerHandle lh = (LedgerHandle) writer;
+        assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+    }
+
+    @Test
+    public void testCreateLedgerWriteFlagsVarargs() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withPassword(password)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withWriteFlags(DEFERRED_SYNC)
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+        LedgerHandle lh = (LedgerHandle) writer;
+        assertEquals(writeFlagsDeferredSync, lh.getWriteFlags());
+    }
+
     @Test(expected = BKIncorrectParameterException.class)
     public void testFailCreateAdvLedgerBadFixedLedgerIdMinus1() throws 
Exception {
         result(newCreateLedgerOp()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
new file mode 100644
index 0000000..c51bdf5
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteFlagTest.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import static org.apache.bookkeeper.client.api.WriteFlag.DEFERRED_SYNC;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+import org.junit.Test;
+
+/**
+ * Unit tests for WriteFlag.
+ */
+public class WriteFlagTest {
+
+    private static final int NONE = 0;
+
+    @Test
+    public void testGetWriteFlagsDeferredSync() {
+        assertEquals(EnumSet.of(DEFERRED_SYNC),
+                WriteFlag.getWriteFlags(DEFERRED_SYNC.getValue()));
+    }
+
+    @Test
+    public void testGetWriteFlagsNone() {
+        assertEquals(EnumSet.noneOf(WriteFlag.class),
+                WriteFlag.getWriteFlags(NONE));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testGetWriteFlagsValueNull() {
+        WriteFlag.getWriteFlagsValue(null);
+    }
+
+    @Test
+    public void testGetWriteFlagsValueEmpty() {
+        assertEquals(0, 
WriteFlag.getWriteFlagsValue(EnumSet.noneOf(WriteFlag.class)));
+    }
+
+    @Test
+    public void testGetWriteFlagsValueDeferredSync() {
+        assertEquals(1, 
WriteFlag.getWriteFlagsValue(EnumSet.of(DEFERRED_SYNC)));
+    }
+}
diff --git a/site/docs/latest/api/ledger-api.md 
b/site/docs/latest/api/ledger-api.md
index 255ef80..2303aaa 100644
--- a/site/docs/latest/api/ledger-api.md
+++ b/site/docs/latest/api/ledger-api.md
@@ -471,3 +471,307 @@ mvn exec:java -Dexec.mainClass=org.apache.bookkeeper.Dice
 Value = 3, isLeader = true
 Value = 1, isLeader = false
 ```
+
+## New API
+
+Since 4.6 BookKeeper provides a new client API which leverages Java8 
[CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html)
 facility.
+[WriteHandle](../javadoc/org/apache/bookkeeper/client/api/WriteHandle), 
[WriteAdvHandle](../javadoc/org/apache/bookkeeper/client/api/WriteAdvHandle), 
[ReadHandle](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) are 
introduced for replacing the generic 
[LedgerHandle](../javadoc/org/apache/bookkeeper/client/LedgerHandle).
+
+> All the new API now is available in `org.apache.bookkeeper.client.api`. You 
should only use interfaces defined in this package.
+
+*Beware* that this API in 4.6 is still experimental API and can be subject to 
changes in next minor releases.
+
+### Create a new client
+
+In order to create a new 
[`BookKeeper`](../javadoc/org/apache/bookkeeper/client/api/BookKeeper) client 
object, you need to construct a 
[`ClientConfiguration`](../javadoc/org/apache/bookkeeper/conf/ClientConfiguration)
 object and set a [connection string](#connection-string) first, and then use 
[`BookKeeperBuilder`](../javadoc/org/apache/bookkeeper/client/api/BookKeeperBuilder)
 to build the client.
+
+Here is an example building the bookkeeper client.
+
+```java
+// construct a client configuration instance
+ClientConfiguration conf = new ClientConfiguration();
+conf.setZkServers(zkConnectionString);
+conf.setZkLedgersRootPath("/path/to/ledgers/root");
+
+// build the bookkeeper client
+BookKeeper bk = BookKeeper.newBuilder(conf)
+    .statsLogger(...)
+    ...
+    .build();
+
+```
+
+### Create ledgers
+
+the easiest way to create a {% pop ledger %} using the java client is via the 
[`createbuilder`](../javadoc/org/apache/bookkeeper/client/api/createbuilder). 
you must specify at least
+a [`digesttype`](../javadoc/org/apache/bookkeeper/client/api/digesttype) and a 
password.
+
+here's an example:
+
+```java
+BookKeeper bk = ...;
+
+byte[] password = "some-password".getBytes();
+
+WriteHandle wh = bk.newCreateLedgerOp()
+    .withDigestType(DigestType.CRC32)
+    .withPassword(password)
+    .withEnsembleSize(3)
+    .withWriteQuorumSize(3)
+    .withAckQuorumSize(2)
+    .execute()          // execute the creation op
+    .get();             // wait for the execution to complete
+```
+
+A [`WriteHandle`](../javadoc/org/apache/bookkeeper/client/api/WriteHandle) is 
returned for applications to write and read entries to and from the ledger.
+
+### Write flags
+
+You can specify behaviour of the writer by setting 
[`WriteFlags`](../javadoc/org/apache/bookkeeper/client/api/WriteFlag) at ledger 
creation type.
+These flags are applied only during write operations and are not recorded on 
metadata.
+
+
+Available write flags:
+
+| Flag  | Explanation  | Notes |
+:---------|:------------|:-------
+DEFERRED_SYNC | Writes are acknowledged early, without waiting for
+guarantees of durability | Data will be only written to the OS page cache, 
without forcing an fsync.
+
+```java
+BookKeeper bk = ...;
+
+byte[] password = "some-password".getBytes();
+
+WriteHandle wh = bk.newCreateLedgerOp()
+    .withDigestType(DigestType.CRC32)
+    .withPassword(password)
+    .withEnsembleSize(3)
+    .withWriteQuorumSize(3)
+    .withAckQuorumSize(2)
+    .withWriteFlags(DEFERRED_SYNC)
+    .execute()          // execute the creation op
+    .get();             // wait for the execution to complete
+```
+
+
+### Append entries to ledgers
+
+The [`WriteHandle`](../javadoc/org/apache/bookkeeper/client/api/WriteHandle) 
can be used for applications to append entries to the ledgers.
+
+```java
+WriteHandle wh = ...;
+
+CompletableFuture<Long> addFuture = wh.append("Some entry data".getBytes());
+
+// option 1: you can wait for add to complete synchronously
+try {
+    long entryId = FutureUtils.result(addFuture.get());
+} catch (BKException bke) {
+    // error handling
+}
+
+// option 2: you can process the result and exception asynchronously
+addFuture
+    .thenApply(entryId -> {
+        // process the result
+    })
+    .exceptionally(cause -> {
+        // handle the exception
+    })
+
+// option 3: bookkeeper provides a twitter-future-like event listener for 
processing result and exception asynchronously
+addFuture.whenComplete(new FutureEventListener() {
+    @Override
+    public void onSuccess(long entryId) {
+        // process the result
+    }
+    @Override
+    public void onFailure(Throwable cause) {
+        // handle the exception
+    }
+});
+```
+
+The append method supports three representations of a bytes array: the native 
java `byte[]`, java nio `ByteBuffer` and netty `ByteBuf`.
+It is recommended to use `ByteBuf` as it is more gc friendly.
+
+### Open ledgers
+
+You can open ledgers to read entries. Opening ledgers is done by 
[`openBuilder`](../javadoc/org/apache/bookkeeper/client/api/openBuilder). You 
must specify the ledgerId and the password
+in order to open the ledgers.
+
+here's an example:
+
+```java
+BookKeeper bk = ...;
+
+long ledgerId = ...;
+byte[] password = "some-password".getBytes();
+
+ReadHandle rh = bk.newOpenLedgerOp()
+    .withLedgerId(ledgerId)
+    .withPassword(password)
+    .execute()          // execute the open op
+    .get();             // wait for the execution to complete
+```
+
+A [`ReadHandle`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) is 
returned for applications to read entries to and from the ledger.
+
+#### Recovery vs NoRecovery
+
+By default, the 
[`openBuilder`](../javadoc/org/apache/bookkeeper/client/api/openBuilder) opens 
the ledger in a `NoRecovery` mode. You can open the ledger in `Recovery` mode 
by specifying
+`withRecovery(true)` in the open builder.
+
+```java
+BookKeeper bk = ...;
+
+long ledgerId = ...;
+byte[] password = "some-password".getBytes();
+
+ReadHandle rh = bk.newOpenLedgerOp()
+    .withLedgerId(ledgerId)
+    .withPassword(password)
+    .withRecovery(true)
+    .execute()
+    .get();
+
+```
+
+**What is the difference between "Recovery" and "NoRecovery"?**
+
+If you are opening a ledger in "Recovery" mode, it will basically fence and 
seal the ledger -- no more entries are allowed
+to be appended to it. The writer which is currently appending entries to the 
ledger will fail with 
[`LedgerFencedException`](../javadoc/org/apache/bookkeeper/client/api/BKException.Code#LedgerFencedException).
+
+In constrat, opening a ledger in "NoRecovery" mode, it will not fence and seal 
the ledger. "NoRecovery" mode is usually used by applications to tailing-read 
from a ledger.
+
+### Read entries from ledgers
+
+The [`ReadHandle`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle) 
returned from the open builder can be used for applications to read entries 
from the ledgers.
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = ...;
+long endEntryId = ...;
+CompletableFuture<LedgerEntries> readFuture = rh.read(startEntryId, 
endEntryId);
+
+// option 1: you can wait for read to complete synchronously
+try {
+    LedgerEntries entries = FutureUtils.result(readFuture.get());
+} catch (BKException bke) {
+    // error handling
+}
+
+// option 2: you can process the result and exception asynchronously
+readFuture
+    .thenApply(entries -> {
+        // process the result
+    })
+    .exceptionally(cause -> {
+        // handle the exception
+    })
+
+// option 3: bookkeeper provides a twitter-future-like event listener for 
processing result and exception asynchronously
+readFuture.whenComplete(new FutureEventListener<>() {
+    @Override
+    public void onSuccess(LedgerEntries entries) {
+        // process the result
+    }
+    @Override
+    public void onFailure(Throwable cause) {
+        // handle the exception
+    }
+});
+```
+
+Once you are done with processing the 
[`LedgerEntries`](../javadoc/org/apache/bookkeeper/client/api/LedgerEntries), 
you can call `#close()` on the `LedgerEntries` instance to
+release the buffers held by it.
+
+Applications are allowed to read any entries between `0` and 
[`LastAddConfirmed`](../javadoc/org/apache/bookkeeper/client/api/ReadHandle.html#getLastAddConfirmed).
 If the applications
+attempts to read entries beyond `LastAddConfirmed`, they will receive 
[`IncorrectParameterException`](../javadoc/org/apache/bookkeeper/client/api/BKException.Code#IncorrectParameterException).
+
+### Read unconfirmed entries from ledgers
+
+`readUnconfirmed` is provided the mechanism for applications to read entries 
beyond `LastAddConfirmed`. Applications should be aware of `readUnconfirmed` 
doesn't provide any
+repeatable read consistency.
+
+```java
+CompletableFuture<LedgerEntries> readFuture = rh.readUnconfirmed(startEntryId, 
endEntryId);
+```
+
+### Tailing Reads
+
+There are two methods for applications to achieve tailing reads: `Polling` and 
`Long-Polling`.
+
+#### Polling
+
+You can do this in synchronous way:
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = 0L;
+long nextEntryId = startEntryId;
+int numEntriesPerBatch = 4;
+while (!rh.isClosed() || nextEntryId <= rh.getLastAddConfirmed()) {
+    long lac = rh.getLastAddConfirmed();
+    if (nextEntryId > lac) {
+        // no more entries are added
+        Thread.sleep(1000);
+
+        lac = rh.readLastAddConfirmed().get();
+        continue;
+    }
+
+    long endEntryId = Math.min(lac, nextEntryId + numEntriesPerBatch - 1);
+    LedgerEntries entries = rh.read(nextEntryId, endEntryId).get();
+
+    // process the entries
+
+    nextEntryId = endEntryId + 1;
+}
+```
+
+#### Long Polling
+
+```java
+ReadHandle rh = ...;
+
+long startEntryId = 0L;
+long nextEntryId = startEntryId;
+int numEntriesPerBatch = 4;
+while (!rh.isClosed() || nextEntryId <= rh.getLastAddConfirmed()) {
+    long lac = rh.getLastAddConfirmed();
+    if (nextEntryId > lac) {
+        // no more entries are added
+        try (LastConfirmedAndEntry lacAndEntry = 
rh.readLastAddConfirmedAndEntry(nextEntryId, 1000, false).get()) {
+            if (lacAndEntry.hasEntry()) {
+                // process the entry
+
+                ++nextEntryId;
+            }
+        }
+    } else {
+        long endEntryId = Math.min(lac, nextEntryId + numEntriesPerBatch - 1);
+        LedgerEntries entries = rh.read(nextEntryId, endEntryId).get();
+
+        // process the entries
+        nextEntryId = endEntryId + 1;
+    }
+}
+```
+
+### Delete ledgers
+
+{% pop Ledgers %} can be deleted by using 
[`DeleteBuilder`](../javadoc/org/apache/bookkeeper/client/api/DeleteBuilder).
+
+```java
+BookKeeper bk = ...;
+long ledgerId = ...;
+
+bk.newDeleteLedgerOp()
+    .withLedgerId(ledgerId)
+    .execute()
+    .get();
+```
diff --git a/site/docs/latest/api/overview.md b/site/docs/latest/api/overview.md
index 3eb6492..3e0adcd 100644
--- a/site/docs/latest/api/overview.md
+++ b/site/docs/latest/api/overview.md
@@ -5,7 +5,7 @@ title: BookKeeper API
 BookKeeper offers a few APIs that applications can use to interact with it:
 
 * The [ledger API](../ledger-api) is a lower-level API that enables you to 
interact with {% pop ledgers %} directly
-* The [Ledger Advanced API)(../ledger-adv-api) is an advanced extension to 
[Ledger API](../ledger-api) to provide more flexibilities to applications.
+* The [Ledger Advanced API](../ledger-adv-api) is an advanced extension to 
[Ledger API](../ledger-api) to provide more flexibilities to applications.
 * The [DistributedLog API](../distributedlog-api) is a higher-level API that 
provides convenient abstractions.
 
 ## Trade-offs

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to