tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1006598566


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean 
isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 >= 0) {
-            executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, 
offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long 
offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != 
NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = 
config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a 
list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = 
{}, to offload = {}",
-                            name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty 
list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to 
offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new 
IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to 
[managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", 
name);
+            finalPromise.completeExceptionally(new 
IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = 
TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, 
trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && 
info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > 
offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp 
>= offloadTimeThresholdMillis)) {

Review Comment:
   @hangc0276 
   Say, in `OffloadPrefixTest`, there is a test called `public void 
testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold)`, and I set 
`sizeThreshold = null` and `timeThreshold = 0`.
   
   Each ledger holds 10 entries, and I write 35 entries into the topic. So 
there will be 4 ledgers, right? Say the four ledger IDs are [1,2,3,4]. Ledgers 
[1,2,3] reached the 10-entry threshold and were closed. All 3 of those ledgers 
should be offloaded, right? That is the expected behaviour.
   
   However, the result is actually random. Ledger [3] may be offloaded, or it 
may not. 
   
   In the `ledgerClosed()` method, we set the LedgerInfo timestamp
   ```
               LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
                       
.setSize(lh.getLength()).setTimestamp(clock.millis()).build();
               ledgers.put(lh.getId(), info);
   ```
   and trigger offload 
   ```
           maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
   ```
   
   and `maybeOffloadInBackground` will go to 
   
   ```
       private void maybeOffload(long offloadThresholdInBytes, long 
offloadThresholdInSeconds,
                                 CompletableFuture<PositionImpl> finalPromise) {
           // ignore
   
           for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
               // ignore...
   
               final long timestamp = info.getTimestamp();
               final long now = System.currentTimeMillis();
               sizeSummed += size;
   
               final boolean alreadyOffloaded = info.hasOffloadContext() && 
info.getOffloadContext().getComplete();
               if (alreadyOffloaded) {
                   alreadyOffloadedSize += size;
               } else {
                   if ((offloadThresholdInBytes >= 0 && sizeSummed > 
offloadThresholdInBytes)
                           || (offloadTimeThresholdMillis >= 0 && now - 
timestamp >= offloadTimeThresholdMillis)) {
                       toOffloadSize += size;
                       toOffload.addFirst(info);
                   }
               }
           }
         
         // ignore
   ```
   
   In the process from 
   ```
   LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
                       
.setSize(lh.getLength()).setTimestamp(clock.millis()).build()
   ```
    to  
   ```
     if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
                           || (offloadTimeThresholdMillis >= 0 && now - 
timestamp >= offloadTimeThresholdMillis)) {
                       toOffloadSize += size;
                       toOffload.addFirst(info);
                   }
   ```
   , it may take more than 1ms, or it may not. So the last ledger may be 
offloaded, or it may not.
   This brings uncertainty, so I use `now - timestamp >= 
offloadTimeThresholdMillis` here to eliminate this uncertainty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to