sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181449813
 
 

 ##########
 File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##########
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) {
 
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
-        callback.offloadFailed(new ManagedLedgerException("Not implemented"), ctx);
+        PositionImpl requestOffloadTo = (PositionImpl)pos;
+        if (!isValidPosition(requestOffloadTo)) {
+            callback.offloadFailed(new InvalidCursorPositionException("Invalid position for offload"), ctx);
+            return;
+        }
+
+        Position firstUnoffloaded;
+
+        List<LedgerInfo> ledgersToOffload = Lists.newArrayList();
+        synchronized (this) {
+            log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", name, ledgers.keySet(),
+                     TOTAL_SIZE_UPDATER.get(this));
+
+            if (STATE_UPDATER.get(this) == State.Closed) {
+                log.info("[{}] Ignoring offload request since the managed ledger was already closed", name);
+                callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+                                               "Can't offload closed managed ledger"), ctx);
+                return;
+            }
+
+            long current = ledgers.lastKey();
+
+            // the first ledger which will not be offloaded. Defaults to current,
+            // in the case that the whole headmap is offloaded. Otherwise it will
+            // be set as we iterate through the headmap values
+            long firstLedgerRetained = current;
+            for (LedgerInfo ls : ledgers.headMap(current).values()) {
+                if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+                    if (!ls.hasOffloadContext()) {
+                        ledgersToOffload.add(ls);
+                    }
+                } else {
+                    firstLedgerRetained = ls.getLedgerId();
+                    break;
+                }
+            }
+            firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+        }
+
+        if (ledgersToOffload.isEmpty()) {
+            callback.offloadComplete(firstUnoffloaded, ctx);
+            return;
+        }
+
+        log.info("[{}] Going to offload ledgers {}", name,
+                 ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()));
+        List< Pair<Long, CompletableFuture<byte[]>> > contexts = ledgersToOffload.stream()
+            .map(info -> {
+                    long ledgerId = info.getLedgerId();
+                    Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name);
+                    CompletableFuture<byte[]> context = getLedgerHandle(ledgerId).thenCompose(
+                            readHandle -> config.getLedgerOffloader().offload(readHandle, extraMetadata));
+                    return Pair.of(ledgerId, context);
+                }).collect(Collectors.toList());
+
+        CompletableFuture.allOf(contexts.stream().map(p -> p.getRight()).toArray(CompletableFuture[]::new))
+            .whenComplete((ignore, offloadException) -> {
+                    // If any of the offload operations failed offloadException will
+                    // be set. However some of the operations could have been successful.
+                    // If so, save the contexts for the successful prefix and notify the client
+                    // of the error if any occurred.
+                    List<Pair<Long, byte[]>> successfulOffloads = Lists.newArrayList();
+                    int errors = 0;
+                    synchronized (this) {
+                        ledgersListMutex.lock();
+
+                        // loop through results of offload operations. If an error occurred
+                        // check that the ledger still exists for the ML. If not, then it was
+                        // trimmed and the error can be ignored.
+                        for (Pair<Long, CompletableFuture<byte[]>> context : contexts) {
+                            if (context.getRight().isCompletedExceptionally()) {
+                                if (ledgers.containsKey(context.getLeft())) {
+                                    errors++;
+                                }
+                            } else {
+                                successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join()));
+                            }
+                        }
+                        ledgersListMutex.unlock();
+                    }
+                    log.info("[{}] All offload operations complete, {} successful, {} errors",
+                             name, successfulOffloads.size(), errors);
+
+                    int errorsToReport = errors; // effectively final so can be used in lambda
+                    if (successfulOffloads.size() > 0) {
+                        updateLedgerInfoForOffloaded(successfulOffloads).whenComplete(
+                                (ignore2, exception) -> {
+                                    if (exception != null) {
+                                        callback.offloadFailed(new ManagedLedgerException(exception), ctx);
+                                    } else if (offloadException != null && errorsToReport > 0) {
+                                        callback.offloadFailed(new ManagedLedgerException(offloadException), ctx);
+                                    } else {
+                                        callback.offloadComplete(firstUnoffloaded, ctx);
+                                    }
+                                });
+                    } else {
+                        // All failed, so throw up the error
+                        callback.offloadFailed(new ManagedLedgerException(offloadException), ctx);
+                    }
+                });
     }
 
+    private CompletableFuture<Void> updateLedgerInfoForOffloaded(List<Pair<Long, byte[]>> contexts) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        synchronized (this) {
+            ledgersListMutex.lock();
+
+            contexts.forEach((context) -> {
+                    long ledgerId = context.getLeft();
+                    LedgerInfo oldInfo = ledgers.get(ledgerId);
+                    if (oldInfo == null) {
+                        cleanupOffloadedOnFailure(context.getLeft(), context.getRight(),
 
 Review comment:
   > as crashes during offload should be pretty rare, similar to what must 
happen if a ledger is created but the ML crashes before updating the managed 
ledger metadata.
   
   These two cases differ in cost.
   
   "ledger is created but the ML crashes before updating the managed ledger metadata" => there is no data in the ledger, so it is effectively just one piece of metadata.
   
   "crashes during offload" => the crash can happen after you have fully copied the data, so you leave an orphaned bucket behind in tiered storage.
   
   > Being able to cleanup a failed offload implies the location being 
deterministic from the ledger info.
   
   It doesn't need to be a deterministic name. It just needs to be tracked somewhere for cleanup. `somewhere` can be the ledger info in the ML.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
