zymap commented on a change in pull request #9202: URL: https://github.com/apache/pulsar/pull/9202#discussion_r568269513
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -386,6 +408,292 @@ public void operationFailed(MetaStoreException e) { scheduleRollOverLedgerTask(); } + /** + * Should be called after `ledgers` were initialized. + */ + void initializeStreamingOffloader() { + if (getOffloadMethod() == OffloadMethod.STREAMING_BASED) { + log.info("Streaming offload enabled for managed ledger: {}", name); + } else { + log.info("Streaming offload not enabled for managed ledger: {}", name); + return; + } + + if (!offloadMutex.tryLock()) { + log.info("try streaming offload,but already offloading"); + return; + } + + //get newest config and drop progress status of last offload + offloader = getConfig().getLedgerOffloader().fork(); + + this.offloadSegments = Queues.newConcurrentLinkedQueue(); + + initializeSegments(); + + if (offloadSegments.isEmpty()) { + log.error("Streaming offloading began but there is no segments to offload, should not happen."); + throw new RuntimeException( + "Streaming offloading began but there is no segments to offload, should not happen."); + } + + startOffload(); + } + + private void initializeSegments() { + Long updatedLedgerId = null; + LedgerInfo updatedLedgerInfo = null; + for (Map.Entry<Long, LedgerInfo> idInfo : ledgers.entrySet()) { + + final Long ledgerId = idInfo.getKey(); + LedgerInfo ledgerInfo = idInfo.getValue(); + String driverName = OffloadUtils.getOffloadDriverName(ledgerInfo, + config.getLedgerOffloader().getOffloadDriverName()); + Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ledgerInfo, + config.getLedgerOffloader().getOffloadDriverMetadata()); + + if (!ledgerInfo.hasOffloadContext()) { + final OffloadContext context = OffloadContext.newBuilder() + .setComplete(false) + .build(); + ledgerInfo = ledgerInfo.toBuilder().setOffloadContext(context).build(); + } + + if (!isStreamingOffloadCompleted(ledgerInfo)) { + List<OffloadSegment> newSegments 
= Lists.newArrayList(); + // Continue from incomplete context + long beginEntry = 0; + for (OffloadSegment offloadSegment : ledgerInfo.getOffloadContext().getOffloadSegmentList()) { + if (offloadSegment.getComplete()) { + if (!offloadSegment.hasEndEntryId()) { + log.error("segment of ledger {} offload completed bug not have end entry id " + + "should not happen. {}", ledgerId, ledgerInfo); + } else { + beginEntry = offloadSegment.getEndEntryId() + 1; + newSegments.add(offloadSegment); + } + } + } + + UUID uuid = UUID.randomUUID(); + final OffloadSegment.Builder segment = OffloadSegment.newBuilder() + .setUidLsb(uuid.getLeastSignificantBits()) + .setUidMsb(uuid.getMostSignificantBits()) + .setAssignedTimestamp(System.currentTimeMillis()) + .setComplete(false); + OffloadUtils.setOffloadDriverMetadata(segment, driverName, driverMetadata); + newSegments.add(segment.build()); + final OffloadContext context = ledgerInfo.getOffloadContext().toBuilder().clearOffloadSegment() + .addAllOffloadSegment(newSegments).build(); + final LedgerInfo newLedgerInfo = ledgerInfo.toBuilder().setOffloadContext(context).build(); + updatedLedgerId = idInfo.getKey(); + updatedLedgerInfo = newLedgerInfo; + offloadSegments.add(new OffloadSegmentInfoImpl(uuid, ledgerId, beginEntry, driverName, driverMetadata)); + break; + } + } + log.debug("updated ledgerId: {}", updatedLedgerId); + ledgers.put(updatedLedgerId, updatedLedgerInfo); + } + + public static boolean isStreamingOffloadCompleted(LedgerInfo ledgerInfo) { + if (!ledgerInfo.hasEntries()) { + //ledger is not closed + return false; + } + if (!ledgerInfo.hasOffloadContext()) { + return false; + } + final List<OffloadSegment> offloadSegmentList = ledgerInfo.getOffloadContext().getOffloadSegmentList(); + if (offloadSegmentList.isEmpty()) { + return false; + } + final OffloadSegment lastSegment = offloadSegmentList.get(offloadSegmentList.size() - 1); + return lastSegment.getComplete() && lastSegment.getEndEntryId() == ledgerInfo.getEntries() - 
1; + } + + private synchronized void startOffload() { + final OffloadSegmentInfoImpl headSegment = offloadSegments.peek(); + try { + this.currentOffloadHandle = offloader + .streamingOffload(this, headSegment.uuid, headSegment.beginLedgerId, headSegment.beginEntryId, + headSegment.driverMetadata).get(); + this.currentOffloadHandle.getOffloadResultAsync().whenComplete((result, ex) -> { + if (ex != null) { + offloadMutex.unlock(); + log.error("offload failed", ex); + } else { + final OffloadSegmentInfoImpl segmentInfo = offloadSegments.poll(); + if (segmentInfo == null) { + offloadMutex.unlock(); + throw new RuntimeException("An empty segment list, should not happen"); + } + + if (segmentInfo.beginLedgerId != result.beginLedger || segmentInfo.beginEntryId != result.beginEntry) { + offloadMutex.unlock(); + throw new RuntimeException( + Strings.lenientFormat("expect result %s got %s, should not happen", segmentInfo, + result)); + } else { + segmentInfo.closeSegment(result.endLedger, result.endEntry); + } + log.debug("updatedMetaForOffloaded: {}", segmentInfo); + updatedMetaForOffloaded(segmentInfo).whenComplete((updateResult, updatedEx) -> { + if (updatedEx != null) { + offloadMutex.unlock(); + log.error("update metadata failed", updatedEx); + return; + } + + if (!offloadSegments.isEmpty()) { + log.error("offload segments not cleared, should not happen: {}", offloadSegments); + offloadSegments.clear(); + } + + initializeSegments(); + //use new offloader after segment closed + offloader = config.getLedgerOffloader().fork(); + + if (offloadSegments.isEmpty()) { + offloadMutex.unlock(); + throw new RuntimeException( + "Streaming offloading began but there is no segments to offload, should not happen."); + } + if (getOffloadMethod().equals(OffloadMethod.STREAMING_BASED) && STATE_UPDATER + .get(this) != State.Closed) { + startOffload(); + } else { + offloadMutex.unlock(); + log.info("streaming offload disabled due to configuration changed or ledger closed," + + "method: 
{},ledger status: {}", + getOffloadMethod(), ledgersStat); + } + }); + } + }); + } catch (InterruptedException | ExecutionException e) { + log.error("failed to continue streaming offload", e); + } + } + + + @ToString + public class LedgerInSegment { + public long ledgerId; + public long beginEntryId; + public long endEntryId; + public long beginTs; Review comment: What is the `beginTs`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org