codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991170457


##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java:
##########
@@ -69,6 +69,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
             .of("S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss");
     public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
+    public static final Long DEFAULT_OFFLOAD_TIME_THRESHOLD = null;

Review Comment:
   ```suggestion
       public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null;
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java:
##########
@@ -178,7 +182,9 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu
                                              String role, String roleSessionName,
                                              String credentialId, String credentialSecret,
                                              Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
-                                             Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
+                                             Long offloadThresholdInBytes,
+                                             //Long offloadTimeThreshold, //TODO

Review Comment:
   Is it still useful?



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}
+        };
+    }
+
+    @Test(dataProvider = "testAutoTriggerOffload")
+    public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold) throws Exception {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(sizeThreshold);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(timeThreshold);
         config.setLedgerOffloader(offloader);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger" + UUID.randomUUID(), config);
 
         // Ledger will roll twice, offload will run on first ledger after second closed
         for (int i = 0; i < 25; i++) {
+            Thread.sleep(10);

Review Comment:
   Why do we need the sleep here?



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}

Review Comment:
   We should also have a case with a long time threshold and a 100-byte size
   threshold, to make sure that only part of the ledgers will be offloaded.
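
A minimal sketch of what such a case would exercise, written as plain Java rather than as a TestNG row. The class `OffloadCaseSketch` and its `Ledger` type are hypothetical stand-ins, not Pulsar code. The size check mirrors the condition quoted in the `ManagedLedgerImpl` diff; the time check (ledger age compared with the threshold) is an assumption, since the quoted diff does not show it in full.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class OffloadCaseSketch {
    /** Hypothetical stand-in for LedgerInfo: id, size in bytes, close time in millis. */
    static final class Ledger {
        final long id;
        final long size;
        final long closedAt;

        Ledger(long id, long size, long closedAt) {
            this.id = id;
            this.size = size;
            this.closedAt = closedAt;
        }
    }

    /** Walks the ledgers newest to oldest and returns the ids to offload, oldest first. */
    static Deque<Long> select(Ledger[] newestFirst, long sizeThresholdBytes,
                              long timeThresholdMillis, long now) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Ledger ledger : newestFirst) {
            sizeSummed += ledger.size;
            boolean bySize = sizeThresholdBytes >= 0 && sizeSummed > sizeThresholdBytes;
            // Assumption: a ledger qualifies by time once its age reaches the threshold.
            boolean byTime = timeThresholdMillis >= 0
                    && now - ledger.closedAt >= timeThresholdMillis;
            if (bySize || byTime) {
                toOffload.addFirst(ledger.id);
            }
        }
        return toOffload;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        Ledger[] ledgers = {
            new Ledger(3, 60, now - 10),
            new Ledger(2, 60, now - 20),
            new Ledger(1, 60, now - 30)
        };
        // 100-byte size threshold, one-hour time threshold: only the size check fires.
        System.out.println(select(ledgers, 100L, 3_600_000L, now)); // prints [1, 2]
    }
}
```

With these illustrative sizes the newest ledger stays in place and the two older ones are selected, which is the "part of the ledgers" outcome the extra case is meant to assert.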



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean 
isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != 
NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = 
config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a 
list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = 
{}, to offload = {}",
-                            name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty 
list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or 
offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v
 -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and 
`offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to 
[managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null 
OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, 
trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && 
info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > 
offloadThresholdInBytes)

Review Comment:
   ```
   sizeSummed > offloadThresholdInBytes
   ```
   
   Interesting, I think this one is not correct. It will cause the ledger to be
   offloaded earlier than it should be. It's not introduced by this PR, but it
   looks like we need to fix it in another PR.
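
One possible reading of this concern, as a small worked example. Because a ledger's own size is added to `sizeSummed` before the comparison, less data than the threshold can remain un-offloaded. The class and both method names are hypothetical, and the "alternative" check is only an illustration of the difference, not a proposed fix from this review.

```java
public class SizeThresholdSketch {
    /** Mirrors the quoted check: the ledger's own size is added before comparing. */
    static long retainedWithCurrentCheck(long[] sizesNewestFirst, long threshold) {
        long sizeSummed = 0;
        long retained = 0;
        for (long size : sizesNewestFirst) {
            sizeSummed += size;
            if (sizeSummed > threshold) {
                break; // this ledger and every older one would be offloaded
            }
            retained += size;
        }
        return retained;
    }

    /** Hypothetical alternative: compare only the size of the newer ledgers. */
    static long retainedWithAlternativeCheck(long[] sizesNewestFirst, long threshold) {
        long newerSize = 0;
        long retained = 0;
        for (long size : sizesNewestFirst) {
            if (newerSize > threshold) {
                break; // the newer ledgers alone already exceed the threshold
            }
            retained += size;
            newerSize += size;
        }
        return retained;
    }

    public static void main(String[] args) {
        long[] sizes = {60, 60, 60}; // illustrative sizes in bytes, newest first
        System.out.println(retainedWithCurrentCheck(sizes, 100));     // prints 60
        System.out.println(retainedWithAlternativeCheck(sizes, 100)); // prints 120
    }
}
```

With a 100-byte threshold and three 60-byte ledgers, the current check leaves only 60 bytes un-offloaded, while the alternative leaves 120.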



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean 
isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != 
NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = 
config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a 
list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = 
{}, to offload = {}",
-                            name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty 
list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or 
offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v
 -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and 
`offloadThresholdInBytes` are null OR negative values.

Review Comment:
   ```suggestion
           // Skip the following steps if `offloadTimeThreshold` and 
`offloadThresholdInBytes` are negative values.
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean 
isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != 
NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = 
config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a 
list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = 
{}, to offload = {}",
-                            name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty 
list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or 
offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v
 -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and 
`offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to 
[managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null 
OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, 
trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && 
info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;

Review Comment:
   This one is also interesting: if the ledger is already offloaded, why not
   break out of the loop? All the ledgers before an offloaded ledger should
   already be offloaded, no?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean 
isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 != null
-                && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
 >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != 
NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && 
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = 
config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a 
list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = 
{}, to offload = {}",
-                            name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty 
list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or 
offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = 
config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v
 -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and 
`offloadThresholdInBytes` are null OR negative values.

Review Comment:
   Also, I think we don't need a comment here; the code is clear on its own.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
