This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new 796de64  ISSUE #550: add readLastAddConfirmedAndEntry in ReadHandle 
for long poll read
796de64 is described below

commit 796de64add1573485f46d98b423474ad22a4c0c8
Author: Jia Zhai <[email protected]>
AuthorDate: Sun Nov 19 22:48:18 2017 -0800

    ISSUE #550: add readLastAddConfirmedAndEntry in ReadHandle for long poll 
read
    
    Descriptions of the changes in this PR:
    1, add a class LastConfirmedAndEntry and method 
readLastAddConfirmedAndEntry() in ReadHandle;
    2, add implementation for readLastAddConfirmedAndEntry in LedgerHandle;
    3, add testcase in BookKeeperApiTest;
    4, remove un-used imports, break down long lines, fix wrong comments.
    
    Author: Jia Zhai <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #729 from zhaijack/issue-550, closes #550
    
    (cherry picked from commit c9a1b8adbb2eb4e7bd2fe9d55648ac684828fe8f)
    Signed-off-by: Sijie Guo <[email protected]>
---
 .../org/apache/bookkeeper/client/LedgerHandle.java | 33 +++++++++---
 .../bookkeeper/client/SyncCallbackUtils.java       | 25 ++++++---
 .../client/api/LastConfirmedAndEntry.java          | 50 +++++++++++++++++
 .../apache/bookkeeper/client/api/ReadHandle.java   | 19 +++++++
 .../client/impl/LastConfirmedAndEntryImpl.java     | 63 ++++++++++++++++++++++
 .../bookkeeper/client/api/BookKeeperApiTest.java   |  6 +++
 6 files changed, 183 insertions(+), 13 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index e71ae9c..60ad65f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -51,14 +51,14 @@ import 
