sijie closed pull request #1543: offloadPrefix interface for ManagedLedger URL: https://github.com/apache/incubator-pulsar/pull/1543
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 71ff228aa6..c60aee6a94 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -125,4 +125,9 @@ void getInfoFailed(ManagedLedgerException exception, Object ctx); } + interface OffloadCallback { + void offloadComplete(Position pos, Object ctx); + + void offloadFailed(ManagedLedgerException exception, Object ctx); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 8dbb846d10..d6813e4a4b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; @@ -345,6 +346,31 @@ */ void asyncDelete(DeleteLedgerCallback callback, Object ctx); + /** + * Offload as many entries before position as possible to longterm storage. + * + * @param pos the position before which entries will be offloaded + * @return the earliest position which was not offloaded + * + * @see #asyncOffloadPrefix(Position,OffloadCallback,Object) + */ + Position offloadPrefix(Position pos) throws InterruptedException, ManagedLedgerException; + + /** + * Offload as many entries before position as possible to longterm storage. + * + * As internally, entries is stored in ledgers, and ledgers can only be operated on as a whole, + * it is likely not possible to offload every entry before the passed in position. Only complete + * ledgers will be offloaded. On completion a position will be passed to the callback. This + * position is the earliest entry which was not offloaded. + * + * @param pos the position before which entries will be offloaded + * @param callback a callback which will be supplied with the earliest unoffloaded position on + * completion + * @param ctx a context object which will be passed to the callback on completion + */ + void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx); + /** * Get the slowest consumer. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c96b35e665..2f7666c321 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -60,6 +62,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.Entry; @@ -1805,6 +1808,39 @@ public void operationFailed(MetaStoreException e) { }); } + @Override + public Position offloadPrefix(Position pos) throws InterruptedException, ManagedLedgerException { + CompletableFuture<Position> promise = new CompletableFuture<>(); + + asyncOffloadPrefix(pos, new OffloadCallback() { + @Override + public void offloadComplete(Position offloadedTo, Object ctx) { + promise.complete(offloadedTo); + } + + @Override + public void offloadFailed(ManagedLedgerException e, Object ctx) { + promise.completeExceptionally(e); + } + }, null); + + try { + return promise.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); + } catch (TimeoutException te) { + throw new ManagedLedgerException("Timeout during managed ledger offload operation"); + } catch (ExecutionException e) { + log.error("[{}] Error offloading. pos = {}", name, pos, e.getCause()); + throw ManagedLedgerException.getManagedLedgerException(e.getCause()); + } + } + + + + @Override + public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) { + callback.offloadFailed(new ManagedLedgerException("Not implemented"), ctx); + } + /** * Get the number of entries between a contiguous range of two positions. * ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services