codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r970672795


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   Please also add this setting to `broker.conf`.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,68 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final Long offloadThresholdInBytes = policies.getManagedLedgerOffloadThresholdInBytes();
+        final Long offloadTimeThreshold = policies.getManagedLedgerOffloadTimeThresholdInSeconds();
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes != null && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThreshold != null && now - timestamp > offloadTimeThreshold)) {

Review Comment:
   I think we should add this check to the method `maybeOffloadInBackground` instead.
   If both `thresholdInBytes` and `thresholdInSeconds` are null or negative,
we should skip calling `maybeOffload` altogether.
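
The guard described in this comment could look roughly like the sketch below. It is only an illustration: `OffloadTriggerGuard`, `isEnabled`, and `shouldAttemptOffload` are hypothetical names that do not appear in the PR, and treating a negative value as "disabled" is an assumption based on the `-1L` defaults in the quoted diff.

```java
// Hypothetical helper, not part of the PR: decides whether an automatic
// offload attempt is worth scheduling at all.
final class OffloadTriggerGuard {
    private OffloadTriggerGuard() {
    }

    // Assumption: a threshold is "enabled" only when it is set and non-negative.
    static boolean isEnabled(Long threshold) {
        return threshold != null && threshold >= 0;
    }

    // True when at least one of the two thresholds is enabled.
    static boolean shouldAttemptOffload(Long thresholdInBytes, Long thresholdInSeconds) {
        return isEnabled(thresholdInBytes) || isEnabled(thresholdInSeconds);
    }

    public static void main(String[] args) {
        System.out.println(shouldAttemptOffload(null, -1L));
        System.out.println(shouldAttemptOffload(1024L, null));
    }
}
```

A caller such as `maybeOffloadInBackground` could consult a check like this before scheduling `maybeOffload`, so that no lock is taken and no ledger list is walked when both thresholds are off.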



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   ```suggestion
       private long managedLedgerOffloadThresholdInSeconds = -1L;
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,68 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final Long offloadThresholdInBytes = policies.getManagedLedgerOffloadThresholdInBytes();
+        final Long offloadTimeThreshold = policies.getManagedLedgerOffloadTimeThresholdInSeconds();
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes != null && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThreshold != null && now - timestamp > offloadTimeThreshold)) {

Review Comment:
   What does `offloadTimeThreshold = -1` mean? As the condition is written,
if users provide a negative value or zero, will we offload all ledgers?
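
One way to make the semantics explicit is sketched below, under stated assumptions rather than as the PR's actual behavior: a null or negative threshold is treated as "disabled", zero means "offload as soon as the ledger has any age", and the ledger timestamp is assumed to be in epoch milliseconds. In the quoted diff `now` comes from `System.currentTimeMillis()` while the threshold is named in seconds, so the sketch converts units before comparing. `OffloadTimeCheck` and `exceedsTimeThreshold` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR: evaluates the time-based trigger
// for a single ledger.
final class OffloadTimeCheck {
    private OffloadTimeCheck() {
    }

    // Assumptions: null or negative disables the time trigger; both timestamps
    // are epoch milliseconds; the threshold is expressed in seconds.
    static boolean exceedsTimeThreshold(Long thresholdInSeconds,
                                        long ledgerTimestampMillis,
                                        long nowMillis) {
        if (thresholdInSeconds == null || thresholdInSeconds < 0) {
            return false;
        }
        long ageMillis = nowMillis - ledgerTimestampMillis;
        // Convert the threshold to milliseconds so both sides use the same unit.
        return ageMillis > TimeUnit.SECONDS.toMillis(thresholdInSeconds);
    }
}
```

With a rule like this, `-1` never selects a ledger on age alone, and only an explicit `0` would make every ledger with a positive age eligible.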



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   Please check all usages of this name.
   I noticed that the following location uses `managedLedgerOffloadThresholdInSeconds`:

https://github.com/apache/pulsar/pull/17398/files#diff-a9946a41d139c80164fa362288fdf2ec6a5af5a901213881238e47517ddc0f3bR329



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.