ivankelly commented on a change in pull request #1549: offloadPrefix implementation for managed ledger URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181420020
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) { @Override public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) { - callback.offloadFailed(new ManagedLedgerException("Not implemented"), ctx); + PositionImpl requestOffloadTo = (PositionImpl)pos; + if (!isValidPosition(requestOffloadTo)) { + callback.offloadFailed(new InvalidCursorPositionException("Invalid position for offload"), ctx); + return; + } + + Position firstUnoffloaded; + + List<LedgerInfo> ledgersToOffload = Lists.newArrayList(); + synchronized (this) { + log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", name, ledgers.keySet(), + TOTAL_SIZE_UPDATER.get(this)); + + if (STATE_UPDATER.get(this) == State.Closed) { + log.info("[{}] Ignoring offload request since the managed ledger was already closed", name); + callback.offloadFailed(new ManagedLedgerAlreadyClosedException( + "Can't offload closed managed ledger"), ctx); + return; + } + + long current = ledgers.lastKey(); + + // the first ledger which will not be offloaded. Defaults to current, + // in the case that the whole headmap is offloaded. 
Otherwise it will + // be set as we iterate through the headmap values + long firstLedgerRetained = current; + for (LedgerInfo ls : ledgers.headMap(current).values()) { + if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) { + if (!ls.hasOffloadContext()) { + ledgersToOffload.add(ls); + } + } else { + firstLedgerRetained = ls.getLedgerId(); + break; + } + } + firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0); + } + + if (ledgersToOffload.isEmpty()) { + callback.offloadComplete(firstUnoffloaded, ctx); + return; + } + + log.info("[{}] Going to offload ledgers {}", name, + ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList())); + List< Pair<Long, CompletableFuture<byte[]>> > contexts = ledgersToOffload.stream() + .map(info -> { + long ledgerId = info.getLedgerId(); + Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name); + CompletableFuture<byte[]> context = getLedgerHandle(ledgerId).thenCompose( + readHandle -> config.getLedgerOffloader().offload(readHandle, extraMetadata)); + return Pair.of(ledgerId, context); + }).collect(Collectors.toList()); + + CompletableFuture.allOf(contexts.stream().map(p -> p.getRight()).toArray(CompletableFuture[]::new)) + .whenComplete((ignore, offloadException) -> { + // If any of the offload operations failed offloadException will + // be set. However some of the operations could have been successful. + // If so, save the contexts for the successful prefix and notify the client + // of the error if any occurred. + List<Pair<Long, byte[]>> successfulOffloads = Lists.newArrayList(); + int errors = 0; + synchronized (this) { + ledgersListMutex.lock(); + + // loop through results of offload operations. If an error occurred + // check that the ledger still exists for the ML. If not, then it was + // trimmed and the error can be ignored. 
+ for (Pair<Long, CompletableFuture<byte[]>> context : contexts) { + if (context.getRight().isCompletedExceptionally()) { + if (ledgers.containsKey(context.getLeft())) { + errors++; + } + } else { + successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join())); + } + } + ledgersListMutex.unlock(); + } + log.info("[{}] All offload operations complete, {} successful, {} errors", + name, successfulOffloads.size(), errors); + + int errorsToReport = errors; // effectively final so can be used in lambda + if (successfulOffloads.size() > 0) { + updateLedgerInfoForOffloaded(successfulOffloads).whenComplete( + (ignore2, exception) -> { + if (exception != null) { + callback.offloadFailed(new ManagedLedgerException(exception), ctx); + } else if (offloadException != null && errorsToReport > 0) { + callback.offloadFailed(new ManagedLedgerException(offloadException), ctx); + } else { + callback.offloadComplete(firstUnoffloaded, ctx); + } + }); + } else { + // All failed, so throw up the error + callback.offloadFailed(new ManagedLedgerException(offloadException), ctx); + } + }); } + private CompletableFuture<Void> updateLedgerInfoForOffloaded(List<Pair<Long, byte[]>> contexts) { + CompletableFuture<Void> promise = new CompletableFuture<>(); + synchronized (this) { + ledgersListMutex.lock(); + + contexts.forEach((context) -> { + long ledgerId = context.getLeft(); + LedgerInfo oldInfo = ledgers.get(ledgerId); + if (oldInfo == null) { + cleanupOffloadedOnFailure(context.getLeft(), context.getRight(), Review comment: My thinking on it was that there'd be a GC type mechanism as crashes during offload should be pretty rare, similar to what must happen if a ledger is created but the ML crashes before updating the managed ledger metadata. Being able to cleanup a failed offload implies the location being deterministic from the ledger info. 
We should avoid requiring that the name is deterministic from the ledger ids, as then we could end up with nasty races, where one broker is offloading and pauses, and another picks up the offloading process, then the first one wakes up and corrupts the offloading process. Unlikely, but there's no fencing, so entirely possible. We could do it by adding an offloadId UUID parameter to LedgerOffloader calls. We could generate this, store it in the offload indication field, offload, and then update the context to indicate that offload is complete. This change would also imply more writes on ZK, which may not be desirable. @merlimat What are your thoughts on this? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services