This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new 796de64 ISSUE #550: add readLastAddConfirmedAndEntry in ReadHandle
for long poll read
796de64 is described below
commit 796de64add1573485f46d98b423474ad22a4c0c8
Author: Jia Zhai <[email protected]>
AuthorDate: Sun Nov 19 22:48:18 2017 -0800
ISSUE #550: add readLastAddConfirmedAndEntry in ReadHandle for long poll
read
Descriptions of the changes in this PR:
1, add an interface LastConfirmedAndEntry and method
readLastAddConfirmedAndEntry() in ReadHandle;
2, add implementation for readLastAddConfirmedAndEntry in LedgerHandle;
3, add testcase in BookKeeperApiTest;
4, remove un-used imports, break down long lines, fix wrong comments.
Author: Jia Zhai <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>, Venkateswararao Jujjuri (JV) <None>
This closes #729 from zhaijack/issue-550, closes #550
(cherry picked from commit c9a1b8adbb2eb4e7bd2fe9d55648ac684828fe8f)
Signed-off-by: Sijie Guo <[email protected]>
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 33 +++++++++---
.../bookkeeper/client/SyncCallbackUtils.java | 25 ++++++---
.../client/api/LastConfirmedAndEntry.java | 50 +++++++++++++++++
.../apache/bookkeeper/client/api/ReadHandle.java | 19 +++++++
.../client/impl/LastConfirmedAndEntryImpl.java | 63 ++++++++++++++++++++++
.../bookkeeper/client/api/BookKeeperApiTest.java | 6 +++
6 files changed, 183 insertions(+), 13 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index e71ae9c..60ad65f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -51,14 +51,14 @@ import
org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
+import
org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
import
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.WriteHandle;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -965,7 +965,7 @@ public class LedgerHandle implements WriteHandle {
/**
* Obtains asynchronously the last confirmed write from a quorum of
bookies.
* It is similar as
- * {@link
#asyncTryReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback,
Object)},
+ * {@link
#asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback,
Object)},
* but it doesn't wait all the responses from the quorum. It would callback
* immediately if it received a LAC which is larger than current LAC.
*
@@ -1008,7 +1008,7 @@ public class LedgerHandle implements WriteHandle {
}
/**
- * @{@inheritDoc }
+ * {@inheritDoc}
*/
@Override
public CompletableFuture<Long> tryReadLastAddConfirmed() {
@@ -1018,7 +1018,7 @@ public class LedgerHandle implements WriteHandle {
}
/**
- * @{@inheritDoc }
+ * {@inheritDoc}
*/
@Override
public CompletableFuture<Long> readLastAddConfirmed() {
@@ -1028,6 +1028,18 @@ public class LedgerHandle implements WriteHandle {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public CompletableFuture<LastConfirmedAndEntry>
readLastAddConfirmedAndEntry(long entryId,
+
long timeOutInMillis,
+
boolean parallel) {
+ FutureReadLastConfirmedAndEntry result = new
FutureReadLastConfirmedAndEntry();
+ asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel,
result, null);
+ return result;
+ }
+
+ /**
* Asynchronous read next entry and the latest last add confirmed.
* If the next entryId is less than known last add confirmed, the call
will read next entry directly.
* If the next entryId is ahead of known last add confirmed, the call will
issue a long poll read
@@ -1083,7 +1095,8 @@ public class LedgerHandle implements WriteHandle {
return;
}
// wait for entry <i>entryId</i>
- ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb =
new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
+ ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb =
+ new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
AtomicBoolean completed = new AtomicBoolean(false);
@Override
public void readLastConfirmedAndEntryComplete(int rc, long
lastAddConfirmed, LedgerEntry entry) {
@@ -1098,7 +1111,13 @@ public class LedgerHandle implements WriteHandle {
}
}
};
- new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1,
timeOutInMillis, bk.getScheduler()).parallelRead(parallel).initiate();
+ new ReadLastConfirmedAndEntryOp(this,
+ innercb,
+ entryId - 1,
+ timeOutInMillis,
+ bk.getScheduler())
+ .parallelRead(parallel)
+ .initiate();
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
index 822d327..13bcff8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
@@ -21,14 +21,16 @@ import com.google.common.collect.Iterators;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import static org.apache.bookkeeper.client.LedgerHandle.LOG;
-import org.apache.bookkeeper.client.api.Handle;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
/**
* Utility for callbacks
*
*/
+@Slf4j
class SyncCallbackUtils {
/**
@@ -182,10 +184,10 @@ class SyncCallbackUtils {
@Override
public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
if (rc != BKException.Code.OK) {
- LOG.warn("LastAddConfirmedUpdate failed: {} ",
BKException.getMessage(rc));
+ log.warn("LastAddConfirmedUpdate failed: {} ",
BKException.getMessage(rc));
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Callback LAC Updated for: {} ", lh.getId());
+ if (log.isDebugEnabled()) {
+ log.debug("Callback LAC Updated for: {} ", lh.getId());
}
}
}
@@ -266,7 +268,8 @@ class SyncCallbackUtils {
}
}
- static class FutureReadLastConfirmed extends CompletableFuture<Long>
implements AsyncCallback.ReadLastConfirmedCallback {
+ static class FutureReadLastConfirmed extends CompletableFuture<Long>
+ implements AsyncCallback.ReadLastConfirmedCallback {
@Override
public void readLastConfirmedComplete(int rc, long lastConfirmed,
Object ctx) {
@@ -312,4 +315,14 @@ class SyncCallbackUtils {
}
}
+ static class FutureReadLastConfirmedAndEntry
+ extends CompletableFuture<LastConfirmedAndEntry> implements
AsyncCallback.ReadLastConfirmedAndEntryCallback {
+
+ @Override
+ public void readLastConfirmedAndEntryComplete(int rc, long
lastConfirmed, LedgerEntry entry, Object ctx) {
+ LastConfirmedAndEntry result = new
LastConfirmedAndEntryImpl(lastConfirmed, entry);
+ finish(rc, result, this);
+ }
+ }
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
new file mode 100644
index 0000000..3a10d96
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
+ * It is used for readLastAddConfirmedAndEntry.
+ */
+public interface LastConfirmedAndEntry {
+
+ /**
+ * Gets LastAddConfirmed entryId.
+ *
+ * @return the LastAddConfirmed
+ */
+ Long getLastAddConfirmed();
+
+ /**
+ * Whether this entity contains an entry.
+ *
+ * @return true if Entry not null
+ */
+ boolean hasEntry();
+
+ /**
+ * Gets wanted LedgerEntry.
+ *
+ * @return the LedgerEntry
+ */
+ LedgerEntry getEntry();
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index 7ac4645..cea0f8e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -115,4 +115,23 @@ public interface ReadHandle extends Handle {
*/
long getLength();
+ /**
+ * Asynchronous read specific entry and the latest last add confirmed.
+ * If the next entryId is less than known last add confirmed, the call
will read next entry directly.
+ * If the next entryId is ahead of known last add confirmed, the call will
issue a long poll read
+ * to wait for the next entry <i>entryId</i>.
+ *
+ * @param entryId
+ * next entry id to read
+ * @param timeOutInMillis
+ * timeout period to wait for the entry id to be available (for
long poll only)
+ * if timeout for get the entry, it will return null entry.
+ * @param parallel
+ * whether to issue the long poll reads in parallel
+ * @return an handle to the result of the operation
+ */
+ CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long
entryId,
+ long
timeOutInMillis,
+
boolean parallel);
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
new file mode 100644
index 0000000..4ed78f9
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.impl;
+
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+
+/**
+ * This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
+ * It is used for readLastAddConfirmedAndEntry.
+ */
+public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {
+
+ private final Long lac;
+ private final LedgerEntry entry;
+
+ public LastConfirmedAndEntryImpl(Long lac, LedgerEntry entry) {
+ this.lac = lac;
+ this.entry = entry;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Long getLastAddConfirmed() {
+ return lac;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean hasEntry() {
+ return entry != null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public LedgerEntry getEntry() {
+ return entry;
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 61fac94..8a3d68f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -203,6 +203,12 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
assertEquals(2,
result(reader.tryReadLastAddConfirmed()).intValue());
checkEntries(result(reader.read(0, reader.getLastAddConfirmed())),
data);
checkEntries(result(reader.readUnconfirmed(0,
reader.getLastAddConfirmed())), data);
+
+ // test readLastAddConfirmedAndEntry
+ LastConfirmedAndEntry lastConfirmedAndEntry =
+ result(reader.readLastAddConfirmedAndEntry(0, 999, false));
+ assertEquals(2,
lastConfirmedAndEntry.getLastAddConfirmed().intValue());
+ assertArrayEquals(data,
lastConfirmedAndEntry.getEntry().getEntry());
}
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].