org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
+import 
org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
 import 
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.WriteHandle;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -965,7 +965,7 @@ public class LedgerHandle implements WriteHandle {
     /**
      * Obtains asynchronously the last confirmed write from a quorum of 
bookies.
      * It is similar as
-     * {@link 
#asyncTryReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback,
 Object)},
+     * {@link 
#asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback,
 Object)},
      * but it doesn't wait all the responses from the quorum. It would callback
      * immediately if it received a LAC which is larger than current LAC.
      *
@@ -1008,7 +1008,7 @@ public class LedgerHandle implements WriteHandle {
     }
 
     /**
-     * @{@inheritDoc }
+     * {@inheritDoc}
      */
     @Override
     public CompletableFuture<Long> tryReadLastAddConfirmed() {
@@ -1018,7 +1018,7 @@ public class LedgerHandle implements WriteHandle {
     }
 
     /**
-     * @{@inheritDoc }
+     * {@inheritDoc}
      */
     @Override
     public CompletableFuture<Long> readLastAddConfirmed() {
@@ -1028,6 +1028,18 @@ public class LedgerHandle implements WriteHandle {
     }
 
     /**
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> 
readLastAddConfirmedAndEntry(long entryId,
+                                                                               
  long timeOutInMillis,
+                                                                               
  boolean parallel) {
+        FutureReadLastConfirmedAndEntry result = new 
FutureReadLastConfirmedAndEntry();
+        asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, 
result, null);
+        return result;
+    }
+
+    /**
      * Asynchronous read next entry and the latest last add confirmed.
      * If the next entryId is less than known last add confirmed, the call 
will read next entry directly.
      * If the next entryId is ahead of known last add confirmed, the call will 
issue a long poll read
@@ -1083,7 +1095,8 @@ public class LedgerHandle implements WriteHandle {
             return;
         }
         // wait for entry <i>entryId</i>
-        ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = 
new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
+        ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb =
+            new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
             AtomicBoolean completed = new AtomicBoolean(false);
             @Override
             public void readLastConfirmedAndEntryComplete(int rc, long 
lastAddConfirmed, LedgerEntry entry) {
@@ -1098,7 +1111,13 @@ public class LedgerHandle implements WriteHandle {
                 }
             }
         };
-        new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, 
timeOutInMillis, bk.getScheduler()).parallelRead(parallel).initiate();
+        new ReadLastConfirmedAndEntryOp(this,
+            innercb,
+            entryId - 1,
+            timeOutInMillis,
+            bk.getScheduler())
+            .parallelRead(parallel)
+            .initiate();
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
index 822d327..13bcff8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
@@ -21,14 +21,16 @@ import com.google.common.collect.Iterators;
 import java.util.Enumeration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import static org.apache.bookkeeper.client.LedgerHandle.LOG;
-import org.apache.bookkeeper.client.api.Handle;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
 
 /**
  * Utility for callbacks
  *
  */
+@Slf4j
 class SyncCallbackUtils {
 
     /**
@@ -182,10 +184,10 @@ class SyncCallbackUtils {
         @Override
         public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
             if (rc != BKException.Code.OK) {
-                LOG.warn("LastAddConfirmedUpdate failed: {} ", 
BKException.getMessage(rc));
+                log.warn("LastAddConfirmedUpdate failed: {} ", 
BKException.getMessage(rc));
             } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Callback LAC Updated for: {} ", lh.getId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Callback LAC Updated for: {} ", lh.getId());
                 }
             }
         }
@@ -266,7 +268,8 @@ class SyncCallbackUtils {
         }
     }
 
-    static class FutureReadLastConfirmed extends CompletableFuture<Long> 
implements AsyncCallback.ReadLastConfirmedCallback {
+    static class FutureReadLastConfirmed extends CompletableFuture<Long>
+        implements AsyncCallback.ReadLastConfirmedCallback {
 
         @Override
         public void readLastConfirmedComplete(int rc, long lastConfirmed, 
Object ctx) {
@@ -312,4 +315,14 @@ class SyncCallbackUtils {
         }
     }
 
+    static class FutureReadLastConfirmedAndEntry
+        extends CompletableFuture<LastConfirmedAndEntry> implements 
AsyncCallback.ReadLastConfirmedAndEntryCallback {
+
+        @Override
+        public void readLastConfirmedAndEntryComplete(int rc, long 
lastConfirmed, LedgerEntry entry, Object ctx) {
+            LastConfirmedAndEntry result = new 
LastConfirmedAndEntryImpl(lastConfirmed, entry);
+            finish(rc, result, this);
+        }
+    }
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
new file mode 100644
index 0000000..3a10d96
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
+ * It is used for readLastAddConfirmedAndEntry.
+ */
+public interface LastConfirmedAndEntry {
+
+    /**
+     * Gets LastAddConfirmed entryId.
+     *
+     * @return the LastAddConfirmed
+     */
+    Long getLastAddConfirmed();
+
+    /**
+     * Whether this entity contains an entry.
+     *
+     * @return true if Entry not null
+     */
+    boolean hasEntry();
+
+    /**
+     * Gets wanted LedgerEntry.
+     *
+     * @return the LedgerEntry
+     */
+    LedgerEntry getEntry();
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index 7ac4645..cea0f8e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -115,4 +115,23 @@ public interface ReadHandle extends Handle {
      */
     long getLength();
 
+    /**
+     * Asynchronous read specific entry and the latest last add confirmed.
+     * If the next entryId is less than known last add confirmed, the call 
will read next entry directly.
+     * If the next entryId is ahead of known last add confirmed, the call will 
issue a long poll read
+     * to wait for the next entry <i>entryId</i>.
+     *
+     * @param entryId
+     *          next entry id to read
+     * @param timeOutInMillis
+     *          timeout period to wait for the entry id to be available (for 
long poll only)
+     *          if timeout for get the entry, it will return null entry.
+     * @param parallel
+     *          whether to issue the long poll reads in parallel
+     * @return an handle to the result of the operation
+     */
+    CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long 
entryId,
+                                                                          long 
timeOutInMillis,
+                                                                          
boolean parallel);
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
new file mode 100644
index 0000000..4ed78f9
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.impl;
+
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+
+/**
+ * This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
+ * It is used for readLastAddConfirmedAndEntry.
+ */
+public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {
+
+    private final Long lac;
+    private final LedgerEntry entry;
+
+    public LastConfirmedAndEntryImpl(Long lac, LedgerEntry entry) {
+        this.lac = lac;
+        this.entry = entry;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Long getLastAddConfirmed() {
+        return lac;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean hasEntry() {
+        return entry != null;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public LedgerEntry getEntry() {
+        return entry;
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 61fac94..8a3d68f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -203,6 +203,12 @@ public class BookKeeperApiTest extends 
MockBookKeeperTestCase {
             assertEquals(2, 
result(reader.tryReadLastAddConfirmed()).intValue());
             checkEntries(result(reader.read(0, reader.getLastAddConfirmed())), 
data);
             checkEntries(result(reader.readUnconfirmed(0, 
reader.getLastAddConfirmed())), data);
+
+            // test readLastAddConfirmedAndEntry
+            LastConfirmedAndEntry lastConfirmedAndEntry =
+                result(reader.readLastAddConfirmedAndEntry(0, 999, false));
+            assertEquals(2, 
lastConfirmedAndEntry.getLastAddConfirmed().intValue());
+            assertArrayEquals(data, 
lastConfirmedAndEntry.getEntry().getEntry());
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to