sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181171240
 
 

 ##########
 File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##########
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) {
 
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
-        callback.offloadFailed(new ManagedLedgerException("Not implemented"), ctx);
+        PositionImpl requestOffloadTo = (PositionImpl)pos;
+        if (!isValidPosition(requestOffloadTo)) {
+            callback.offloadFailed(new InvalidCursorPositionException("Invalid position for offload"), ctx);
+            return;
+        }
+
+        Position firstUnoffloaded;
+
+        List<LedgerInfo> ledgersToOffload = Lists.newArrayList();
+        synchronized (this) {
+            log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", name, ledgers.keySet(),
+                     TOTAL_SIZE_UPDATER.get(this));
+
+            if (STATE_UPDATER.get(this) == State.Closed) {
+                log.info("[{}] Ignoring offload request since the managed ledger was already closed", name);
+                callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+                                               "Can't offload closed managed ledger"), ctx);
+                return;
+            }
+
+            long current = ledgers.lastKey();
+
+            // the first ledger which will not be offloaded. Defaults to current,
+            // in the case that the whole headmap is offloaded. Otherwise it will
+            // be set as we iterate through the headmap values
+            long firstLedgerRetained = current;
+            for (LedgerInfo ls : ledgers.headMap(current).values()) {
+                if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+                    if (!ls.hasOffloadContext()) {
+                        ledgersToOffload.add(ls);
+                    }
+                } else {
+                    firstLedgerRetained = ls.getLedgerId();
+                    break;
+                }
+            }
+            firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+        }
+
+        if (ledgersToOffload.isEmpty()) {
+            callback.offloadComplete(firstUnoffloaded, ctx);
+            return;
+        }
+
+        log.info("[{}] Going to offload ledgers {}", name,
+                 ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()));
+        List< Pair<Long, CompletableFuture<byte[]>> > contexts = ledgersToOffload.stream()
+            .map(info -> {
+                    long ledgerId = info.getLedgerId();
+                    Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name);
+                    CompletableFuture<byte[]> context = getLedgerHandle(ledgerId).thenCompose(
+                            readHandle -> config.getLedgerOffloader().offload(readHandle, extraMetadata));
+                    return Pair.of(ledgerId, context);
+                }).collect(Collectors.toList());
+
+        CompletableFuture.allOf(contexts.stream().map(p -> p.getRight()).toArray(CompletableFuture[]::new))
 
 Review comment:
   I am not sure we should parallelize the offloading here. It is better to
offload the ledgers in sequence and update the metadata one by one, so that
failures (e.g. an offload that succeeds but whose offload context fails to be
recorded) are easier to handle. `CompletableFuture.allOf` can also behave badly
in production. For example, if S3 / cold storage is unavailable for a while,
Pulsar can't offload ledgers and they accumulate. When S3 / cold storage comes
back online, the `allOf` logic will potentially start offloading all of them in
parallel, which can overwhelm the brokers and push the system into a state from
which it never recovers.
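
   A minimal sketch of the sequential alternative, to make the suggestion
concrete. The class `SequentialOffloadSketch` and the parameters `offload` and
`updateMetadata` are hypothetical stand-ins, not part of this PR; in the real
code they would presumably wrap the offloader call and the ledger metadata
update. Each ledger is offloaded and recorded before the next one starts:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper, for illustration only.
final class SequentialOffloadSketch {

    /**
     * Offloads ledgers one at a time. The offload of a ledger is not started
     * until the previous ledger has been offloaded AND its metadata update has
     * completed. If any step fails, the remaining ledgers are not attempted
     * and the returned future completes exceptionally.
     *
     * @param offload        assumed to copy one ledger and yield its offload context
     * @param updateMetadata assumed to persist the offload context for one ledger
     */
    static CompletableFuture<Void> offloadSequentially(
            List<Long> ledgerIds,
            Function<Long, CompletableFuture<byte[]>> offload,
            BiFunction<Long, byte[], CompletableFuture<Void>> updateMetadata) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Long ledgerId : ledgerIds) {
            // thenCompose only runs when the previous stage succeeded, so a
            // failure short-circuits everything queued after it.
            chain = chain
                .thenCompose(ignore -> offload.apply(ledgerId))
                .thenCompose(context -> updateMetadata.apply(ledgerId, context));
        }
        return chain;
    }
}
```

   With this shape at most one offload is in flight per managed ledger, and
ledgers whose metadata was already updated stay recorded if a later one fails.
The cost is lower throughput than `allOf`; a bounded-concurrency variant would
be another way to limit the load.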
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
