This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c1947c7 Add sync variants of all methods in handles
c1947c7 is described below
commit c1947c7b7d7c0a6736b2a35511b5f70072c12972
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Mar 27 23:25:00 2018 -0700
Add sync variants of all methods in handles
As discussed on the mailing list [1], this patch removes the
inconsistency around the naming of the close call on the new handle
APIs, by creating sync versions of each async call, and renaming the
async versions to have the suffix "Async".
Most of the changes are very mechanical - just a copy of the old
method and some small fixups to the javadoc. One thing to note is that
I've made a copy of the close and closeAsync methods in the
WriteHandle interface, so that the ReadHandle and Handle javadoc for
these methods do not have to talk about what it means to close/seal a
ledger.
Another change is that I've removed the SneakyThrows from close, which
would have also been needed on the other sync methods. Instead, I pass an
exception handler to FutureUtils which generates a BKException.
[1]
https://lists.apache.org/thread.html/c3784cffb949438510d21e5eac8c0351865c6748c42c380e673a60db%3Cdev.bookkeeper.apache.org%3E
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>
This closes #1288 from ivankelly/async-rename
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 20 ++---
.../apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
.../apache/bookkeeper/client/api/BKException.java | 16 +++-
.../org/apache/bookkeeper/client/api/Handle.java | 29 +++----
.../apache/bookkeeper/client/api/ReadHandle.java | 95 +++++++++++++++++++---
.../bookkeeper/client/api/WriteAdvHandle.java | 57 ++++++++++++-
.../apache/bookkeeper/client/api/WriteHandle.java | 76 ++++++++++++++++-
.../cli/commands/client/SimpleTestCommand.java | 2 +-
.../bookkeeper/client/BookieWriteLedgerTest.java | 4 +-
.../client/TestMaxEnsembleChangeNum.java | 8 +-
.../bookkeeper/client/api/BookKeeperApiTest.java | 60 +++++++-------
.../bookkeeper/client/api/WriteAdvHandleTest.java | 20 ++---
.../bookkeeper/client/api/WriteHandleTest.java | 5 +-
.../java/org/apache/bookkeeper/util/TestUtils.java | 2 +-
.../cli/commands/client/SimpleTestCommandTest.java | 8 +-
15 files changed, 300 insertions(+), 104 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index eebe43c..8db5667 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -389,14 +389,14 @@ public class LedgerHandle implements WriteHandle {
@Override
public void close()
throws InterruptedException, BKException {
- SyncCallbackUtils.waitForResult(asyncClose());
+ SyncCallbackUtils.waitForResult(closeAsync());
}
/**
* {@inheritDoc}
*/
@Override
- public CompletableFuture<Void> asyncClose() {
+ public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> result = new CompletableFuture<>();
SyncCloseCallback callback = new SyncCloseCallback(result);
asyncClose(callback, null);
@@ -707,7 +707,7 @@ public class LedgerHandle implements WriteHandle {
* id of last entry of sequence
*/
@Override
- public CompletableFuture<LedgerEntries> read(long firstEntry, long
lastEntry) {
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{} lastEntry:{}",
@@ -748,7 +748,7 @@ public class LedgerHandle implements WriteHandle {
* @see #readUnconfirmedEntries(long, long)
*/
@Override
- public CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry,
long lastEntry) {
+ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long
firstEntry, long lastEntry) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{} lastEntry:{}",
@@ -852,7 +852,7 @@ public class LedgerHandle implements WriteHandle {
* {@inheritDoc}
*/
@Override
- public CompletableFuture<Long> append(ByteBuf data) {
+ public CompletableFuture<Long> appendAsync(ByteBuf data) {
SyncAddCallback callback = new SyncAddCallback();
asyncAddEntry(data, callback, null);
return callback;
@@ -1206,7 +1206,7 @@ public class LedgerHandle implements WriteHandle {
* {@inheritDoc}
*/
@Override
- public CompletableFuture<Long> tryReadLastAddConfirmed() {
+ public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
FutureReadLastConfirmed result = new FutureReadLastConfirmed();
asyncTryReadLastConfirmed(result, null);
return result;
@@ -1216,7 +1216,7 @@ public class LedgerHandle implements WriteHandle {
* {@inheritDoc}
*/
@Override
- public CompletableFuture<Long> readLastAddConfirmed() {
+ public CompletableFuture<Long> readLastAddConfirmedAsync() {
FutureReadLastConfirmed result = new FutureReadLastConfirmed();
asyncReadLastConfirmed(result, null);
return result;
@@ -1226,9 +1226,9 @@ public class LedgerHandle implements WriteHandle {
* {@inheritDoc}
*/
@Override
- public CompletableFuture<LastConfirmedAndEntry>
readLastAddConfirmedAndEntry(long entryId,
-
long timeOutInMillis,
-
boolean parallel) {
+ public CompletableFuture<LastConfirmedAndEntry>
readLastAddConfirmedAndEntryAsync(long entryId,
+
long timeOutInMillis,
+
boolean parallel) {
FutureReadLastConfirmedAndEntry result = new
FutureReadLastConfirmedAndEntry();
asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel,
result, null);
return result;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 8058103..afd56cf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -246,7 +246,7 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
}
@Override
- public CompletableFuture<Long> write(long entryId, ByteBuf data) {
+ public CompletableFuture<Long> writeAsync(long entryId, ByteBuf data) {
SyncAddCallback callback = new SyncAddCallback();
asyncAddEntry(entryId, data, callback, data);
return callback;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index 3a51716..b81f33d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -16,6 +16,7 @@
package org.apache.bookkeeper.client.api;
import java.lang.reflect.Field;
+import java.util.function.Function;
import org.apache.bookkeeper.client.LedgerHandleAdv;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
@@ -28,7 +29,20 @@ import
org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
*/
@Public
@Unstable
-public abstract class BKException extends Exception {
+public class BKException extends Exception {
+ static final Function<Throwable, BKException> HANDLER = cause -> {
+ if (cause == null) {
+ return null;
+ }
+ if (cause instanceof BKException) {
+ return (BKException) cause;
+ } else {
+ BKException ex = new
BKException(Code.UnexpectedConditionException);
+ ex.initCause(cause);
+ return ex;
+ }
+ };
+
protected final int code;
private static final LogMessagePool logMessagePool = new LogMessagePool();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
index f520e61..1f2f8ca 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.client.api;
import java.util.concurrent.CompletableFuture;
-import lombok.SneakyThrows;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -43,19 +42,25 @@ public interface Handle extends AutoCloseable {
long getId();
/**
- * Close this ledger synchronously.
+ * Close this handle synchronously.
*
* @throws org.apache.bookkeeper.client.api.BKException
* @throws java.lang.InterruptedException
- * @see #asyncClose
+ * @see #closeAsync
*/
@Override
- @SneakyThrows(Exception.class)
default void close() throws BKException, InterruptedException {
- FutureUtils.result(asyncClose());
+ FutureUtils.<Void, BKException>result(closeAsync(),
BKException.HANDLER);
}
/**
+ * Asynchronous close the handle.
+ *
+ * @return an handle to access the result of the operation
+ */
+ CompletableFuture<Void> closeAsync();
+
+ /**
* Returns the metadata of this ledger.
*
* <p>This call only retrieves the metadata cached locally. If there is
any metadata updated, the read
@@ -65,18 +70,4 @@ public interface Handle extends AutoCloseable {
* @return the metadata of this ledger.
*/
LedgerMetadata getLedgerMetadata();
-
- /**
- * Asynchronous close, any adds in flight will return errors.
- *
- * <p>Closing a ledger will ensure that all clients agree on what the last
- * entry of the ledger is. This ensures that, once the ledger has been
closed,
- * all reads from the ledger will return the same set of entries.
- *
- * @return an handle to access the result of the operation
- *
- * @see FutureUtils#result(java.util.concurrent.CompletableFuture) to have
a simple method to access the result
- */
- CompletableFuture<Void> asyncClose();
-
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index 48baf1b..04533dc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client.api;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
/**
* Provide read access to a ledger.
@@ -42,7 +43,21 @@ public interface ReadHandle extends Handle {
* id of last entry of sequence, inclusive
* @return an handle to the result of the operation
*/
- CompletableFuture<LedgerEntries> read(long firstEntry, long lastEntry);
+ CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry);
+
+ /**
+ * Read a sequence of entries synchronously.
+ *
+ * @param firstEntry
+ * id of first entry of sequence
+ * @param lastEntry
+ * id of last entry of sequence, inclusive
+ * @return the result of the operation
+ */
+ default LedgerEntries read(long firstEntry, long lastEntry) throws
BKException, InterruptedException {
+ return FutureUtils.<LedgerEntries,
BKException>result(readAsync(firstEntry, lastEntry),
+
BKException.HANDLER);
+ }
/**
* Read a sequence of entries asynchronously, allowing to read after the
LastAddConfirmed range.
@@ -64,10 +79,27 @@ public interface ReadHandle extends Handle {
* id of last entry of sequence, inclusive
* @return an handle to the result of the operation
*
- * @see #read(long, long)
- * @see #readLastAddConfirmed()
+ * @see #readAsync(long, long)
+ * @see #readLastAddConfirmedAsync()
+ */
+ CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry,
long lastEntry);
+
+ /**
+ * Read a sequence of entries synchronously.
+ *
+ * @param firstEntry
+ * id of first entry of sequence
+ * @param lastEntry
+ * id of last entry of sequence, inclusive
+ * @return the result of the operation
+ *
+ * @see #readUnconfirmedAsync(long, long)
*/
- CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry, long
lastEntry);
+ default LedgerEntries readUnconfirmed(long firstEntry, long lastEntry)
+ throws BKException, InterruptedException {
+ return FutureUtils.<LedgerEntries,
BKException>result(readUnconfirmedAsync(firstEntry, lastEntry),
+
BKException.HANDLER);
+ }
/**
* Obtains asynchronously the last confirmed write from a quorum of
bookies. This
@@ -80,9 +112,20 @@ public interface ReadHandle extends Handle {
*
* @return an handle to the result of the operation
* @see #getLastAddConfirmed()
+ */
+ CompletableFuture<Long> readLastAddConfirmedAsync();
+
+ /**
+ * Obtains synchronously the last confirmed write from a quorum of
bookies.
*
+ * @return the result of the operation
+ * @see #readLastAddConfirmedAsync()
*/
- CompletableFuture<Long> readLastAddConfirmed();
+ default long readLastAddConfirmed() throws BKException,
InterruptedException {
+ return FutureUtils.<Long,
BKException>result(readLastAddConfirmedAsync(),
+ BKException.HANDLER);
+ }
+
/**
* Obtains asynchronously the last confirmed write from a quorum of bookies
@@ -90,10 +133,20 @@ public interface ReadHandle extends Handle {
* immediately if it received a LAC which is larger than current LAC.
*
* @return an handle to the result of the operation
- * @see #tryReadLastAddConfirmed()
+ */
+ CompletableFuture<Long> tryReadLastAddConfirmedAsync();
+
+ /**
+ * Obtains synchronously the last confirmed write from a quorum of bookies
+ * but it doesn't wait all the responses from the quorum.
*
+ * @return the result of the operation
+ * @see #tryReadLastAddConfirmedAsync()
*/
- CompletableFuture<Long> tryReadLastAddConfirmed();
+ default long tryReadLastAddConfirmed() throws BKException,
InterruptedException {
+ return FutureUtils.<Long,
BKException>result(tryReadLastAddConfirmedAsync(),
+ BKException.HANDLER);
+ }
/**
* Get the last confirmed entry id on this ledger. It reads the local
state of the ledger handle,
@@ -144,8 +197,30 @@ public interface ReadHandle extends Handle {
* whether to issue the long poll reads in parallel
* @return an handle to the result of the operation
*/
- CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long
entryId,
- long
timeOutInMillis,
-
boolean parallel);
+ CompletableFuture<LastConfirmedAndEntry>
readLastAddConfirmedAndEntryAsync(long entryId,
+
long timeOutInMillis,
+
boolean parallel);
+
+ /**
+ * Synchronously read a specific entry and the latest last add confirmed.
+ *
+ * @param entryId
+ * next entry id to read
+ * @param timeOutInMillis
+ * timeout period to wait for the entry id to be available (for
long poll only)
+ * if timeout for get the entry, it will return null entry.
+ * @param parallel
+ * whether to issue the long poll reads in parallel
+ * @return the result of the operation
+ * @see #readLastAddConfirmedAndEntryAsync(long, long, boolean)
+ */
+ default LastConfirmedAndEntry readLastAddConfirmedAndEntry(long entryId,
+ long
timeOutInMillis,
+ boolean
parallel)
+ throws BKException, InterruptedException {
+ return FutureUtils.<LastConfirmedAndEntry, BKException>result(
+ readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis,
parallel),
+ BKException.HANDLER);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
index 037f7dd..37f45b9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
/**
* Provide write access to a ledger. Using WriteAdvHandler the writer MUST
explictly set an entryId. Beware that the
@@ -47,7 +48,19 @@ public interface WriteAdvHandle extends ReadHandle {
* @param data array of bytes to be written
* @return an handle to the result, in case of success it will return the
same value of param entryId.
*/
- default CompletableFuture<Long> write(final long entryId, final ByteBuffer
data) {
+ default CompletableFuture<Long> writeAsync(final long entryId, final
ByteBuffer data) {
+ return writeAsync(entryId, Unpooled.wrappedBuffer(data));
+ }
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param entryId entryId to be added
+ * @param data array of bytes to be written
+ * @return the same value of param entryId.
+ */
+ default long write(final long entryId, final ByteBuffer data)
+ throws BKException, InterruptedException {
return write(entryId, Unpooled.wrappedBuffer(data));
}
@@ -58,7 +71,19 @@ public interface WriteAdvHandle extends ReadHandle {
* @param data array of bytes to be written
* @return an handle to the result, in case of success it will return the
same value of param {@code entryId}.
*/
- default CompletableFuture<Long> write(final long entryId, final byte[]
data) {
+ default CompletableFuture<Long> writeAsync(final long entryId, final
byte[] data) {
+ return writeAsync(entryId, Unpooled.wrappedBuffer(data));
+ }
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param entryId entryId to be added.
+ * @param data array of bytes to be written
+ * @return same value of param {@code entryId}.
+ */
+ default long write(final long entryId, final byte[] data)
+ throws BKException, InterruptedException {
return write(entryId, Unpooled.wrappedBuffer(data));
}
@@ -71,7 +96,21 @@ public interface WriteAdvHandle extends ReadHandle {
* @param length the length to data to write
* @return an handle to the result, in case of success it will return the
same value of param {@code entryId}.
*/
- default CompletableFuture<Long> write(final long entryId, final byte[]
data, int offset, int length) {
+ default CompletableFuture<Long> writeAsync(final long entryId, final
byte[] data, int offset, int length) {
+ return writeAsync(entryId, Unpooled.wrappedBuffer(data, offset,
length));
+ }
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param entryId entryId to be added.
+ * @param data array of bytes to be written
+ * @param offset the offset of the bytes array
+ * @param length the length to data to write
+ * @return the same value of param {@code entryId}.
+ */
+ default long write(final long entryId, final byte[] data, int offset, int
length)
+ throws BKException, InterruptedException {
return write(entryId, Unpooled.wrappedBuffer(data, offset, length));
}
@@ -82,6 +121,16 @@ public interface WriteAdvHandle extends ReadHandle {
* @param data array of bytes to be written
* @return an handle to the result, in case of success it will return the
same value of param entryId
*/
- CompletableFuture<Long> write(long entryId, ByteBuf data);
+ CompletableFuture<Long> writeAsync(long entryId, ByteBuf data);
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param entryId entryId to be added
+ * @param data array of bytes to be written
+ * @return the same value of param entryId
+ */
+ default long write(long entryId, ByteBuf data) throws BKException,
InterruptedException {
+ return FutureUtils.<Long, BKException>result(writeAsync(entryId,
data), BKException.HANDLER);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
index cefd749..b2c0459 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
@@ -24,8 +24,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
/**
* Provide write access to a ledger.
@@ -45,7 +47,18 @@ public interface WriteHandle extends ReadHandle {
* completable future is returned
* @return an handle to the result, in case of success it will return the
id of the newly appended entry
*/
- CompletableFuture<Long> append(ByteBuf data);
+ CompletableFuture<Long> appendAsync(ByteBuf data);
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param data a bytebuf to be written. The bytebuf's reference count will
be decremented by 1 after the
+ * call completes.
+ * @return the id of the newly appended entry
+ */
+ default long append(ByteBuf data) throws BKException, InterruptedException
{
+ return FutureUtils.<Long, BKException>result(appendAsync(data),
BKException.HANDLER);
+ }
/**
* Add entry asynchronously to an open ledger.
@@ -53,7 +66,17 @@ public interface WriteHandle extends ReadHandle {
* @param data array of bytes to be written
* @return an handle to the result, in case of success it will return the
id of the newly appended entry
*/
- default CompletableFuture<Long> append(ByteBuffer data) {
+ default CompletableFuture<Long> appendAsync(ByteBuffer data) {
+ return appendAsync(Unpooled.wrappedBuffer(data));
+ }
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param data array of bytes to be written
+ * @return the id of the newly appended entry
+ */
+ default long append(ByteBuffer data) throws BKException,
InterruptedException {
return append(Unpooled.wrappedBuffer(data));
}
@@ -64,7 +87,17 @@ public interface WriteHandle extends ReadHandle {
* @return a completable future represents the add result, in case of
success the future returns the entry id
* of this newly appended entry
*/
- default CompletableFuture<Long> append(byte[] data) {
+ default CompletableFuture<Long> appendAsync(byte[] data) {
+ return appendAsync(Unpooled.wrappedBuffer(data));
+ }
+
+ /**
+ * Add an entry synchronously to an open ledger.
+ *
+ * @param data array of bytes to be written
+ * @return the entry id of this newly appended entry
+ */
+ default long append(byte[] data) throws BKException, InterruptedException {
return append(Unpooled.wrappedBuffer(data));
}
@@ -77,7 +110,19 @@ public interface WriteHandle extends ReadHandle {
* @return a completable future represents the add result, in case of
success the future returns the entry id
* of this newly appended entry
*/
- default CompletableFuture<Long> append(byte[] data, int offset, int
length) {
+ default CompletableFuture<Long> appendAsync(byte[] data, int offset, int
length) {
+ return appendAsync(Unpooled.wrappedBuffer(data, offset, length));
+ }
+
+ /**
+ * Add an entry synchronously to an open ledger.
+ *
+ * @param data array of bytes to be written
+ * @param offset the offset in the bytes array
+ * @param length the length of the bytes to be appended
+ * @return the entry id of this newly appended entry
+ */
+ default long append(byte[] data, int offset, int length) throws
BKException, InterruptedException {
return append(Unpooled.wrappedBuffer(data, offset, length));
}
@@ -89,4 +134,27 @@ public interface WriteHandle extends ReadHandle {
*/
long getLastAddPushed();
+ /**
+ * Asynchronous close the write handle, any adds in flight will return
errors.
+ *
+ * <p>Closing a ledger will ensure that all clients agree on what the last
+ * entry of the ledger is. Once the ledger has been closed, all reads from
the
+ * ledger will return the same set of entries.
+ *
+ * @return an handle to access the result of the operation
+ */
+ @Override
+ CompletableFuture<Void> closeAsync();
+
+ /**
+ * Synchronous close the write handle, any adds in flight will return
errors.
+ *
+ * <p>Closing a ledger will ensure that all clients agree on what the last
+ * entry of the ledger is. Once the ledger has been closed, all reads from
the
+ * ledger will return the same set of entries.
+ */
+ @Override
+ default void close() throws BKException, InterruptedException {
+ FutureUtils.<Void, BKException>result(closeAsync(),
BKException.HANDLER);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
index 60633eb..d54486f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
@@ -67,7 +67,7 @@ public class SimpleTestCommand extends ClientCommand {
System.out.println("Ledger ID: " + wh.getId());
long lastReport = System.nanoTime();
for (int i = 0; i < numEntries; i++) {
- result(wh.append(data));
+ wh.append(data);
if (TimeUnit.SECONDS.convert(System.nanoTime() - lastReport,
TimeUnit.NANOSECONDS) > 1) {
System.out.println(i + " entries written");
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 9a63e70..1b061f0 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -617,7 +617,7 @@ public class BookieWriteLedgerTest extends
long entryId = i++;
LOG.info("Writing {}:{} as {}",
ledgerId, entryId,
entry.slice().readInt());
- lastRequest = writer.write(entryId, entry);
+ lastRequest = writer.writeAsync(entryId,
entry);
}
lastRequest.join();
return Pair.of(writer, entries);
@@ -1218,7 +1218,7 @@ public class BookieWriteLedgerTest extends
private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws
Exception {
assertEquals("Not enough entries in ledger " + reader.getId(),
reader.getLastAddConfirmed(), entries.size() - 1);
- try (LedgerEntries readEntries = reader.read(0,
reader.getLastAddConfirmed()).join()) {
+ try (LedgerEntries readEntries = reader.read(0,
reader.getLastAddConfirmed())) {
int i = 0;
for (org.apache.bookkeeper.client.api.LedgerEntry e : readEntries)
{
int entryId = i++;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
index 2a07e1d..0da50b5 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
@@ -53,7 +53,7 @@ public class TestMaxEnsembleChangeNum extends
MockBookKeeperTestCase {
lId = writer.getId();
//first fragment
for (int i = 0; i < numEntries; i++) {
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
}
assertEquals("There should be zero ensemble change",
1, getLedgerMetadata(lId).getEnsembles().size());
@@ -65,7 +65,7 @@ public class TestMaxEnsembleChangeNum extends
MockBookKeeperTestCase {
killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
// add failure
try {
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
fail("should not come to here");
} catch (BKException exception){
assertEquals(exception.getCode(), WriteException);
@@ -82,11 +82,11 @@ public class TestMaxEnsembleChangeNum extends
MockBookKeeperTestCase {
killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
for (int i = 0; i < numEntries; i++) {
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
}
// ensure there is a ensemble changed
assertEquals("There should be one ensemble change",
expectedSize + num,
writer.getLedgerMetadata().getAllEnsembles().size());
}
}
-}
\ No newline at end of file
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index eadabfb..93fc4ec 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -72,11 +72,11 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute())) {
// test writer is able to write
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
assertEquals(0L, writer.getLastAddPushed());
- result(writer.append(Unpooled.wrappedBuffer(data)));
+ writer.append(Unpooled.wrappedBuffer(data));
assertEquals(1L, writer.getLastAddPushed());
- long expectedEntryId =
result(writer.append(ByteBuffer.wrap(data)));
+ long expectedEntryId = writer.append(ByteBuffer.wrap(data));
assertEquals(expectedEntryId, writer.getLastAddConfirmed());
assertEquals(3 * data.length, writer.getLength());
}
@@ -97,9 +97,9 @@ public class BookKeeperApiTest extends MockBookKeeperTestCase
{
// test writer is able to write
long entryId = 0;
- result(writer.write(entryId++, ByteBuffer.wrap(data)));
- result(writer.write(entryId++, Unpooled.wrappedBuffer(data)));
- long expectedEntryId = result(writer.write(entryId++,
ByteBuffer.wrap(data)));
+ writer.write(entryId++, ByteBuffer.wrap(data));
+ writer.write(entryId++, Unpooled.wrappedBuffer(data));
+ long expectedEntryId = writer.write(entryId++,
ByteBuffer.wrap(data));
assertEquals(expectedEntryId, writer.getLastAddConfirmed());
assertEquals(3 * data.length, writer.getLength());
}
@@ -120,9 +120,9 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
// test writer is able to write
long entryId = 0;
- result(writer.write(entryId++, ByteBuffer.wrap(data)));
- result(writer.write(entryId++, Unpooled.wrappedBuffer(data)));
- long expectedEntryId = result(writer.write(entryId++,
ByteBuffer.wrap(data)));
+ writer.write(entryId++, ByteBuffer.wrap(data));
+ writer.write(entryId++, Unpooled.wrappedBuffer(data));
+ long expectedEntryId = writer.write(entryId++,
ByteBuffer.wrap(data));
assertEquals(expectedEntryId, writer.getLastAddConfirmed());
assertEquals(3 * data.length, writer.getLength());
}
@@ -140,9 +140,9 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute())) {
assertEquals(1234, writer.getId());
long entryId = 0;
- result(writer.write(entryId++, ByteBuffer.wrap(data)));
+ writer.write(entryId++, ByteBuffer.wrap(data));
assertEquals(data.length, writer.getLength());
- result(writer.write(entryId - 1, ByteBuffer.wrap(data)));
+ writer.write(entryId - 1, ByteBuffer.wrap(data));
}
}
@@ -183,7 +183,7 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute())) {
lId = writer.getId();
assertEquals(-1L, writer.getLastAddPushed());
- result(writer.append(ByteBuffer.wrap(bigData)));
+ writer.append(ByteBuffer.wrap(bigData));
assertEquals(bigData.length, writer.getLength());
}
try (ReadHandle reader = result(newOpenLedgerOp()
@@ -191,7 +191,7 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.withPassword(password)
.withLedgerId(lId)
.execute())) {
- LedgerEntries entries = result(reader.read(0, 0));
+ LedgerEntries entries = reader.read(0, 0);
checkEntries(entries, bigData);
}
result(newDeleteLedgerOp().withLedgerId(lId).execute());
@@ -250,8 +250,8 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute())) {
long lId = writer.getId();
// write data and populate LastAddConfirmed
- result(writer.append(ByteBuffer.wrap(data)));
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
+ writer.append(ByteBuffer.wrap(data));
try (ReadHandle reader = result(newOpenLedgerOp()
.withPassword(password)
@@ -274,9 +274,9 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute())) {
lId = writer.getId();
// write data and populate LastAddConfirmed
- result(writer.append(ByteBuffer.wrap(data)));
- result(writer.append(ByteBuffer.wrap(data)));
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
+ writer.append(ByteBuffer.wrap(data));
+ writer.append(ByteBuffer.wrap(data));
}
try (ReadHandle reader = result(newOpenLedgerOp()
@@ -287,14 +287,14 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
assertTrue(reader.isClosed());
assertEquals(2, reader.getLastAddConfirmed());
assertEquals(3 * data.length, reader.getLength());
- assertEquals(2, result(reader.readLastAddConfirmed()).intValue());
- assertEquals(2, result(reader.tryReadLastAddConfirmed()).intValue());
- checkEntries(result(reader.read(0, reader.getLastAddConfirmed())), data);
- checkEntries(result(reader.readUnconfirmed(0, reader.getLastAddConfirmed())), data);
+ assertEquals(2, reader.readLastAddConfirmed());
+ assertEquals(2, reader.tryReadLastAddConfirmed());
+ checkEntries(reader.read(0, reader.getLastAddConfirmed()), data);
+ checkEntries(reader.readUnconfirmed(0, reader.getLastAddConfirmed()), data);
// test readLastAddConfirmedAndEntry
LastConfirmedAndEntry lastConfirmedAndEntry =
- result(reader.readLastAddConfirmedAndEntry(0, 999, false));
+ reader.readLastAddConfirmedAndEntry(0, 999, false);
assertEquals(2L, lastConfirmedAndEntry.getLastAddConfirmed());
assertArrayEquals(data,
lastConfirmedAndEntry.getEntry().getEntryBytes());
lastConfirmedAndEntry.close();
@@ -320,8 +320,8 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute())) {
lId = writer.getId();
- result(writer.append(ByteBuffer.wrap(data)));
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
+ writer.append(ByteBuffer.wrap(data));
assertEquals(1L, writer.getLastAddPushed());
// open with fencing
@@ -334,7 +334,7 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
assertEquals(1L, reader.getLastAddConfirmed());
}
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
}
}
@@ -383,9 +383,9 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
.execute().get()) {
lId = writer.getId();
// write data and populate LastAddConfirmed
- result(writer.append(ByteBuffer.wrap(data)));
- result(writer.append(ByteBuffer.wrap(data)));
- result(writer.append(ByteBuffer.wrap(data)));
+ writer.append(ByteBuffer.wrap(data));
+ writer.append(ByteBuffer.wrap(data));
+ writer.append(ByteBuffer.wrap(data));
}
try (ReadHandle reader = newOpenLedgerOp()
@@ -396,7 +396,7 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
long lac = reader.getLastAddConfirmed();
assertEquals(2, lac);
- try (LedgerEntries entries = reader.read(0, lac).get()) {
+ try (LedgerEntries entries = reader.read(0, lac)) {
AtomicLong i = new AtomicLong(0);
for (LedgerEntry e : entries) {
assertEquals(i.getAndIncrement(), e.getEntryId());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java
index 24408d9..585a7b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java
@@ -59,47 +59,47 @@ public class WriteAdvHandleTest {
ByteBuf buf = invocationOnMock.getArgument(1);
entryQueue.add(buf);
return FutureUtils.value(-1L);
- }).when(handle).write(anyLong(), any(ByteBuf.class));
- when(handle.write(anyLong(), any(byte[].class))).thenCallRealMethod();
- when(handle.write(anyLong(), any(byte[].class), anyInt(), anyInt())).thenCallRealMethod();
- when(handle.write(anyLong(), any(ByteBuffer.class))).thenCallRealMethod();
+ }).when(handle).writeAsync(anyLong(), any(ByteBuf.class));
+ when(handle.writeAsync(anyLong(), any(byte[].class))).thenCallRealMethod();
+ when(handle.writeAsync(anyLong(), any(byte[].class), anyInt(), anyInt())).thenCallRealMethod();
+ when(handle.writeAsync(anyLong(), any(ByteBuffer.class))).thenCallRealMethod();
}
@Test
public void testAppendBytes() throws Exception {
byte[] testData = runtime.getMethodName().getBytes(UTF_8);
- handle.write(entryId, testData);
+ handle.writeAsync(entryId, testData);
ByteBuf buffer = entryQueue.take();
byte[] bufferData = ByteBufUtil.getBytes(buffer);
assertArrayEquals(testData, bufferData);
- verify(handle, times(1)).write(eq(entryId), any(ByteBuf.class));
+ verify(handle, times(1)).writeAsync(eq(entryId), any(ByteBuf.class));
}
@Test
public void testAppendBytes2() throws Exception {
byte[] testData = runtime.getMethodName().getBytes(UTF_8);
- handle.write(entryId, testData, 1, testData.length / 2);
+ handle.writeAsync(entryId, testData, 1, testData.length / 2);
byte[] expectedData = new byte[testData.length / 2];
System.arraycopy(testData, 1, expectedData, 0, testData.length / 2);
ByteBuf buffer = entryQueue.take();
byte[] bufferData = ByteBufUtil.getBytes(buffer);
assertArrayEquals(expectedData, bufferData);
- verify(handle, times(1)).write(eq(entryId), any(ByteBuf.class));
+ verify(handle, times(1)).writeAsync(eq(entryId), any(ByteBuf.class));
}
@Test
public void testAppendByteBuffer() throws Exception {
byte[] testData = runtime.getMethodName().getBytes(UTF_8);
- handle.write(entryId, ByteBuffer.wrap(testData, 1, testData.length / 2));
+ handle.writeAsync(entryId, ByteBuffer.wrap(testData, 1, testData.length / 2));
byte[] expectedData = new byte[testData.length / 2];
System.arraycopy(testData, 1, expectedData, 0, testData.length / 2);
ByteBuf buffer = entryQueue.take();
byte[] bufferData = ByteBufUtil.getBytes(buffer);
assertArrayEquals(expectedData, bufferData);
- verify(handle, times(1)).write(eq(entryId), any(ByteBuf.class));
+ verify(handle, times(1)).writeAsync(eq(entryId), any(ByteBuf.class));
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java
index fd7ac5e..113f585 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBufUtil;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -51,12 +50,12 @@ public class WriteHandleTest {
private final WriteHandle handle = mock(WriteHandle.class);
private final LinkedBlockingQueue<ByteBuf> entryQueue;
- public WriteHandleTest() {
+ public WriteHandleTest() throws Exception {
this.entryQueue = new LinkedBlockingQueue<>();
doAnswer(invocationOnMock -> {
ByteBuf buf = invocationOnMock.getArgument(0);
entryQueue.add(buf);
- return FutureUtils.value(-1L);
+ return -1L;
}).when(handle).append(any(ByteBuf.class));
when(handle.append(any(byte[].class))).thenCallRealMethod();
when(handle.append(any(byte[].class), anyInt(),
anyInt())).thenCallRealMethod();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index c3f1b89..26b2448 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -65,7 +65,7 @@ public final class TestUtils {
long lac = rh.getLastAddConfirmed();
while (lac < newLac) {
TimeUnit.MILLISECONDS.sleep(20);
- lac = rh.readLastAddConfirmed().get();
+ lac = rh.readLastAddConfirmed();
}
}
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java
index 946afc5..a901f21 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
public class SimpleTestCommandTest extends ClientCommandTestBase {
@Test
- public void testCommandShortArgs() {
+ public void testCommandShortArgs() throws Exception {
testCommand(
"simpletest",
"-e", "5",
@@ -51,7 +51,7 @@ public class SimpleTestCommandTest extends
ClientCommandTestBase {
}
@Test
- public void testCommandLongArgs() {
+ public void testCommandLongArgs() throws Exception {
testCommand(
"simpletest",
"--ensemble-size", "5",
@@ -60,10 +60,10 @@ public class SimpleTestCommandTest extends
ClientCommandTestBase {
"--num-entries", "10");
}
- public void testCommand(String... args) {
+ public void testCommand(String... args) throws Exception {
WriteHandle wh = mock(WriteHandle.class);
AtomicLong counter = new AtomicLong(0L);
- when(wh.append(any(byte[].class))).thenReturn(FutureUtils.value(counter.get()));
+ when(wh.append(any(byte[].class))).thenReturn(counter.get());
CreateBuilder createBuilder = mock(CreateBuilder.class);
when(createBuilder.execute())
.thenReturn(FutureUtils.value(wh));
--
To stop receiving notification emails like this one, please contact
[email protected].