sijie closed pull request #1543: offloadPrefix interface for ManagedLedger
URL: https://github.com/apache/incubator-pulsar/pull/1543
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 71ff228aa6..c60aee6a94 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -125,4 +125,9 @@
         void getInfoFailed(ManagedLedgerException exception, Object ctx);
     }
 
+    interface OffloadCallback {
+        void offloadComplete(Position pos, Object ctx);
+
+        void offloadFailed(ManagedLedgerException exception, Object ctx);
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 8dbb846d10..d6813e4a4b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -24,6 +24,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
@@ -345,6 +346,31 @@
      */
     void asyncDelete(DeleteLedgerCallback callback, Object ctx);
 
+    /**
+     * Offload as many entries before position as possible to long-term storage.
+     *
+     * @param pos the position before which entries will be offloaded
+     * @return the earliest position which was not offloaded
+     *
+     * @see #asyncOffloadPrefix(Position,OffloadCallback,Object)
+     */
+    Position offloadPrefix(Position pos) throws InterruptedException, 
ManagedLedgerException;
+
+    /**
+     * Offload as many entries before position as possible to long-term storage.
+     *
+     * Because entries are stored internally in ledgers, and ledgers can only be operated on as a
+     * whole, it is likely not possible to offload every entry before the passed-in position. Only
+     * complete ledgers will be offloaded. On completion, a position will be passed to the callback.
+     * This position is the earliest entry which was not offloaded.
+     *
+     * @param pos the position before which entries will be offloaded
+     * @param callback a callback which will be supplied with the earliest unoffloaded position on
+     *                 completion
+     * @param ctx a context object which will be passed to the callback on completion
+     */
+    void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object 
ctx);
+
     /**
      * Get the slowest consumer.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c96b35e665..2f7666c321 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -42,6 +42,8 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -60,6 +62,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -1805,6 +1808,39 @@ public void operationFailed(MetaStoreException e) {
         });
     }
 
+    @Override
+    public Position offloadPrefix(Position pos) throws InterruptedException, 
ManagedLedgerException {
+        CompletableFuture<Position> promise = new CompletableFuture<>();
+
+        asyncOffloadPrefix(pos, new OffloadCallback() {
+                @Override
+                public void offloadComplete(Position offloadedTo, Object ctx) {
+                    promise.complete(offloadedTo);
+                }
+
+                @Override
+                public void offloadFailed(ManagedLedgerException e, Object 
ctx) {
+                    promise.completeExceptionally(e);
+                }
+            }, null);
+
+        try {
+            return promise.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+        } catch (TimeoutException te) {
+            throw new ManagedLedgerException("Timeout during managed ledger 
offload operation");
+        } catch (ExecutionException e) {
+            log.error("[{}] Error offloading. pos = {}", name, pos, 
e.getCause());
+            throw 
ManagedLedgerException.getManagedLedgerException(e.getCause());
+        }
+    }
+
+
+
+    @Override
+    public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
+        callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+    }
+
     /**
      * Get the number of entries between a contiguous range of two positions.
      *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to