codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994087613


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean 
isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != 
NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = 
config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a 
list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = 
{}, to offload = {}",
-                            name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty 
list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or 
offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   @zymap Got your point.
   
   I have a tryout on my laptop, it looks simpler. @zymap @tjiuming Could you 
please take a look?
   
   ```diff
   --- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
   +++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
   @@ -2366,11 +2366,13 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
            final long offloadThresholdInSeconds =
                    
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
            if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) 
{
   -            executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
   +            executor.executeOrdered(name, safeRun(() -> 
maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
            }
        }
    
   -    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) 
{
   +    private void maybeOffload(long offloadThresholdInBytes, long 
offloadThresholdInSeconds,
   +                              CompletableFuture<PositionImpl> finalPromise) 
{
   +        checkArgument(offloadThresholdInBytes >= 0 || 
offloadThresholdInSeconds >= 0);
            if (!offloadMutex.tryLock()) {
                scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
                        100, TimeUnit.MILLISECONDS);
   @@ -2397,22 +2399,8 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
            long sizeSummed = 0;
            long toOffloadSize = 0;
            long alreadyOffloadedSize = 0;
   +        long offloadTimeThresholdMillis = 
TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
            ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
   -
   -        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
   -        final long offloadThresholdInBytes =
   -                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
   -        final long offloadTimeThresholdMillis =
   -                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v
 -> v >= 0)
   -                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
   -
   -        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
   -            log.debug("[{}] Nothing to offload due to 
[managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
   -                    + "and [managedLedgerOffloadThresholdInSeconds] are 
null OR negative values.", name);
   -            unlockingPromise.complete(PositionImpl.LATEST);
   -            return;
   -        }
   -
            for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to